Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

phlop runtime process monitoring #881

Merged
merged 7 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pyphare/pyphare/cpp/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,12 @@ def log_runtime_config():
git_hash=get_git_hash(),
)

rank_info_dir = DOT_PHARE_DIR / "rank_info"
if cpp_lib.mpi_rank() == 0:
DOT_PHARE_DIR.mkdir(exist_ok=True, parents=True)
rank_info_dir.mkdir(exist_ok=True, parents=True)
cpp_lib.mpi_barrier()

rank_dir = DOT_PHARE_DIR / f"rank_{cpp_lib.mpi_rank()}"
rank_dir = rank_info_dir / f"{cpp_lib.mpi_rank()}"
Comment on lines +110 to +115
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider moving the MPI barrier for better synchronization

The current placement of the MPI barrier might not fully prevent race conditions. Other ranks could potentially proceed to create their rank-specific directories before rank 0 has finished creating the parent directory.

Consider moving the MPI barrier after the creation of rank_info_dir:

 rank_info_dir = DOT_PHARE_DIR / "rank_info"
 if cpp_lib.mpi_rank() == 0:
     rank_info_dir.mkdir(exist_ok=True, parents=True)
-cpp_lib.mpi_barrier()
+cpp_lib.mpi_barrier()  # Move the barrier here

 rank_dir = rank_info_dir / f"{cpp_lib.mpi_rank()}"
 rank_dir.mkdir(exist_ok=True)

This change ensures that all ranks wait until the parent directory is created before proceeding, preventing potential issues in directory creation.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
rank_info_dir = DOT_PHARE_DIR / "rank_info"
if cpp_lib.mpi_rank() == 0:
DOT_PHARE_DIR.mkdir(exist_ok=True, parents=True)
rank_info_dir.mkdir(exist_ok=True, parents=True)
cpp_lib.mpi_barrier()
rank_dir = DOT_PHARE_DIR / f"rank_{cpp_lib.mpi_rank()}"
rank_dir = rank_info_dir / f"{cpp_lib.mpi_rank()}"
rank_info_dir = DOT_PHARE_DIR / "rank_info"
if cpp_lib.mpi_rank() == 0:
rank_info_dir.mkdir(exist_ok=True, parents=True)
cpp_lib.mpi_barrier() # Move the barrier here
rank_dir = rank_info_dir / f"{cpp_lib.mpi_rank()}"

rank_dir.mkdir(exist_ok=True)

with open(rank_dir / "runtime_config.json", "w") as f:
Expand Down
48 changes: 48 additions & 0 deletions pyphare/pyphare/simulator/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from pathlib import Path


def have_phlop():
from importlib.util import find_spec

try:
return find_spec("phlop.dict") is not None
except (ImportError, ModuleNotFoundError):
return False
Comment on lines +8 to +10
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify the have_phlop function by removing unnecessary exception handling

The find_spec function returns None if the module is not found and does not raise an exception. The try-except block catching ImportError and ModuleNotFoundError is unnecessary and can be removed for simplicity.

Apply this diff to simplify the function:

 def have_phlop():
     from importlib.util import find_spec

-    try:
-        return find_spec("phlop.dict") is not None
-    except (ImportError, ModuleNotFoundError):
-        return False
+    return find_spec("phlop.dict") is not None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return find_spec("phlop.dict") is not None
except (ImportError, ModuleNotFoundError):
return False
return find_spec("phlop.dict") is not None



def valdict(**kwargs):
if not have_phlop():
return dict
Comment on lines +14 to +15
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Return a dictionary instance instead of the dict class

In the valdict function, when have_phlop() is False, return dict returns the dict class itself, not an instance. This can lead to issues when the code expects a dictionary instance. Consider returning a dictionary instance by calling dict(**kwargs) instead.

Apply this diff to fix the issue:

 def valdict(**kwargs):
     if not have_phlop():
-        return dict
+        return dict(**kwargs)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if not have_phlop():
return dict
if not have_phlop():
return dict(**kwargs)


