Skip to content

Commit

Permalink
Loading elasticsearch data as bulk and not one by one (#3817)
Browse files Browse the repository at this point in the history
Signed-off-by: Avi Liani <[email protected]>
  • Loading branch information
Avilir authored Feb 8, 2021
1 parent 1110a18 commit f4384f5
Showing 1 changed file with 44 additions and 14 deletions.
58 changes: 44 additions & 14 deletions ocs_ci/ocs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
import json

from elasticsearch import Elasticsearch, exceptions as esexp
from elasticsearch import Elasticsearch, helpers, exceptions as esexp

from ocs_ci.ocs import constants
from ocs_ci.ocs.ocp import OCP
Expand All @@ -28,15 +28,46 @@ def elasticsearch_load(connection, target_path):
Load all data from target_path/results into an elasticsearch (es) server.
Args:
connection (dict): a dictionary with the main elasticsearch server information
{host: the server ip address, port: the port to connect}
connection (obj): an elasticsearch connection object
target_path (str): the path where data was dumped into
Returns:
bool: True if loading data succeed, False otherwise
"""

# define a function that will load a text file
def get_data_from_text_file(json_file):
    """
    Read a text file that holds one JSON document per line and parse
    every line into a Python object.

    Lines that fail to parse as JSON are reported through log.error and
    left out of the result, so a single bad line does not abort the load.

    Args:
        json_file (str): the file name to look for docs in

    Returns:
        list : the successfully parsed documents, in file order

    """

    # Use a context manager so the file handle is always released; the
    # original comprehension over a bare open() never closed it.
    # errors="ignore" drops bytes that are not valid UTF-8 instead of raising.
    with open(str(json_file), encoding="utf8", errors="ignore") as docs_file:
        docs = [line.strip() for line in docs_file]
    log.info(f"String docs length: {len(docs)}")

    doc_list = []
    for num, doc in enumerate(docs):
        try:
            doc_list.append(json.loads(doc))
        except json.decoder.JSONDecodeError as err:
            # Report the offending line and keep going with the rest
            log.error(
                f"ERROR for num: {num} -- JSONDecodeError: {err} for doc: {doc}"
            )

    log.info(f"Dict docs length: {len(doc_list)}")
    return doc_list

all_files = run_command(f"ls {target_path}/results/", out_format="list")
if "Error in command" in all_files:
log.error("There is No data to load into ES server")
Expand All @@ -51,17 +82,16 @@ def elasticsearch_load(connection, target_path):
file_name = f"{target_path}/results/{ind}"
ind_name = ind.split(".")[0]
log.info(f"Loading the {ind} data into the ES server")
with open(file_name) as json_file:
while True:
line = json_file.readline()
if line:
full_data = json.loads(line)
log.debug(f"Loading {full_data} into the ES")
connection.index(
index=ind_name, doc_type="_doc", body=full_data
)
else:
break
docs_list = get_data_from_text_file(file_name)

try:
log.info(
"Attempting to index the list of docs using helpers.bulk()"
)
resp = helpers.bulk(connection, docs_list, index=ind_name)
log.info(f"helpers.bulk() RESPONSE: {resp}")
except Exception as err:
log.error(f"Elasticsearch helpers.bulk() ERROR:{err}")
return True


Expand Down

0 comments on commit f4384f5

Please sign in to comment.