Commit

update fio and dlio examples
songjiaxun committed Mar 28, 2024
1 parent 893349d commit a967261
Showing 6 changed files with 125 additions and 122 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -47,3 +47,6 @@ Session.vim
# Kubernetes test files
_artifacts/
_rundir/

# Python cache
__pycache__
64 changes: 9 additions & 55 deletions examples/dlio/parse_logs.py
@@ -15,9 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess, os, json
import pprint
from typing import Tuple
import json, os, pprint, subprocess

import sys
sys.path.append("../")
from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed

LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"

@@ -38,57 +38,6 @@
"lowest_cpu": 0.0,
}

def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_int = []
data_points_by_pod_container = result.stdout.strip().split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_int = [int(d) for d in data_points_split[7:]]
except:
print(f"failed to parse memory for pod {pod_name}, {start}, {end}, data {data_points_int}")
break
if not data_points_int:
return 0, 0

return int(min(data_points_int) / 1024 ** 2) , int(max(data_points_int) / 1024 ** 2)

def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_float = []
data_points_by_pod_container = result.stdout.split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_float = [float(d) for d in data_points_split[6:]]
except:
print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_float}")

break

if not data_points_float:
return 0.0, 0.0

return round(min(data_points_float), 5) , round(max(data_points_float), 5)

def standard_timestamp(timestamp: int) -> str:
return timestamp.split('.')[0].replace('T', ' ') + " UTC"

if __name__ == "__main__":
bucketNames = ["gke-dlio-unet3d-100kb-500k", "gke-dlio-unet3d-150mb-5k", "gke-dlio-unet3d-3mb-100k", "gke-dlio-unet3d-500kb-1m"]

Expand All @@ -114,6 +65,9 @@ def standard_timestamp(timestamp: int) -> str:
"gcsfuse-no-file-cache": [record1, record2, record3, record4]
'''
output = {}
mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

for root, _, files in os.walk(LOCAL_LOGS_LOCATION):
if files:
@@ -152,7 +106,7 @@ def standard_timestamp(timestamp: int) -> str:
r["train_throughput_mb_per_second"] = int(r["train_throughput_samples_per_second"] * int(output[key]["mean_file_size"]) / (1024 ** 2))
r["start"] = standard_timestamp(per_epoch_stats_data[str(i+1)]["start"])
r["end"] = standard_timestamp(per_epoch_stats_data[str(i+1)]["end"])
if r["scenario"] != "local-ssd":
if r["scenario"] != "local-ssd" and mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(r["pod_name"], r["start"], r["end"])
r["lowest_cpu"], r["highest_cpu"] = get_cpu(r["pod_name"], r["start"], r["end"])

71 changes: 9 additions & 62 deletions examples/fio/parse_logs.py
@@ -15,10 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess, os, json
import pprint
import datetime
from typing import Tuple
import json, os, pprint, subprocess

import sys
sys.path.append("../")
from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed

LOCAL_LOGS_LOCATION = "../../bin/fio-logs"

@@ -38,63 +39,6 @@
"lowest_cpu": 0.0,
}

def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_int = []
data_points_by_pod_container = result.stdout.strip().split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_int = [int(d) for d in data_points_split[7:]]
except:
print(f"failed to parse memory for pod {pod_name}, {start}, {end}, data {data_points_int}")
break
if not data_points_int:
return 0, 0

return int(min(data_points_int) / 1024 ** 2) , int(max(data_points_int) / 1024 ** 2)

def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_float = []
data_points_by_pod_container = result.stdout.split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_float = [float(d) for d in data_points_split[6:]]
except:
print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_float}")

break

if not data_points_float:
return 0.0, 0.0

return round(min(data_points_float), 5) , round(max(data_points_float), 5)

def unix_to_timestamp(unix_timestamp: int) -> str:
# Convert Unix timestamp to a datetime object (aware of UTC)
datetime_utc = datetime.datetime.fromtimestamp(unix_timestamp / 1000, tz=datetime.timezone.utc)

# Format the datetime object as a string (if desired)
utc_timestamp_string = datetime_utc.strftime('%Y-%m-%d %H:%M:%S UTC')

return utc_timestamp_string

if __name__ == "__main__":
logLocations = [("gke-fio-64k-1m", "64K"), ("gke-fio-128k-1m", "128K"), ("gke-fio-1mb-1m", "1M"), ("gke-fio-100mb-50k", "100M"), ("gke-fio-200gb-1", "200G")]

@@ -123,6 +67,9 @@ def unix_to_timestamp(unix_timestamp: int) -> str:
"gcsfuse-no-file-cache": [record1, record2, record3, record4]
'''
output = {}
mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

for root, _, files in os.walk(LOCAL_LOGS_LOCATION):
for file in files:
@@ -158,7 +105,7 @@ def unix_to_timestamp(unix_timestamp: int) -> str:
r["throughput_mb_per_second"] = int(per_epoch_output_data["jobs"][0]["read"]["bw_bytes"] / (1024 ** 2))
r["start"] = unix_to_timestamp(per_epoch_output_data["jobs"][0]["job_start"])
r["end"] = unix_to_timestamp(per_epoch_output_data["timestamp_ms"])
if r["scenario"] != "local-ssd":
if r["scenario"] != "local-ssd" and mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(r["pod_name"], r["start"], r["end"])
r["lowest_cpu"], r["highest_cpu"] = get_cpu(r["pod_name"], r["start"], r["end"])

7 changes: 2 additions & 5 deletions examples/fio/run_tests.py
@@ -45,11 +45,8 @@ def run_command(command: str):
f"--set fio.fileSize={fileSize}",
f"--set fio.blockSize={blockSize}"]

match fileSize:
case "200G":
commands.append("--set gcsfuse.metadataStatCacheCapacity=0")
case "100M":
commands.append("--set fio.filesPerThread=1000")
if fileSize == "100M":
commands.append("--set fio.filesPerThread=1000")

helm_command = " ".join(commands)

16 changes: 16 additions & 0 deletions examples/utils/__init__.py
@@ -0,0 +1,16 @@
#!/usr/bin/env python

# Copyright 2018 The Kubernetes Authors.
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
86 changes: 86 additions & 0 deletions examples/utils/utils.py
@@ -0,0 +1,86 @@
#!/usr/bin/env python

# Copyright 2018 The Kubernetes Authors.
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess, datetime
from typing import Tuple

def is_mash_installed() -> bool:
try:
subprocess.run(["mash", "--version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False

def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_int = []
data_points_by_pod_container = result.stdout.strip().split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_int = [int(d) for d in data_points_split[7:]]
except:
print(f"failed to parse memory for pod {pod_name}, {start}, {end}, data {data_points_int}")
break
if not data_points_int:
return 0, 0

return int(min(data_points_int) / 1024 ** 2) , int(max(data_points_int) / 1024 ** 2)

def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_float = []
data_points_by_pod_container = result.stdout.split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_float = [float(d) for d in data_points_split[6:]]
except:
print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_float}")

break

if not data_points_float:
return 0.0, 0.0

return round(min(data_points_float), 5) , round(max(data_points_float), 5)

def unix_to_timestamp(unix_timestamp: int) -> str:
# Convert Unix timestamp to a datetime object (aware of UTC)
datetime_utc = datetime.datetime.fromtimestamp(unix_timestamp / 1000, tz=datetime.timezone.utc)

# Format the datetime object as a string (if desired)
utc_timestamp_string = datetime_utc.strftime('%Y-%m-%d %H:%M:%S UTC')

return utc_timestamp_string

def standard_timestamp(timestamp: int) -> str:
return timestamp.split('.')[0].replace('T', ' ') + " UTC"
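
For reference, here is a minimal usage sketch of how a script sitting one directory below examples/utils/ (such as the updated parse_logs.py files) might consume these shared helpers. The pod name and timestamps are illustrative placeholders rather than values from this commit; only the import path and the mash gating mirror what the diff above shows.

# Minimal usage sketch (assumed values; mirrors the pattern in
# examples/dlio/parse_logs.py and examples/fio/parse_logs.py above).
import sys

sys.path.append("../")  # make examples/utils importable from a sibling directory
from utils.utils import get_cpu, get_memory, is_mash_installed, unix_to_timestamp

if __name__ == "__main__":
    mash_installed = is_mash_installed()
    if not mash_installed:
        print("Mash is not installed, will skip parsing CPU and memory usage.")

    # Hypothetical pod and time window; the real scripts derive these from benchmark logs.
    pod_name = "dlio-tester-pod-0"
    start = unix_to_timestamp(1711584000000)  # Unix time in milliseconds
    end = unix_to_timestamp(1711587600000)

    if mash_installed:
        low_mem, high_mem = get_memory(pod_name, start, end)  # MiB
        low_cpu, high_cpu = get_cpu(pod_name, start, end)     # CPU cores
        print(f"memory: {low_mem}-{high_mem} MiB, cpu: {low_cpu}-{high_cpu} cores")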