from phlop.dict import ValDict # pylint: disable=import-error

return ValDict(**kwargs)


_globals = valdict(stats_man=None)


def monitoring_yaml_file(cpplib):
path = Path(".phare") / "stats" / f"rank.{cpplib.mpi_rank()}.yaml"
path.parent.mkdir(exist_ok=True, parents=True)
return path


def setup_monitoring(cpplib, interval=10):
if not have_phlop():
return

from phlop.app import stats_man as sm # pylint: disable=import-error

_globals.stats_man = sm.AttachableRuntimeStatsManager(
valdict(yaml=monitoring_yaml_file(cpplib), interval=interval),
dict(rank=cpplib.mpi_rank()),
).start()


def monitoring_shutdown(cpplib):
if not have_phlop():
return

if _globals.stats_man:
_globals.stats_man.kill().join()
Comment on lines +47 to +48
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Access _globals['stats_man'] using square brackets

Similar to the previous comment, ensure that you access the stats_man key using square brackets since _globals is a dictionary.

Apply this diff to fix the access:

-if _globals.stats_man:
-    _globals.stats_man.kill().join()
+if _globals['stats_man']:
+    _globals['stats_man'].kill().join()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if _globals.stats_man:
_globals.stats_man.kill().join()
if _globals['stats_man']:
_globals['stats_man'].kill().join()

23 changes: 18 additions & 5 deletions pyphare/pyphare/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
#
#

import os
import datetime
import atexit
import time as timem
import numpy as np
import pyphare.pharein as ph
from pathlib import Path
from . import monitoring as mon


life_cycles = {}
SIM_MONITOR = os.getenv("PHARE_SIM_MON", "False").lower() in ("true", "1", "t")
SCOPE_TIMING = os.getenv("PHARE_SCOPE_TIMING", "False").lower() in ("true", "1", "t")


@atexit.register
Expand All @@ -24,6 +29,9 @@ def simulator_shutdown():
def make_cpp_simulator(dim, interp, nbrRefinedPart, hier):
from pyphare.cpp import cpp_lib

if SCOPE_TIMING:
Path(".phare/timings").mkdir(exist_ok=True)
Comment on lines +32 to +33
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling when creating the directory

While creating the .phare/timings directory, exceptions such as permission errors may occur. To ensure the program handles such cases gracefully, consider adding error handling around the directory creation.

Apply this diff to handle potential exceptions:

+try:
    if SCOPE_TIMING:
        Path(".phare/timings").mkdir(exist_ok=True)
+except Exception as e:
+    print(f"Failed to create directory '.phare/timings': {e}")
+    # Consider additional error handling or logging as needed
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if SCOPE_TIMING:
Path(".phare/timings").mkdir(exist_ok=True)
try:
if SCOPE_TIMING:
Path(".phare/timings").mkdir(exist_ok=True)
except Exception as e:
print(f"Failed to create directory '.phare/timings': {e}")
# Consider additional error handling or logging as needed


make_sim = f"make_simulator_{dim}_{interp}_{nbrRefinedPart}"
return getattr(cpp_lib(), make_sim)(hier)

Expand Down Expand Up @@ -127,6 +135,7 @@ def initialize(self):

self.cpp_sim.initialize()
self._auto_dump() # first dump might be before first advance

return self
except:
import sys
Expand All @@ -140,7 +149,6 @@ def initialize(self):

def _throw(self, e):
import sys
from pyphare.cpp import cpp_lib

print_rank0(e)
sys.exit(1)
Expand Down Expand Up @@ -170,12 +178,19 @@ def times(self):
self.timeStep(),
)

def run(self, plot_times=False):
def run(self, plot_times=False, monitoring=None):
"""monitoring requires phlop"""
from pyphare.cpp import cpp_lib

self._check_init()

if monitoring is None: # check env
monitoring = SIM_MONITOR

