Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Leak Logs #1159

Merged
merged 15 commits into from
Nov 21, 2024
16 changes: 14 additions & 2 deletions openfl/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import base_pb2, utils
from openfl.utilities import TaskResultKey, TensorKey, change_tags
from openfl.utilities.logs import write_metric
from openfl.utilities.logs import get_memory_usage, write_metric


class Aggregator:
Expand Down Expand Up @@ -76,6 +76,7 @@ def __init__(
compression_pipeline=None,
db_store_rounds=1,
write_logs=False,
log_memory_usage=False,
log_metric_callback=None,
**kwargs,
):
Expand Down Expand Up @@ -123,7 +124,9 @@ def __init__(
)
self._end_of_round_check_done = [False] * rounds_to_train
self.stragglers = []

# Flag can be enabled to get memory usage details for ubuntu system
self.log_memory_usage = log_memory_usage
self.memory_details = []
self.rounds_to_train = rounds_to_train

# if the collaborator requests a delta, this value is set to true
Expand Down Expand Up @@ -969,6 +972,13 @@ def _end_of_round_check(self):
for task_name in all_tasks:
self._compute_validation_related_task_metrics(task_name)

if self.log_memory_usage:
# This is the place to check the memory usage of the aggregator
memory_detail = get_memory_usage()
memory_detail["round_number"] = self.round_number
payalcha marked this conversation as resolved.
Show resolved Hide resolved
memory_detail["metric_origin"] = "aggregator"
self.memory_details.append(memory_detail)

# Once all of the task results have been processed
self._end_of_round_check_done[self.round_number] = True

Expand All @@ -984,6 +994,8 @@ def _end_of_round_check(self):

# TODO This needs to be fixed!
if self._time_to_quit():
if self.log_memory_usage:
self.logger.info(f"Publish memory usage: {self.memory_details}")
self.logger.info("Experiment Completed. Cleaning up...")
else:
self.logger.info("Starting round %s...", self.round_number)
Expand Down
15 changes: 13 additions & 2 deletions openfl/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


"""Collaborator module."""

from enum import Enum
from logging import getLogger
from time import sleep
Expand All @@ -13,6 +12,7 @@
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import utils
from openfl.utilities import TensorKey
from openfl.utilities.logs import get_memory_usage


class DevicePolicy(Enum):
Expand Down Expand Up @@ -80,6 +80,7 @@ def __init__(
delta_updates=False,
compression_pipeline=None,
db_store_rounds=1,
log_memory_usage=False,
**kwargs,
):
"""Initialize the Collaborator object.
Expand Down Expand Up @@ -123,7 +124,8 @@ def __init__(
self.delta_updates = delta_updates

self.client = client

# Flag can be enabled to get memory usage details for ubuntu system
self.log_memory_usage = log_memory_usage
self.task_config = task_config

self.logger = getLogger(__name__)
Expand Down Expand Up @@ -158,6 +160,7 @@ def set_available_devices(self, cuda: Tuple[str] = ()):

def run(self):
"""Run the collaborator."""
memory_details = []
while True:
tasks, round_number, sleep_time, time_to_quit = self.get_tasks()
if time_to_quit:
Expand All @@ -171,6 +174,14 @@ def run(self):

# Cleaning tensor db
self.tensor_db.clean_up(self.db_store_rounds)
if self.log_memory_usage:
# This is the place to check the memory usage of the collaborator
memory_detail = get_memory_usage()
memory_detail["round_number"] = round_number
payalcha marked this conversation as resolved.
Show resolved Hide resolved
memory_details["metric_origin"] = self.collaborator_name
memory_details.append(memory_detail)
if self.log_memory_usage:
self.logger.info(f"Publish memory usage: {memory_details}")

self.logger.info("End of Federation reached. Exiting...")

Expand Down
35 changes: 35 additions & 0 deletions openfl/utilities/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"""Logs utilities."""

import logging
import os

import psutil
from rich.console import Console
from rich.logging import RichHandler
from tensorboardX import SummaryWriter
Expand Down Expand Up @@ -57,3 +59,36 @@ def setup_loggers(log_level=logging.INFO):
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s")
handler.setFormatter(formatter)
root.addHandler(handler)


def get_memory_usage() -> dict:
"""Return memory usage details of the current process.

