def get_data_from_text_file(json_file):
    """
    Return the list of documents stored in a text file holding one JSON
    document per line.

    Every line is stripped of surrounding whitespace and parsed with
    json.loads. A line that is not valid JSON is reported through log.error
    and skipped, so a single malformed line does not abort the whole load.

    Args:
        json_file (str): the file name to look for docs in

    Returns:
        list : the successfully parsed documents, in file order

    """

    # Read through a context manager so the file handle is closed as soon as
    # all the lines were collected, instead of being left for the garbage
    # collector.
    with open(str(json_file), encoding="utf8", errors="ignore") as docs_file:
        docs = [line.strip() for line in docs_file]
    log.info(f"String docs length: {len(docs)}")

    doc_list = []
    for num, doc in enumerate(docs):
        # Only the parsing itself is guarded; anything else should surface.
        try:
            dict_doc = json.loads(doc)
        except json.decoder.JSONDecodeError as err:
            # report the bad line and continue with the next one
            log.error(
                f"ERROR for num: {num} -- JSONDecodeError: {err} for doc: {doc}"
            )
        else:
            doc_list.append(dict_doc)

    log.info(f"Dict docs length: {len(doc_list)}")
    return doc_list
the ES server") - with open(file_name) as json_file: - while True: - line = json_file.readline() - if line: - full_data = json.loads(line) - log.debug(f"Loading {full_data} into the ES") - connection.index( - index=ind_name, doc_type="_doc", body=full_data - ) - else: - break + docs_list = get_data_from_text_file(file_name) + + try: + log.info( + "Attempting to index the list of docs using helpers.bulk()" + ) + resp = helpers.bulk(connection, docs_list, index=ind_name) + log.info(f"helpers.bulk() RESPONSE: {resp}") + except Exception as err: + log.error(f"Elasticsearch helpers.bulk() ERROR:{err}") return True