if self.simulation.dry_run:
return self
if monitoring:
mon.setup_monitoring(cpp_lib())
perf = []
end_time = self.cpp_sim.endTime()
t = self.cpp_sim.currentTime()
Expand All @@ -197,6 +212,7 @@ def run(self, plot_times=False):
if plot_times:
plot_timestep_time(perf)

mon.monitoring_shutdown(cpp_lib())
return self.reset()

def _auto_dump(self):
Expand Down Expand Up @@ -263,13 +279,10 @@ def _log_to_file(self):
DATETIME_FILES - logfile with starting datetime timestamp per rank
NONE - no logging files, display to cout
"""
import os

if "PHARE_LOG" not in os.environ:
os.environ["PHARE_LOG"] = "RANK_FILES"
from pyphare.cpp import cpp_lib

if os.environ["PHARE_LOG"] != "NONE" and cpp_lib().mpi_rank() == 0:
from pathlib import Path

Path(".log").mkdir(exist_ok=True)
4 changes: 4 additions & 0 deletions src/amr/solvers/solver_ppc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ void SolverPPC<HybridModel, AMR_Types>::fillMessengerInfo(
template<typename HybridModel, typename AMR_Types>
void SolverPPC<HybridModel, AMR_Types>::saveState_(level_t& level, ModelViews_t& views)
{
PHARE_LOG_SCOPE(1, "SolverPPC::saveState_");

for (auto& state : views)
{
std::stringstream ss;
Expand All @@ -232,6 +234,8 @@ void SolverPPC<HybridModel, AMR_Types>::saveState_(level_t& level, ModelViews_t&
template<typename HybridModel, typename AMR_Types>
void SolverPPC<HybridModel, AMR_Types>::restoreState_(level_t& level, ModelViews_t& views)
{
PHARE_LOG_SCOPE(1, "SolverPPC::restoreState_");

for (auto& state : views)
{
std::stringstream ss;
Expand Down
2 changes: 1 addition & 1 deletion src/hdf5/detail/h5/h5_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class HighFiveFile
fapl.add(HighFive::MPIOFileAccess{MPI_COMM_WORLD, MPI_INFO_NULL});
#else
std::cout << "WARNING: PARALLEL HDF5 not available" << std::endl;
if (core::mpi_size() > 1)
if (core::mpi::size() > 1)
{
throw std::runtime_error("HDF5 NOT PARALLEL!");
}
Expand Down
2 changes: 1 addition & 1 deletion src/phare/phare.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SamraiLifeCycle
PHARE_WITH_PHLOP( //
if (auto e = core::get_env("PHARE_SCOPE_TIMING", "false"); e == "1" || e == "true")
phlop::ScopeTimerMan::INSTANCE()
.file_name(".phare_times." + std::to_string(core::mpi::rank()) + ".txt")
.file_name(".phare/timings/rank." + std::to_string(core::mpi::rank()) + ".txt")
.init(); //
)
}
Expand Down
4 changes: 2 additions & 2 deletions src/simulator/simulator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ std::string Simulator<_dimension, _interp_order, _nbRefinedPart>::to_str()
template<std::size_t _dimension, std::size_t _interp_order, std::size_t _nbRefinedPart>
void Simulator<_dimension, _interp_order, _nbRefinedPart>::initialize()
{
PHARE_LOG_SCOPE(1, "Simulator::initialize");

try
{
if (isInitialized)
Expand Down Expand Up @@ -414,8 +416,6 @@ double Simulator<_dimension, _interp_order, _nbRefinedPart>::advance(double dt)

try
{
PHARE_LOG_SCOPE(1, "Simulator::advance");

dt_new = integrator_->advance(dt);
currentTime_ = startTime_ + ((*timeStamper) += dt);
}
Expand Down
8 changes: 5 additions & 3 deletions tests/functional/harris/harris_2d_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

mpl.use("Agg")

SCOPE_TIMING = os.getenv("PHARE_SCOPE_TIMING", "True").lower() in ("true", "1", "t")
LOAD_BALANCE = os.getenv("LOAD_BALANCE", "True").lower() in ("true", "1", "t")

cpp = cpp_lib()
Expand Down Expand Up @@ -171,12 +172,12 @@ def plot(diag_dir):
for c in ["x", "y", "z"]:
run.GetB(time).plot(
filename=plot_file_for_qty(f"b{c}", time),
qty=f"B{c}",
qty=f"{c}",
plot_patches=True,
)
run.GetJ(time).plot(
filename=plot_file_for_qty("jz", time),
qty="Jz",
qty="z",
plot_patches=True,
vmin=-2,
vmax=2,
Expand All @@ -200,7 +201,8 @@ def test_run(self):
Simulator(config()).run().reset()
if cpp.mpi_rank() == 0:
plot(diag_dir)
m_plotting.plot_run_timer_data(diag_dir, cpp.mpi_rank())
if SCOPE_TIMING:
m_plotting.plot_run_timer_data(diag_dir, cpp.mpi_rank())
cpp.mpi_barrier()
return self

Expand Down
67 changes: 56 additions & 11 deletions tools/python3/phloping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@
# parsing PHARE scope funtion timers
#

import sys
import argparse
import numpy as np
from dataclasses import dataclass, field

from pyphare.pharesee.run import Run
from pyphare.pharesee.hierarchy import hierarchy_from

from phlop.timing.scope_timer import ScopeTimerFile as phScopeTimerFile
from phlop.timing.scope_timer import file_parser as phfile_parser

from phlop.timing import scope_timer as st

substeps_per_finer_level = 4


@dataclass
class ScopeTimerFile(phScopeTimerFile):
class ScopeTimerFile(st.ScopeTimerFile):
run: Run
rank: str
advances: list = field(default_factory=lambda: [])
Expand Down Expand Up @@ -124,20 +122,67 @@ def normalised_times_for_L(self, ilvl):
"""
Normalise substep time against particle count for that level
at the most recent coarse time, no refined timesteps
Particle counts may include init dump, so be one bigger.
"""
times = self.advance_times_for_L(ilvl)
counts = len(self.particles_per_level_per_time_step[ilvl])

# trim init particle count for lvl
Li_times = (
self.particles_per_level_per_time_step[ilvl]
if counts == len(times)
else self.particles_per_level_per_time_step[ilvl][1:]
)
if ilvl == 0:
return times / self.particles_per_level_per_time_step[0]
return times / Li_times
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential division by zero

When ilvl == 0, the function returns times / Li_times. This operation could raise a ZeroDivisionError if any element in Li_times is zero.

Consider adding a check to prevent division by zero. For example:

if ilvl == 0:
    return np.divide(times, Li_times, out=np.zeros_like(times), where=Li_times!=0)

This approach will return 0 for any element where Li_times is 0, preventing the division by zero error.

substeps = self.steps_per_coarse_timestep_for_L(ilvl)
norm_times = times.copy()
return (
norm_times.reshape(int(times.shape[0] / substeps), substeps)
/ self.particles_per_level_per_time_step[ilvl].reshape(
self.particles_per_level_per_time_step[ilvl].shape[0], 1
)
/ Li_times.reshape(Li_times.shape[0], 1)
Comment on lines +125 to +142
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Approve changes with a note on potential division by zero

The changes to handle particle counts and normalization look good. However, there's still a potential issue with division by zero on line 138.

Consider adding a check to prevent division by zero. For example:

if ilvl == 0:
    return np.divide(times, Li_times, out=np.zeros_like(times), where=Li_times!=0)

This approach will return 0 for any element where Li_times is 0, preventing the division by zero error.

).reshape(times.shape[0])


def file_parser(run, rank, times_filepath):
supe = phfile_parser(times_filepath)
supe = st.file_parser(times_filepath)
return ScopeTimerFile(supe.id_keys, supe.roots, run, str(rank))


def write_root_as_csv(scope_timer_file, outfile, headers=None, regex=None):
from contextlib import redirect_stdout

with open(outfile, "w") as f:
with redirect_stdout(f):
Comment on lines +154 to +155
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Combine nested with statements for better readability

Consider combining the nested with statements into a single line to simplify the code and improve readability.

Apply this diff to refactor the code:

-with open(outfile, "w") as f:
-    with redirect_stdout(f):
-        print_root_as_csv(scope_timer_file, headers, regex)
+with open(outfile, "w") as f, redirect_stdout(f):
+    print_root_as_csv(scope_timer_file, headers, regex)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with open(outfile, "w") as f:
with redirect_stdout(f):
with open(outfile, "w") as f, redirect_stdout(f):
print_root_as_csv(scope_timer_file, headers, regex)
🧰 Tools
🪛 Ruff

154-155: Use a single with statement with multiple contexts instead of nested with statements

Combine with statements

(SIM117)

print_root_as_csv(scope_timer_file, headers, regex)


def print_root_as_csv(scope_timer_file, n_parts, headers=None, regex=None):
stf = scope_timer_file # alias
stf = file_parser(stf) if isinstance(stf, str) else stf

if headers:
print(",".join(headers))
for root in stf.roots:
s = stf(root.k)
if regex and regex not in s:
continue
bits = s.split(",")
print(f"{s}{root.t},{root.t/n_parts}")

Comment on lines +159 to +171
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential division by zero in print_root_as_csv

The variable n_parts is used as a divisor in root.t / n_parts on line 170. Ensure that n_parts is validated to be a non-zero value to prevent a ZeroDivisionError.

Consider adding a check at the beginning of the function:

 def print_root_as_csv(scope_timer_file, n_parts, headers=None, regex=None):
+    if n_parts == 0:
+        raise ValueError("n_parts must be a non-zero value")
     stf = scope_timer_file  # alias
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def print_root_as_csv(scope_timer_file, n_parts, headers=None, regex=None):
stf = scope_timer_file # alias
stf = file_parser(stf) if isinstance(stf, str) else stf
if headers:
print(",".join(headers))
for root in stf.roots:
s = stf(root.k)
if regex and regex not in s:
continue
bits = s.split(",")
print(f"{s}{root.t},{root.t/n_parts}")
def print_root_as_csv(scope_timer_file, n_parts, headers=None, regex=None):
if n_parts == 0:
raise ValueError("n_parts must be a non-zero value")
stf = scope_timer_file # alias
stf = file_parser(stf) if isinstance(stf, str) else stf
if headers:
print(",".join(headers))
for root in stf.roots:
s = stf(root.k)
if regex and regex not in s:
continue
bits = s.split(",")
print(f"{s}{root.t},{root.t/n_parts}")
🧰 Tools
🪛 Ruff

169-169: Local variable bits is assigned to but never used

Remove assignment to unused variable bits

(F841)


def print_variance_across(scope_timer_filepath=None):
if scope_timer_filepath is None: # assume cli
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--file", default=None, help="timer file")
scope_timer_filepath = parser.parse_args().file
if not scope_timer_filepath:
parser.print_help()
sys.exit(1)
st.print_variance_across(scope_timer_filepath)


if __name__ == "__main__":
if len(sys.argv) > 1:
fn = sys.argv[1]
sys.argv = [sys.argv[0]] + sys.argv[2:]
globals()[fn]()
2 changes: 1 addition & 1 deletion tools/python3/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def plot_run_timer_data(diag_dir=None, rank=0):
parser.add_argument("-d", "--dir", default=".", help="Diagnostics directory")
diag_dir = parser.parse_args().dir
run = Run(diag_dir)
res = phloping.file_parser(run, rank, Path(f".phare_times.{rank}.txt"))
res = phloping.file_parser(run, rank, Path(f".phare/timings/rank.{rank}.txt"))
fig, ax = plt.subplots()
L0X = res.time_steps_for_L(0)
ax.plot(L0X, res.normalised_times_for_L(0), ":", label="L0 times", color="black")
Expand Down
Loading