Skip to content

Commit

Permalink
[Localhost] Fix function get_obj
Browse files Browse the repository at this point in the history
  • Loading branch information
JosepSampe committed Jun 4, 2024
1 parent fc347c5 commit 5686a80
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 20 deletions.
6 changes: 3 additions & 3 deletions lithops/localhost/v1/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ def job_manager():
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Running '
f'{total_calls} activations in the localhost worker')
process = self.env.run_job(job_key, job_filename)
stdout, stderr = process.communicate() # blocks until the process finishes
process.communicate() # blocks until the process finishes
if process.returncode != 0:
logger.error(f"ExecutorID {executor_id} | JobID {job_id} - Job failed with return code {process.returncode}")
logger.error(f"ExecutorID {executor_id} | JobID {job_id} - Error output from job process: {stderr}")
logger.error(f"ExecutorID {executor_id} | JobID {job_id} - Job "
f"process failed with return code {process.returncode}")
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Execution finished')

if self.job_queue.empty():
Expand Down
7 changes: 2 additions & 5 deletions lithops/localhost/v2/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,9 @@ def run_task(self, job_key, call_id):
cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename]
process = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, start_new_session=True)
self.task_processes[job_key_call_id] = process
stdout, stderr = process.communicate() # blocks until the process finishes
process.communicate() # blocks until the process finishes
if process.returncode != 0:
logger.error(f"Task process {job_key_call_id} failed with return code {process.returncode}")
logger.error(f"Error output from task process {job_key_call_id}: {stderr}")
del self.task_processes[job_key_call_id]
logger.debug(f"Task process {job_key_call_id} finished")

Expand Down Expand Up @@ -439,11 +438,9 @@ def run_task(self, job_key, call_id):

process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.PIPE, start_new_session=True)
self.task_processes[job_key_call_id] = process
stdout, stderr = process.communicate() # blocks until the process finishes
process.communicate() # blocks until the process finishes
if process.returncode != 0:
logger.error(f"Task process {job_key_call_id} failed with return code {process.returncode}")
logger.error(f"Error output from task process {job_key_call_id}: {stderr}")
del self.task_processes[job_key_call_id]
logger.debug(f"Task process {job_key_call_id} finished")

def stop(self, job_keys=None):
Expand Down
13 changes: 1 addition & 12 deletions lithops/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,9 @@ def get_function_and_modules(job, internal_storage):
func_path = '/'.join([SA_INSTALL_DIR, job.func_key])
with open(func_path, "rb") as f:
func_obj = f.read()
elif os.path.exists(func_path):
logger.info(f"Loading {job.func_key} from local cache")
try:
with open(func_path, 'rb') as f:
func_obj = f.read()
except Exception:
logger.debug(f"Could not load {job.func_key} from local cache")

if not func_obj:
else:
logger.info(f"Loading {job.func_key} from storage")
func_obj = internal_storage.get_func(job.func_key)
os.makedirs(os.path.dirname(func_path), exist_ok=True)
with open(func_path, 'wb') as f:
f.write(func_obj)

loaded_func_all = pickle.loads(func_obj)

Expand Down

0 comments on commit 5686a80

Please sign in to comment.