From a929b298c70653f7f7c78bb0fd9db239e3345efb Mon Sep 17 00:00:00 2001 From: Keerti Talwar Date: Thu, 13 Apr 2023 15:50:39 +0530 Subject: [PATCH 1/5] implemented iterative approach --- openfl/experimental/interface/fl_spec.py | 31 +-- openfl/experimental/placement/placement.py | 7 +- openfl/experimental/runtime/local_runtime.py | 217 ++++++++++++------- 3 files changed, 160 insertions(+), 95 deletions(-) diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index aec582580b..1f1bb6f4a5 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -65,6 +65,13 @@ def run(self) -> None: print(f"Created flow {self.__class__.__name__}") try: self.start() + self.runtime.execute_task( + self, + self.execute_task_args[0], + self.execute_task_args[1], + self.execute_task_args[2], + **self.execute_task_args[3], + ) except Exception as e: if "cannot pickle" in str(e) or "Failed to unpickle" in str(e): msg = ( @@ -148,12 +155,12 @@ def _display_transition_logs(self, f: Callable, parent_func: Callable) -> None: elif collaborator_to_aggregator(f, parent_func): print("Sending state from collaborator to aggregator") - def next(self, f: Callable, **kwargs) -> None: + def next(self, next_func: Callable, **kwargs) -> None: """ Next task in the flow to execute Args: - f: The next task that will be executed in the flow + next_func: The next task that will be executed in the flow """ # Get the name and reference to the calling function @@ -165,22 +172,16 @@ def next(self, f: Callable, **kwargs) -> None: # Take back-up of current state of self agg_to_collab_ss = [] - if aggregator_to_collaborator(f, parent_func): + if aggregator_to_collaborator(next_func, parent_func): agg_to_collab_ss = self._capture_instance_snapshot(kwargs=kwargs) # Remove included / excluded attributes from next task - filter_attributes(self, f, **kwargs) + filter_attributes(self, next_func, **kwargs) - if self._is_at_transition_point(f, parent_func): - # Collaborator is done executing for now - return + self._display_transition_logs(next_func, parent_func) - self._display_transition_logs(f, parent_func) + # get the function to be executed + self.to_exec = getattr(self, next_func.__name__) - self._runtime.execute_task( - self, - f, - parent_func, - instance_snapshot=agg_to_collab_ss, - **kwargs, - ) + # update parameters for execute_task function + self.execute_task_args = [next_func, parent_func, agg_to_collab_ss, kwargs] \ No newline at end of file diff --git a/openfl/experimental/placement/placement.py b/openfl/experimental/placement/placement.py index 0662137add..2b60a3ef3e 100644 --- a/openfl/experimental/placement/placement.py +++ b/openfl/experimental/placement/placement.py @@ -27,10 +27,15 @@ def ray_call_put(self, ctx, func): def get_remote_clones(self): clones = deepcopy(ray.get(self.remote_functions)) - del self.remote_functions + cln = clones[0] + if cln._is_at_transition_point(cln.execute_task_args[0], cln.execute_task_args[1]): + del self.remote_functions + else: + self.remote_functions= [] # Remove clones from ray object store for ctx in self.remote_contexts: ray.cancel(ctx) + return clones diff --git a/openfl/experimental/runtime/local_runtime.py b/openfl/experimental/runtime/local_runtime.py index efac60efae..b9b4c7c41f 100644 --- a/openfl/experimental/runtime/local_runtime.py +++ b/openfl/experimental/runtime/local_runtime.py @@ -124,7 +124,6 @@ def execute_task( Performs the execution of a task as defined by the implementation and underlying backend (single_process, ray, etc) on a single node - Args: flspec_obj: Reference to the FLSpec (flow) object. Contains information about task sequence, flow attributes, that are needed to @@ -135,98 +134,158 @@ def execute_task( (i.e. restoring aggregator state after collaborator execution) """ + while f.__name__ != "end": + if "foreach" in kwargs: + # save collab first info + self._collab_start_func,self._collab_start_parent_func,self._collab_start_kwargs,= f, parent_func, kwargs + f, parent_func, instance_snapshot, kwargs = self.execute_foreach_task( + flspec_obj, f, parent_func, instance_snapshot, **kwargs ) + else: + f,parent_func,instance_snapshot,kwargs,= self.execute_no_transition_task(flspec_obj) + else: + self.execute_end_task(flspec_obj, f) + + def execute_no_transition_task(self, flspec_obj): + flspec_obj.to_exec() + # update the params + return flspec_obj.execute_task_args + + def execute_end_task(self, flspec_obj, f): + from openfl.experimental.interface import (final_attributes) + global final_attributes + flspec_obj.to_exec() + checkpoint(flspec_obj, f) + artifacts_iter, _ = generate_artifacts(ctx=flspec_obj) + final_attributes = artifacts_iter() + return + + def execute_foreach_task( + self, flspec_obj, f, parent_func, instance_snapshot, **kwargs + ): from openfl.experimental.interface import ( FLSpec, - final_attributes, ) - global final_attributes + agg_func = None + flspec_obj._foreach_methods.append(f.__name__) + selected_collaborators = flspec_obj.__getattribute__(kwargs["foreach"]) - if "foreach" in kwargs: - flspec_obj._foreach_methods.append(f.__name__) - selected_collaborators = flspec_obj.__getattribute__( - kwargs["foreach"] - ) + self.filter_exclude_include_private_attr( + flspec_obj, f, parent_func, selected_collaborators, **kwargs + ) + + if self.backend == "ray": + ray_executor = RayExecutor() + + for col in selected_collaborators: + clone = FLSpec._clones[col] + # Set new LocalRuntime for clone as it is required + # and also new runtime object will not contain private attributes of + # aggregator or other collaborators + clone.runtime = LocalRuntime(backend="single_process") + + # set collab private attributes + for name, attr in self.__collaborators[ + clone.input + ].private_attributes.items(): + setattr(clone, name, attr) + + # write the clone to the object store + # ensure clone is getting latest _metaflow_interface + clone._metaflow_interface = flspec_obj._metaflow_interface - for col in selected_collaborators: - clone = FLSpec._clones[col] - if ( - "exclude" in kwargs and hasattr(clone, kwargs["exclude"][0]) - ) or ( - "include" in kwargs and hasattr(clone, kwargs["include"][0]) - ): - filter_attributes(clone, f, **kwargs) - artifacts_iter, _ = generate_artifacts(ctx=flspec_obj) - for name, attr in artifacts_iter(): - setattr(clone, name, deepcopy(attr)) - clone._foreach_methods = flspec_obj._foreach_methods - - for col in selected_collaborators: - clone = FLSpec._clones[col] - clone.input = col - if aggregator_to_collaborator(f, parent_func): - # remove private aggregator state - for attr in self._aggregator.private_attributes: - self._aggregator.private_attributes[attr] = getattr( - flspec_obj, attr - ) - if hasattr(clone, attr): - delattr(clone, attr) - - func = None - if self.backend == "ray": - ray_executor = RayExecutor() - for col in selected_collaborators: - clone = FLSpec._clones[col] - # Set new LocalRuntime for clone as it is required - # for calling execute_task and also new runtime - # object will not contain private attributes of - # aggregator or other collaborators - clone.runtime = LocalRuntime(backend="single_process") - for name, attr in self.__collaborators[ - clone.input - ].private_attributes.items(): - setattr(clone, name, attr) + # execute all collab steps for each collab + for each_step in flspec_obj._foreach_methods: to_exec = getattr(clone, f.__name__) - # write the clone to the object store - # ensure clone is getting latest _metaflow_interface - clone._metaflow_interface = flspec_obj._metaflow_interface if self.backend == "ray": ray_executor.ray_call_put(clone, to_exec) else: to_exec() - if self.backend == "ray": + f, parent_func, _, kwargs = clone.execute_task_args + if clone._is_at_transition_point(f, parent_func): + # get collab starting point for next collab to execute + f, parent_func, kwargs = self._collab_start_func,self._collab_start_parent_func,self._collab_start_kwargs + break + + if self.backend == "ray": + # get the initial collab put methods + clones = ray_executor.get_remote_clones() + + # iterate until all collab steps re finished and get the next set of collab steps + while not hasattr( clones[0], 'execute_next'): + for clone_obj in clones: + func_name = clone_obj.execute_task_args[0].name + to_exec = getattr(clone_obj,func_name) + ray_executor.ray_call_put(clone_obj, to_exec) + + # update clone clones = ray_executor.get_remote_clones() - FLSpec._clones.update(zip(selected_collaborators, clones)) - del ray_executor - del clones - gc.collect() - for col in selected_collaborators: - clone = FLSpec._clones[col] - func = clone.execute_next - for attr in self.__collaborators[ - clone.input - ].private_attributes: + + clone = clones[0] + FLSpec._clones.update(zip(selected_collaborators, clones)) + del ray_executor + del clones + gc.collect() + + self.remove_collab_private_attr(selected_collaborators) + + # Restore the flspec_obj state if back-up is taken + self.restore_instance_snapshot(flspec_obj, instance_snapshot) + del instance_snapshot + + # get next aggregator function to be executed + agg_func = clone.execute_next + + g = getattr(flspec_obj, agg_func) + # remove private collaborator state + gc.collect() + g([FLSpec._clones[col] for col in selected_collaborators]) + return flspec_obj.execute_task_args + + def remove_collab_private_attr(self, selected_collaborators): + # Removes private attributes of collaborator after transition + from openfl.experimental.interface import ( + FLSpec, + ) + + for col in selected_collaborators: + clone = FLSpec._clones[col] + for attr in self.__collaborators[clone.input].private_attributes: + if hasattr(clone, attr): + self.__collaborators[clone.input].private_attributes[ + attr + ] = getattr(clone, attr) + delattr(clone, attr) + + def filter_exclude_include_private_attr( + self, flspec_obj, f, parent_func, selected_collaborators, **kwargs + ): + # This function filters exclude/include attributes + # Removes private attributes of aggregator + from openfl.experimental.interface import ( + FLSpec, + ) + + for col in selected_collaborators: + clone = FLSpec._clones[col] + clone.input = col + if ("exclude" in kwargs and hasattr(clone, kwargs["exclude"][0])) or ( + "include" in kwargs and hasattr(clone, kwargs["include"][0]) + ): + filter_attributes(clone, f, **kwargs) + artifacts_iter, _ = generate_artifacts(ctx=flspec_obj) + for name, attr in artifacts_iter(): + setattr(clone, name, deepcopy(attr)) + clone._foreach_methods = flspec_obj._foreach_methods + + # remove private aggregator state + if aggregator_to_collaborator(f, parent_func): + for attr in self._aggregator.private_attributes: + self._aggregator.private_attributes[attr] = getattr( + flspec_obj, attr + ) if hasattr(clone, attr): - self.__collaborators[clone.input].private_attributes[ - attr - ] = getattr(clone, attr) delattr(clone, attr) - # Restore the flspec_obj state if back-up is taken - self.restore_instance_snapshot(flspec_obj, instance_snapshot) - del instance_snapshot - - g = getattr(flspec_obj, func) - # remove private collaborator state - gc.collect() - g([FLSpec._clones[col] for col in selected_collaborators]) - else: - to_exec = getattr(flspec_obj, f.__name__) - to_exec() - if f.__name__ == "end": - checkpoint(flspec_obj, f) - artifacts_iter, _ = generate_artifacts(ctx=flspec_obj) - final_attributes = artifacts_iter() def __repr__(self): return "LocalRuntime" From 2ce92fbc876319e989783bf2845570dd8016d9c5 Mon Sep 17 00:00:00 2001 From: Keerti Talwar Date: Fri, 14 Apr 2023 15:09:56 +0530 Subject: [PATCH 2/5] restrctered functionality --- openfl/experimental/interface/fl_spec.py | 19 +-- openfl/experimental/placement/placement.py | 7 +- openfl/experimental/runtime/local_runtime.py | 167 +++++++++---------- 3 files changed, 91 insertions(+), 102 deletions(-) diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index 1f1bb6f4a5..a9baea7ef7 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -23,7 +23,6 @@ class FLSpec: - _clones = [] _initial_state = None @@ -65,6 +64,8 @@ def run(self) -> None: print(f"Created flow {self.__class__.__name__}") try: self.start() + + # execute_task_args will be updated in self.start() after the next function is executed self.runtime.execute_task( self, self.execute_task_args[0], @@ -137,9 +138,7 @@ def _is_at_transition_point(self, f: Callable, parent_func: Callable) -> bool: if parent_func.__name__ in self._foreach_methods: self._foreach_methods.append(f.__name__) if should_transfer(f, parent_func): - print( - f"Should transfer from {parent_func.__name__} to {f.__name__}" - ) + print(f"Should transfer from {parent_func.__name__} to {f.__name__}") self.execute_next = f.__name__ return True return False @@ -155,7 +154,7 @@ def _display_transition_logs(self, f: Callable, parent_func: Callable) -> None: elif collaborator_to_aggregator(f, parent_func): print("Sending state from collaborator to aggregator") - def next(self, next_func: Callable, **kwargs) -> None: + def next(self, f: Callable, **kwargs) -> None: """ Next task in the flow to execute @@ -172,16 +171,16 @@ def next(self, next_func: Callable, **kwargs) -> None: # Take back-up of current state of self agg_to_collab_ss = [] - if aggregator_to_collaborator(next_func, parent_func): + if aggregator_to_collaborator(f, parent_func): agg_to_collab_ss = self._capture_instance_snapshot(kwargs=kwargs) # Remove included / excluded attributes from next task - filter_attributes(self, next_func, **kwargs) + filter_attributes(self, f, **kwargs) - self._display_transition_logs(next_func, parent_func) + self._display_transition_logs(f, parent_func) # get the function to be executed - self.to_exec = getattr(self, next_func.__name__) + self.to_exec = getattr(self, f.__name__) # update parameters for execute_task function - self.execute_task_args = [next_func, parent_func, agg_to_collab_ss, kwargs] \ No newline at end of file + self.execute_task_args = [f, parent_func, agg_to_collab_ss, kwargs] diff --git a/openfl/experimental/placement/placement.py b/openfl/experimental/placement/placement.py index 2b60a3ef3e..493acf9302 100644 --- a/openfl/experimental/placement/placement.py +++ b/openfl/experimental/placement/placement.py @@ -27,15 +27,10 @@ def ray_call_put(self, ctx, func): def get_remote_clones(self): clones = deepcopy(ray.get(self.remote_functions)) - cln = clones[0] - if cln._is_at_transition_point(cln.execute_task_args[0], cln.execute_task_args[1]): - del self.remote_functions - else: - self.remote_functions= [] + self.remote_functions =[] # Remove clones from ray object store for ctx in self.remote_contexts: ray.cancel(ctx) - return clones diff --git a/openfl/experimental/runtime/local_runtime.py b/openfl/experimental/runtime/local_runtime.py index b9b4c7c41f..6a0e18715f 100644 --- a/openfl/experimental/runtime/local_runtime.py +++ b/openfl/experimental/runtime/local_runtime.py @@ -9,6 +9,7 @@ import gc from openfl.experimental.runtime import Runtime from typing import TYPE_CHECKING + if TYPE_CHECKING: from openfl.experimental.interface import Aggregator, Collaborator, FLSpec from openfl.experimental.placement import RayExecutor @@ -101,9 +102,7 @@ def collaborators(self, collaborators: List[Type[Collaborator]]): } def restore_instance_snapshot( - self, - ctx: Type[FLSpec], - instance_snapshot: List[Type[FLSpec]] + self, ctx: Type[FLSpec], instance_snapshot: List[Type[FLSpec]] ): """Restores attributes from backup (in instance snapshot) to ctx""" for backup in instance_snapshot: @@ -118,40 +117,43 @@ def execute_task( f: Callable, parent_func: Callable, instance_snapshot: List[Type[FLSpec]] = [], - **kwargs + **kwargs, ): """ - Performs the execution of a task as defined by the - implementation and underlying backend (single_process, ray, etc) - on a single node + Defines which function to be executed based on name and kwargs + Updates the arguments and executes until end is not reached + Args: flspec_obj: Reference to the FLSpec (flow) object. Contains information - about task sequence, flow attributes, that are needed to - execute a future task + about task sequence, flow attributes. f: The next task to be executed within the flow parent_func: The prior task executed in the flow instance_snapshot: A prior FLSpec state that needs to be restored from (i.e. restoring aggregator state after collaborator execution) """ + while f.__name__ != "end": if "foreach" in kwargs: - # save collab first info - self._collab_start_func,self._collab_start_parent_func,self._collab_start_kwargs,= f, parent_func, kwargs f, parent_func, instance_snapshot, kwargs = self.execute_foreach_task( - flspec_obj, f, parent_func, instance_snapshot, **kwargs ) + flspec_obj, f, parent_func, instance_snapshot, **kwargs + ) else: - f,parent_func,instance_snapshot,kwargs,= self.execute_no_transition_task(flspec_obj) + f, parent_func, instance_snapshot, kwargs = self.execute_agg_task( + flspec_obj + ) else: self.execute_end_task(flspec_obj, f) - def execute_no_transition_task(self, flspec_obj): + def execute_agg_task(self, flspec_obj): + """Performs execution of aggregator task""" flspec_obj.to_exec() - # update the params return flspec_obj.execute_task_args def execute_end_task(self, flspec_obj, f): - from openfl.experimental.interface import (final_attributes) + """Performs execution of end task""" + from openfl.experimental.interface import final_attributes + global final_attributes flspec_obj.to_exec() checkpoint(flspec_obj, f) @@ -162,25 +164,42 @@ def execute_end_task(self, flspec_obj, f): def execute_foreach_task( self, flspec_obj, f, parent_func, instance_snapshot, **kwargs ): - from openfl.experimental.interface import ( - FLSpec, - ) + """ + Performs + 1. Filter include/exclude + 2. Remove aggregator private attributes + 3. Set runtime, collab private attributes , metaflow_interface + 4. Execution of all collaborator for each task + 5. Remove collaborator private attributes + 6. Execute the next function after transition + """ + from openfl.experimental.interface import FLSpec - agg_func = None flspec_obj._foreach_methods.append(f.__name__) - selected_collaborators = flspec_obj.__getattribute__(kwargs["foreach"]) + selected_collaborators = getattr(flspec_obj, kwargs["foreach"]) + + # filter exclude/include attributes for clone + self.filter_exclude_include(flspec_obj, f, selected_collaborators, **kwargs) + + # Remove aggregator private attributes + for col in selected_collaborators: + clone = FLSpec._clones[col] + if aggregator_to_collaborator(f, parent_func): + for attr in self._aggregator.private_attributes: + self._aggregator.private_attributes[attr] = getattr( + flspec_obj, attr + ) + if hasattr(clone, attr): + delattr(clone, attr) - self.filter_exclude_include_private_attr( - flspec_obj, f, parent_func, selected_collaborators, **kwargs - ) - if self.backend == "ray": ray_executor = RayExecutor() + # set runtime,collab private attributes and metaflowinterface for col in selected_collaborators: clone = FLSpec._clones[col] # Set new LocalRuntime for clone as it is required - # and also new runtime object will not contain private attributes of + # new runtime object will not contain private attributes of # aggregator or other collaborators clone.runtime = LocalRuntime(backend="single_process") @@ -194,60 +213,41 @@ def execute_foreach_task( # ensure clone is getting latest _metaflow_interface clone._metaflow_interface = flspec_obj._metaflow_interface - # execute all collab steps for each collab - for each_step in flspec_obj._foreach_methods: + # For initial step assume there is no trasition from collab_to_agg + not_at_transition_point = True + + # loop until there is no transition + while not_at_transition_point: + # execute to_exec for for each collab + for collab in selected_collaborators: + clone = FLSpec._clones[collab] + # get the function to be executed to_exec = getattr(clone, f.__name__) + if self.backend == "ray": ray_executor.ray_call_put(clone, to_exec) else: to_exec() - f, parent_func, _, kwargs = clone.execute_task_args - if clone._is_at_transition_point(f, parent_func): - # get collab starting point for next collab to execute - f, parent_func, kwargs = self._collab_start_func,self._collab_start_parent_func,self._collab_start_kwargs - break - if self.backend == "ray": - # get the initial collab put methods - clones = ray_executor.get_remote_clones() - - # iterate until all collab steps re finished and get the next set of collab steps - while not hasattr( clones[0], 'execute_next'): - for clone_obj in clones: - func_name = clone_obj.execute_task_args[0].name - to_exec = getattr(clone_obj,func_name) - ray_executor.ray_call_put(clone_obj, to_exec) - - # update clone + if self.backend == "ray": + # Execute the collab steps clones = ray_executor.get_remote_clones() - - clone = clones[0] - FLSpec._clones.update(zip(selected_collaborators, clones)) - del ray_executor - del clones - gc.collect() - - self.remove_collab_private_attr(selected_collaborators) + FLSpec._clones.update(zip(selected_collaborators, clones)) - # Restore the flspec_obj state if back-up is taken - self.restore_instance_snapshot(flspec_obj, instance_snapshot) - del instance_snapshot + # update the next arguments + f, parent_func, _, kwargs = FLSpec._clones[collab].execute_task_args - # get next aggregator function to be executed - agg_func = clone.execute_next - - g = getattr(flspec_obj, agg_func) - # remove private collaborator state - gc.collect() - g([FLSpec._clones[col] for col in selected_collaborators]) - return flspec_obj.execute_task_args + # check for transition + if FLSpec._clones[collab]._is_at_transition_point(f, parent_func): + not_at_transition_point = False - def remove_collab_private_attr(self, selected_collaborators): - # Removes private attributes of collaborator after transition - from openfl.experimental.interface import ( - FLSpec, - ) + # remove clones after transition + if self.backend == "ray": + del ray_executor + del clones + gc.collect() + # Removes collaborator private attributes after transition for col in selected_collaborators: clone = FLSpec._clones[col] for attr in self.__collaborators[clone.input].private_attributes: @@ -257,14 +257,18 @@ def remove_collab_private_attr(self, selected_collaborators): ] = getattr(clone, attr) delattr(clone, attr) - def filter_exclude_include_private_attr( - self, flspec_obj, f, parent_func, selected_collaborators, **kwargs - ): - # This function filters exclude/include attributes - # Removes private attributes of aggregator - from openfl.experimental.interface import ( - FLSpec, - ) + # Restore the flspec_obj state if back-up is taken + self.restore_instance_snapshot(flspec_obj, instance_snapshot) + del instance_snapshot + + g = getattr(flspec_obj, f.__name__) + gc.collect() + g([FLSpec._clones[col] for col in selected_collaborators]) + return flspec_obj.execute_task_args + + def filter_exclude_include(self, flspec_obj, f, selected_collaborators, **kwargs): + """This function filters exclude/include attributes""" + from openfl.experimental.interface import FLSpec for col in selected_collaborators: clone = FLSpec._clones[col] @@ -278,14 +282,5 @@ def filter_exclude_include_private_attr( setattr(clone, name, deepcopy(attr)) clone._foreach_methods = flspec_obj._foreach_methods - # remove private aggregator state - if aggregator_to_collaborator(f, parent_func): - for attr in self._aggregator.private_attributes: - self._aggregator.private_attributes[attr] = getattr( - flspec_obj, attr - ) - if hasattr(clone, attr): - delattr(clone, attr) - def __repr__(self): return "LocalRuntime" From 83fb9509f9d6cbbcdbd379715d2db58254953599 Mon Sep 17 00:00:00 2001 From: Keerti Talwar Date: Fri, 14 Apr 2023 15:16:42 +0530 Subject: [PATCH 3/5] fixed lint suggestions --- openfl/experimental/interface/fl_spec.py | 3 ++- openfl/experimental/placement/placement.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index a9baea7ef7..e4545400ea 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -65,7 +65,8 @@ def run(self) -> None: try: self.start() - # execute_task_args will be updated in self.start() after the next function is executed + # execute_task_args will be updated in self.start() + # after the next function is executed self.runtime.execute_task( self, self.execute_task_args[0], diff --git a/openfl/experimental/placement/placement.py b/openfl/experimental/placement/placement.py index 493acf9302..105676043a 100644 --- a/openfl/experimental/placement/placement.py +++ b/openfl/experimental/placement/placement.py @@ -27,7 +27,7 @@ def ray_call_put(self, ctx, func): def get_remote_clones(self): clones = deepcopy(ray.get(self.remote_functions)) - self.remote_functions =[] + self.remote_functions = [] # Remove clones from ray object store for ctx in self.remote_contexts: ray.cancel(ctx) From 7e3501168e8149890a7ce64bbd3b144e0b9bdd13 Mon Sep 17 00:00:00 2001 From: Keerti Talwar Date: Fri, 14 Apr 2023 19:16:40 +0530 Subject: [PATCH 4/5] incorported review comments --- openfl/experimental/interface/fl_spec.py | 5 +- openfl/experimental/placement/placement.py | 2 + openfl/experimental/runtime/local_runtime.py | 62 ++++++++++++++++---- 3 files changed, 55 insertions(+), 14 deletions(-) diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index e4545400ea..257284ea98 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -160,7 +160,7 @@ def next(self, f: Callable, **kwargs) -> None: Next task in the flow to execute Args: - next_func: The next task that will be executed in the flow + f: The next task that will be executed in the flow """ # Get the name and reference to the calling function @@ -180,8 +180,5 @@ def next(self, f: Callable, **kwargs) -> None: self._display_transition_logs(f, parent_func) - # get the function to be executed - self.to_exec = getattr(self, f.__name__) - # update parameters for execute_task function self.execute_task_args = [f, parent_func, agg_to_collab_ss, kwargs] diff --git a/openfl/experimental/placement/placement.py b/openfl/experimental/placement/placement.py index 105676043a..6a8afd7671 100644 --- a/openfl/experimental/placement/placement.py +++ b/openfl/experimental/placement/placement.py @@ -27,6 +27,8 @@ def ray_call_put(self, ctx, func): def get_remote_clones(self): clones = deepcopy(ray.get(self.remote_functions)) + # delete remote_functions to free ray memory and reinitialize + del self.remote_functions self.remote_functions = [] # Remove clones from ray object store for ctx in self.remote_contexts: diff --git a/openfl/experimental/runtime/local_runtime.py b/openfl/experimental/runtime/local_runtime.py index 6a0e18715f..af1556b6ae 100644 --- a/openfl/experimental/runtime/local_runtime.py +++ b/openfl/experimental/runtime/local_runtime.py @@ -22,6 +22,7 @@ from typing import List from typing import Type from typing import Callable +import importlib class LocalRuntime(Runtime): @@ -140,22 +141,43 @@ def execute_task( ) else: f, parent_func, instance_snapshot, kwargs = self.execute_agg_task( - flspec_obj + flspec_obj, f ) else: self.execute_end_task(flspec_obj, f) - def execute_agg_task(self, flspec_obj): - """Performs execution of aggregator task""" - flspec_obj.to_exec() + def execute_agg_task(self, flspec_obj, f): + """ + Performs execution of aggregator task + Args: + flspec_obj : Reference to the FLSpec (flow) object + f : The task to be executed within the flow + + Returns: + list: updated arguments to be executed + """ + + to_exec = getattr(flspec_obj, f.__name__) + to_exec() return flspec_obj.execute_task_args def execute_end_task(self, flspec_obj, f): - """Performs execution of end task""" - from openfl.experimental.interface import final_attributes + """ + Performs execution of end task + Args: + flspec_obj : Reference to the FLSpec (flow) object + f : The task to be executed within the flow + + Returns: + list: updated arguments to be executed + """ global final_attributes - flspec_obj.to_exec() + final_attr_module = importlib.import_module("openfl.experimental.interface") + final_attributes = getattr(final_attr_module, "final_attributes") + + to_exec = getattr(flspec_obj, f.__name__) + to_exec() checkpoint(flspec_obj, f) artifacts_iter, _ = generate_artifacts(ctx=flspec_obj) final_attributes = artifacts_iter() @@ -172,9 +194,20 @@ def execute_foreach_task( 4. Execution of all collaborator for each task 5. Remove collaborator private attributes 6. Execute the next function after transition + + Args: + flspec_obj : Reference to the FLSpec (flow) object + f : The task to be executed within the flow + parent_func : The prior task executed in the flow + instance_snapshot : A prior FLSpec state that needs to be restored + + Returns: + list: updated arguments to be executed """ - from openfl.experimental.interface import FLSpec + final_attr_module = importlib.import_module("openfl.experimental.interface") + FLSpec = getattr(final_attr_module, "FLSpec") + flspec_obj._foreach_methods.append(f.__name__) selected_collaborators = getattr(flspec_obj, kwargs["foreach"]) @@ -267,8 +300,17 @@ def execute_foreach_task( return flspec_obj.execute_task_args def filter_exclude_include(self, flspec_obj, f, selected_collaborators, **kwargs): - """This function filters exclude/include attributes""" - from openfl.experimental.interface import FLSpec + """ + This function filters exclude/include attributes + Args: + flspec_obj : Reference to the FLSpec (flow) object + f : The task to be executed within the flow + selected_collaborators : all collaborators + """ + + final_attr_module = importlib.import_module("openfl.experimental.interface") + FLSpec = getattr(final_attr_module, "FLSpec") + for col in selected_collaborators: clone = FLSpec._clones[col] From eca9f7de58a4ccf4de9cfa6948fb0e081fa77ef9 Mon Sep 17 00:00:00 2001 From: Keerti Talwar Date: Fri, 14 Apr 2023 21:15:00 +0530 Subject: [PATCH 5/5] fixed lint errors --- openfl/experimental/runtime/local_runtime.py | 30 +++++++++----------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/openfl/experimental/runtime/local_runtime.py b/openfl/experimental/runtime/local_runtime.py index af1556b6ae..0bddb126d6 100644 --- a/openfl/experimental/runtime/local_runtime.py +++ b/openfl/experimental/runtime/local_runtime.py @@ -205,9 +205,8 @@ def execute_foreach_task( list: updated arguments to be executed """ - final_attr_module = importlib.import_module("openfl.experimental.interface") - FLSpec = getattr(final_attr_module, "FLSpec") - + flspec_module = importlib.import_module("openfl.experimental.interface") + flspec_class = getattr(flspec_module, "FLSpec") flspec_obj._foreach_methods.append(f.__name__) selected_collaborators = getattr(flspec_obj, kwargs["foreach"]) @@ -216,7 +215,7 @@ def execute_foreach_task( # Remove aggregator private attributes for col in selected_collaborators: - clone = FLSpec._clones[col] + clone = flspec_class._clones[col] if aggregator_to_collaborator(f, parent_func): for attr in self._aggregator.private_attributes: self._aggregator.private_attributes[attr] = getattr( @@ -230,7 +229,7 @@ def execute_foreach_task( # set runtime,collab private attributes and metaflowinterface for col in selected_collaborators: - clone = FLSpec._clones[col] + clone = flspec_class._clones[col] # Set new LocalRuntime for clone as it is required # new runtime object will not contain private attributes of # aggregator or other collaborators @@ -253,7 +252,7 @@ def execute_foreach_task( while not_at_transition_point: # execute to_exec for for each collab for collab in selected_collaborators: - clone = FLSpec._clones[collab] + clone = flspec_class._clones[collab] # get the function to be executed to_exec = getattr(clone, f.__name__) @@ -265,13 +264,13 @@ def execute_foreach_task( if self.backend == "ray": # Execute the collab steps clones = ray_executor.get_remote_clones() - FLSpec._clones.update(zip(selected_collaborators, clones)) + flspec_class._clones.update(zip(selected_collaborators, clones)) # update the next arguments - f, parent_func, _, kwargs = FLSpec._clones[collab].execute_task_args + f, parent_func, _, kwargs = flspec_class._clones[collab].execute_task_args # check for transition - if FLSpec._clones[collab]._is_at_transition_point(f, parent_func): + if flspec_class._clones[collab]._is_at_transition_point(f, parent_func): not_at_transition_point = False # remove clones after transition @@ -282,7 +281,7 @@ def execute_foreach_task( # Removes collaborator private attributes after transition for col in selected_collaborators: - clone = FLSpec._clones[col] + clone = flspec_class._clones[col] for attr in self.__collaborators[clone.input].private_attributes: if hasattr(clone, attr): self.__collaborators[clone.input].private_attributes[ @@ -296,7 +295,7 @@ def execute_foreach_task( g = getattr(flspec_obj, f.__name__) gc.collect() - g([FLSpec._clones[col] for col in selected_collaborators]) + g([flspec_class._clones[col] for col in selected_collaborators]) return flspec_obj.execute_task_args def filter_exclude_include(self, flspec_obj, f, selected_collaborators, **kwargs): @@ -307,13 +306,12 @@ def filter_exclude_include(self, flspec_obj, f, selected_collaborators, **kwargs f : The task to be executed within the flow selected_collaborators : all collaborators """ - - final_attr_module = importlib.import_module("openfl.experimental.interface") - FLSpec = getattr(final_attr_module, "FLSpec") - + + flspec_module = importlib.import_module("openfl.experimental.interface") + flspec_class = getattr(flspec_module, "FLSpec") for col in selected_collaborators: - clone = FLSpec._clones[col] + clone = flspec_class._clones[col] clone.input = col if ("exclude" in kwargs and hasattr(clone, kwargs["exclude"][0])) or ( "include" in kwargs and hasattr(clone, kwargs["include"][0])