Returns:
dict: A dictionary containing memory usage details.
"""
process = psutil.Process(os.getpid())
virtual_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
memory_usage = {
"process_memory": round(process.memory_info().rss / (1024**2), 2),
"virtual_memory": {
"total": round(virtual_memory.total / (1024**2), 2),
"available": round(virtual_memory.available / (1024**2), 2),
"percent": virtual_memory.percent,
"used": round(virtual_memory.used / (1024**2), 2),
"free": round(virtual_memory.free / (1024**2), 2),
"active": round(virtual_memory.active / (1024**2), 2),
"inactive": round(virtual_memory.inactive / (1024**2), 2),
"buffers": round(virtual_memory.buffers / (1024**2), 2),
"cached": round(virtual_memory.cached / (1024**2), 2),
"shared": round(virtual_memory.shared / (1024**2), 2),
},
"swap_memory": {
"total": round(swap_memory.total / (1024**2), 2),
"used": round(swap_memory.used / (1024**2), 2),
"free": round(swap_memory.free / (1024**2), 2),
"percent": swap_memory.percent,
},
}
return memory_usage
14 changes: 11 additions & 3 deletions tests/end_to_end/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Define a named tuple to store the objects for model owner, aggregator, and collaborators
federation_fixture = collections.namedtuple(
"federation_fixture",
"model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir",
"model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir, num_rounds",
)


Expand Down Expand Up @@ -62,6 +62,11 @@ def pytest_addoption(parser):
action="store_true",
help="Disable TLS for communication",
)
parser.addoption(
"--log_memory_usage",
action="store_true",
help="Enable memory log in collaborators and aggregator",
)


@pytest.fixture(scope="session", autouse=True)
Expand Down Expand Up @@ -234,14 +239,16 @@ def fx_federation(request, pytestconfig):
num_rounds = args.num_rounds
disable_client_auth = args.disable_client_auth
disable_tls = args.disable_tls
log_memory_usage = args.log_memory_usage

log.info(
f"Running federation setup using Task Runner API on single machine with below configurations:\n"
f"\tNumber of collaborators: {num_collaborators}\n"
f"\tNumber of rounds: {num_rounds}\n"
f"\tModel name: {model_name}\n"
f"\tClient authentication: {not disable_client_auth}\n"
f"\tTLS: {not disable_tls}"
f"\tTLS: {not disable_tls}\n"
f"\tMemory Logs: {log_memory_usage}"
)

# Validate the model name and create the workspace name
Expand All @@ -251,7 +258,7 @@ def fx_federation(request, pytestconfig):
workspace_name = f"workspace_{model_name}"

# Create model owner object and the workspace for the model
model_owner = participants.ModelOwner(workspace_name, model_name)
model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage)
try:
workspace_path = model_owner.create_workspace(results_dir=results_dir)
except Exception as e:
Expand Down Expand Up @@ -318,4 +325,5 @@ def fx_federation(request, pytestconfig):
disable_tls=disable_tls,
workspace_path=workspace_path,
results_dir=results_dir,
num_rounds=num_rounds,
)
8 changes: 7 additions & 1 deletion tests/end_to_end/models/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ class ModelOwner:
4. Importing and exporting the workspace etc.
"""

def __init__(self, workspace_name, model_name):
def __init__(self, workspace_name, model_name, log_memory_usage):
"""
Initialize the ModelOwner class
Args:
workspace_name (str): Workspace name
model_name (str): Model name
log_memory_usage (bool): Memory Log flag
"""
self.workspace_name = workspace_name
self.model_name = model_name
Expand All @@ -38,6 +39,7 @@ def __init__(self, workspace_name, model_name):
self.plan_path = None
self.num_collaborators = constants.NUM_COLLABORATORS
self.rounds_to_train = constants.NUM_ROUNDS
self.log_memory_usage = log_memory_usage

def create_workspace(self, results_dir=None):
"""
Expand Down Expand Up @@ -132,6 +134,10 @@ def modify_plan(self, new_rounds=None, num_collaborators=None, disable_client_au
data = yaml.load(fp, Loader=yaml.FullLoader)

data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train)
# Memory Leak related
data["aggregator"]["settings"]["log_memory_usage"] = self.log_memory_usage
data["collaborator"]["settings"]["log_memory_usage"] = self.log_memory_usage

data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators)
data["network"]["settings"]["disable_client_auth"] = disable_client_auth
data["network"]["settings"]["tls"] = not disable_tls
Expand Down
2 changes: 2 additions & 0 deletions tests/end_to_end/utils/conftest_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def parse_arguments():
- model_name (str, default="torch_cnn_mnist"): Model name
- disable_client_auth (bool): Disable client authentication
- disable_tls (bool): Disable TLS for communication
- log_memory_usage (bool): Enable Memory leak logs

Raises:
SystemExit: If the required arguments are not provided or if any argument parsing error occurs.
Expand All @@ -32,6 +33,7 @@ def parse_arguments():
parser.add_argument("--model_name", type=str, help="Model name")
parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication")
parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication")
parser.add_argument("--log_memory_usage", action="store_true", help="Enable Memory leak logs")
args = parser.parse_known_args()[0]
return args

Expand Down
5 changes: 3 additions & 2 deletions tests/end_to_end/utils/federation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def verify_federation_run_completion(fed_obj, results):
executor.submit(
_verify_completion_for_participant,
participant,
fed_obj.num_rounds,
results[i]
)
for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator])
Expand All @@ -99,7 +100,7 @@ def verify_federation_run_completion(fed_obj, results):
return all(results)


def _verify_completion_for_participant(participant, result_file):
def _verify_completion_for_participant(participant, num_rounds, result_file, time_for_each_round=100):
"""
Verify the completion of the process for the participant
Args:
Expand All @@ -109,7 +110,7 @@ def _verify_completion_for_participant(participant, result_file):
bool: True if successful, else False
"""
# Wait for the successful output message to appear in the log till timeout
timeout = 900 # in seconds
timeout = 300 + ( time_for_each_round * num_rounds ) # in seconds
log.info(f"Printing the last line of the log file for {participant.name} to track the progress")
with open(result_file, 'r') as file:
content = file.read()
Expand Down
Loading