diff --git a/.gitignore b/.gitignore
index 9604b1017..7b1c3c347 100755
--- a/.gitignore
+++ b/.gitignore
@@ -47,3 +47,6 @@ Session.vim
 # Kubernetes test files
 _artifacts/
 _rundir/
+
+# Python cache
+__pycache__
\ No newline at end of file
diff --git a/examples/dlio/parse_logs.py b/examples/dlio/parse_logs.py
index d2cd007f0..a91093e90 100644
--- a/examples/dlio/parse_logs.py
+++ b/examples/dlio/parse_logs.py
@@ -15,9 +15,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import subprocess, os, json
-import pprint
-from typing import Tuple
+import json, os, pprint, subprocess
+
+import sys
+sys.path.append("../")
+from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed
 
 LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
 
@@ -38,57 +40,6 @@
     "lowest_cpu": 0.0,
 }
 
-def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
-    # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
- result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv", - f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"], - capture_output=True, text=True) - - data_points_float = [] - data_points_by_pod_container = result.stdout.split("\n") - for data_points in data_points_by_pod_container[1:]: - data_points_split = data_points.split(",") - pn = data_points_split[4] - container_name = data_points_split[5] - if pn == pod_name and container_name == "gke-gcsfuse-sidecar": - try: - data_points_float = [float(d) for d in data_points_split[6:]] - except: - print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_float}") - - break - - if not data_points_float: - return 0.0, 0.0 - - return round(min(data_points_float), 5) , round(max(data_points_float), 5) - -def standard_timestamp(timestamp: int) -> str: - return timestamp.split('.')[0].replace('T', ' ') + " UTC" - if __name__ == "__main__": bucketNames = ["gke-dlio-unet3d-100kb-500k", "gke-dlio-unet3d-150mb-5k", "gke-dlio-unet3d-3mb-100k", "gke-dlio-unet3d-500kb-1m"] @@ -114,6 +65,9 @@ def standard_timestamp(timestamp: int) -> str: "gcsfuse-no-file-cache": [record1, record2, record3, record4] ''' output = {} + mash_installed = is_mash_installed() + if not mash_installed: + print("Mash is not installed, will skip parsing CPU and memory usage.") for root, _, files in os.walk(LOCAL_LOGS_LOCATION): if files: @@ -152,7 +106,7 @@ def standard_timestamp(timestamp: int) -> str: r["train_throughput_mb_per_second"] = int(r["train_throughput_samples_per_second"] * int(output[key]["mean_file_size"]) / (1024 ** 2)) r["start"] = standard_timestamp(per_epoch_stats_data[str(i+1)]["start"]) r["end"] = standard_timestamp(per_epoch_stats_data[str(i+1)]["end"]) - if r["scenario"] != "local-ssd": + if r["scenario"] != "local-ssd" and mash_installed: r["lowest_memory"], r["highest_memory"] = get_memory(r["pod_name"], r["start"], r["end"]) r["lowest_cpu"], r["highest_cpu"] = get_cpu(r["pod_name"], r["start"], r["end"]) diff --git a/examples/fio/parse_logs.py b/examples/fio/parse_logs.py index 0028ab45f..f804d7dcd 100644 --- a/examples/fio/parse_logs.py +++ b/examples/fio/parse_logs.py @@ -15,10 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess, os, json -import pprint -import datetime -from typing import Tuple +import json, os, pprint, subprocess + +import sys +sys.path.append("../") +from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed LOCAL_LOGS_LOCATION = "../../bin/fio-logs" @@ -38,63 +39,6 @@ "lowest_cpu": 0.0, } -def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]: - # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later. 
- result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv", - f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"], - capture_output=True, text=True) - - data_points_int = [] - data_points_by_pod_container = result.stdout.strip().split("\n") - for data_points in data_points_by_pod_container[1:]: - data_points_split = data_points.split(",") - pn = data_points_split[4] - container_name = data_points_split[5] - if pn == pod_name and container_name == "gke-gcsfuse-sidecar": - try: - data_points_int = [int(d) for d in data_points_split[7:]] - except: - print(f"failed to parse memory for pod {pod_name}, {start}, {end}, data {data_points_int}") - break - if not data_points_int: - return 0, 0 - - return int(min(data_points_int) / 1024 ** 2) , int(max(data_points_int) / 1024 ** 2) - -def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]: - # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later. - result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv", - f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"], - capture_output=True, text=True) - - data_points_float = [] - data_points_by_pod_container = result.stdout.split("\n") - for data_points in data_points_by_pod_container[1:]: - data_points_split = data_points.split(",") - pn = data_points_split[4] - container_name = data_points_split[5] - if pn == pod_name and container_name == "gke-gcsfuse-sidecar": - try: - data_points_float = [float(d) for d in data_points_split[6:]] - except: - print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_float}") - - break - - if not data_points_float: - return 0.0, 0.0 - - return round(min(data_points_float), 5) , round(max(data_points_float), 5) - -def unix_to_timestamp(unix_timestamp: int) -> str: - # Convert Unix timestamp to a datetime object (aware of UTC) - datetime_utc = datetime.datetime.fromtimestamp(unix_timestamp / 1000, tz=datetime.timezone.utc) - - # Format the datetime object as a string (if desired) - utc_timestamp_string = datetime_utc.strftime('%Y-%m-%d %H:%M:%S UTC') - - return utc_timestamp_string - if __name__ == "__main__": logLocations = [("gke-fio-64k-1m", "64K"), ("gke-fio-128k-1m", "128K"), ("gke-fio-1mb-1m", "1M"), ("gke-fio-100mb-50k", "100M"), ("gke-fio-200gb-1", "200G")] @@ -123,6 +67,9 @@ def unix_to_timestamp(unix_timestamp: int) -> str: "gcsfuse-no-file-cache": [record1, record2, record3, record4] ''' output = {} + mash_installed = is_mash_installed() + if not mash_installed: + print("Mash is not installed, will skip parsing CPU and memory usage.") for root, _, files in os.walk(LOCAL_LOGS_LOCATION): for file in files: @@ -158,7 +105,7 @@ def unix_to_timestamp(unix_timestamp: int) -> str: r["throughput_mb_per_second"] = int(per_epoch_output_data["jobs"][0]["read"]["bw_bytes"] / (1024 ** 2)) r["start"] = unix_to_timestamp(per_epoch_output_data["jobs"][0]["job_start"]) r["end"] = unix_to_timestamp(per_epoch_output_data["timestamp_ms"]) - if r["scenario"] != "local-ssd": + if r["scenario"] != "local-ssd" and mash_installed: 
r["lowest_memory"], r["highest_memory"] = get_memory(r["pod_name"], r["start"], r["end"]) r["lowest_cpu"], r["highest_cpu"] = get_cpu(r["pod_name"], r["start"], r["end"]) diff --git a/examples/fio/run_tests.py b/examples/fio/run_tests.py index da94cda6c..66a564790 100644 --- a/examples/fio/run_tests.py +++ b/examples/fio/run_tests.py @@ -45,11 +45,8 @@ def run_command(command: str): f"--set fio.fileSize={fileSize}", f"--set fio.blockSize={blockSize}"] - match fileSize: - case "200G": - commands.append("--set gcsfuse.metadataStatCacheCapacity=0") - case "100M": - commands.append("--set fio.filesPerThread=1000") + if fileSize == "100M": + commands.append("--set fio.filesPerThread=1000") helm_command = " ".join(commands) diff --git a/examples/utils/__init__.py b/examples/utils/__init__.py new file mode 100644 index 000000000..4b5e27c8d --- /dev/null +++ b/examples/utils/__init__.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/examples/utils/utils.py b/examples/utils/utils.py new file mode 100644 index 000000000..88f3f87f6 --- /dev/null +++ b/examples/utils/utils.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python + +# Copyright 2018 The Kubernetes Authors. +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess, datetime +from typing import Tuple + +def is_mash_installed() -> bool: + try: + subprocess.run(["mash", "--version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) + return True + except subprocess.CalledProcessError: + return False + +def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]: + # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later. 
+ result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv", + f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"], + capture_output=True, text=True) + + data_points_int = [] + data_points_by_pod_container = result.stdout.strip().split("\n") + for data_points in data_points_by_pod_container[1:]: + data_points_split = data_points.split(",") + pn = data_points_split[4] + container_name = data_points_split[5] + if pn == pod_name and container_name == "gke-gcsfuse-sidecar": + try: + data_points_int = [int(d) for d in data_points_split[7:]] + except: + print(f"failed to parse memory for pod {pod_name}, {start}, {end}, data {data_points_int}") + break + if not data_points_int: + return 0, 0 + + return int(min(data_points_int) / 1024 ** 2) , int(max(data_points_int) / 1024 ** 2) + +def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]: + # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later. + result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv", + f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"], + capture_output=True, text=True) + + data_points_float = [] + data_points_by_pod_container = result.stdout.split("\n") + for data_points in data_points_by_pod_container[1:]: + data_points_split = data_points.split(",") + pn = data_points_split[4] + container_name = data_points_split[5] + if pn == pod_name and container_name == "gke-gcsfuse-sidecar": + try: + data_points_float = [float(d) for d in data_points_split[6:]] + except: + print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_float}") + + break + + if not data_points_float: + return 0.0, 0.0 + + return round(min(data_points_float), 5) , round(max(data_points_float), 5) + +def unix_to_timestamp(unix_timestamp: int) -> str: + # Convert Unix timestamp to a datetime object (aware of UTC) + datetime_utc = datetime.datetime.fromtimestamp(unix_timestamp / 1000, tz=datetime.timezone.utc) + + # Format the datetime object as a string (if desired) + utc_timestamp_string = datetime_utc.strftime('%Y-%m-%d %H:%M:%S UTC') + + return utc_timestamp_string + +def standard_timestamp(timestamp: int) -> str: + return timestamp.split('.')[0].replace('T', ' ') + " UTC" \ No newline at end of file