add fio loading test examples
songjiaxun committed Feb 17, 2024
1 parent df1a489 commit 028a3d7
Showing 9 changed files with 565 additions and 14 deletions.
69 changes: 55 additions & 14 deletions examples/dlio/parse_logs.py
@@ -1,5 +1,6 @@
import sys, subprocess, os, json
import subprocess, os, json
import pprint
from typing import Tuple

LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"

@@ -16,22 +17,60 @@
"end": "",
"highest_memory": 0,
"lowest_memory": 0,
"highest_cpu": 0.0,
"lowest_cpu": 0.0,
}

def get_memory(pod_name: str, start: str, end: str) -> (int, int):
start = start.split('.')[0].replace('T', ' ') + " UTC"
end = end.split('.')[0].replace('T', ' ') + " UTC"
print(pod_name, start, end)
def get_memory(pod_name: str, start: str, end: str) -> Tuple[int, int]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'cluster_name': 'gcsfuse-csi-test-cluster', 'container_name': 'gke-gcsfuse-sidecar', 'pod_name': '{pod_name}', 'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['container_name'], Sum()), TimeInterval('{start}', '{end}'), '5s')"],
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/memory/used_bytes'), {{'project': '641665282868', 'metric:memory_type': 'non-evictable'}})| Window(Align('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points = result.stdout.split("\n")[1].split(",")[7:]
data_points_int = [int(d) for d in data_points]
data_points_int = []
data_points_by_pod_container = result.stdout.strip().split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_int = [int(d) for d in data_points_split[7:]]
            except ValueError:
                print(f"failed to parse memory for pod {pod_name}, {start}, {end}, data {data_points_split[7:]}")
break
if not data_points_int:
return 0, 0

return int(min(data_points_int) / 1024 / 1024) , int(max(data_points_int) / 1024 / 1024)
return int(min(data_points_int) / 1024 ** 2) , int(max(data_points_int) / 1024 ** 2)

def get_cpu(pod_name: str, start: str, end: str) -> Tuple[float, float]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(["mash", "--namespace=cloud_prod", "--output=csv",
f"Query(Fetch(Raw('cloud.kubernetes.K8sContainer', 'kubernetes.io/container/cpu/core_usage_time'), {{'project': '641665282868'}})| Window(Rate('10m'))| GroupBy(['pod_name', 'container_name'], Max()), TimeInterval('{start}', '{end}'), '5s')"],
capture_output=True, text=True)

data_points_float = []
data_points_by_pod_container = result.stdout.split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == "gke-gcsfuse-sidecar":
try:
data_points_float = [float(d) for d in data_points_split[6:]]
            except ValueError:
                print(f"failed to parse CPU for pod {pod_name}, {start}, {end}, data {data_points_split[6:]}")

break

if not data_points_float:
return 0.0, 0.0

return round(min(data_points_float), 5) , round(max(data_points_float), 5)

def standard_timestamp(timestamp: str) -> str:
return timestamp.split('.')[0].replace('T', ' ') + " UTC"

if __name__ == "__main__":
bucketNames = ["gke-dlio-unet3d-100kb-500k", "gke-dlio-unet3d-150mb-5k", "gke-dlio-unet3d-3mb-100k", "gke-dlio-unet3d-500kb-1m"]
@@ -94,9 +133,11 @@ def get_memory(pod_name: str, start: str, end: str) -> (int, int):
r["duration"] = int(float(per_epoch_stats_data[str(i+1)]["duration"]))
r["train_throughput_samples_per_second"] = int(summary_data["metric"]["train_throughput_samples_per_second"][i])
r["train_throughput_mb_per_second"] = int(r["train_throughput_samples_per_second"] * int(output[key]["mean_file_size"]) / (1024 ** 2))
r["start"] = per_epoch_stats_data[str(i+1)]["start"]
r["end"] = per_epoch_stats_data[str(i+1)]["end"]
r["lowest_memory"], r["highest_memory"] = get_memory(r["pod_name"], r["start"], r["end"])
r["start"] = standard_timestamp(per_epoch_stats_data[str(i+1)]["start"])
r["end"] = standard_timestamp(per_epoch_stats_data[str(i+1)]["end"])
if r["scenario"] != "local-ssd":
r["lowest_memory"], r["highest_memory"] = get_memory(r["pod_name"], r["start"], r["end"])
r["lowest_cpu"], r["highest_cpu"] = get_cpu(r["pod_name"], r["start"], r["end"])

pprint.pprint(r)

@@ -109,7 +150,7 @@ def get_memory(pod_name: str, start: str, end: str) -> (int, int):
scenario_order = ["local-ssd", "gcsfuse-no-file-cache", "gcsfuse-file-cache"]

output_file = open("./output.csv", "a")
output_file.write("File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration (s),GPU Utilization (%),Throughput (sample/s),Throughput (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse Highest Memory (MB),Pod,Start,End\n")
output_file.write("File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration (s),GPU Utilization (%),Throughput (sample/s),Throughput (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse Highest CPU (core),Pod,Start,End\n")

for key in output_order:
if key not in output:
@@ -122,6 +163,6 @@ def get_memory(pod_name: str, start: str, end: str) -> (int, int):
r = record_set["records"][scenario][i]
r["throughput_over_local_ssd"] = round(r["train_throughput_mb_per_second"] / record_set["records"]["local-ssd"][i]["train_throughput_mb_per_second"] * 100, 2)
output_file.write(f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},")
output_file.write(f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['pod_name']},{r['start']},{r['end']}\n")
output_file.write(f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']}\n")

output_file.close()
52 changes: 52 additions & 0 deletions examples/fio/README.md
@@ -0,0 +1,52 @@
# FIO Loading Tests (TODO: current doc is for DLIO, modify it for FIO)

## Prerequisites

### Create a new node pool

For an existing GKE cluster, use the following command to create a new node pool. Make sure the cluster has the [Workload Identity feature enabled](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity#enable).

> In this early-stage test, the managed GCS FUSE CSI driver feature is disabled, and the driver is installed manually.
```bash
# Replace the cluster name and zone.
gcloud container node-pools create large-pool \
--cluster cluster-1-29-us-west1 \
--ephemeral-storage-local-ssd count=16 \
--machine-type n2-standard-96 \
--zone us-west1-c \
--num-nodes 3
```

### Set up GCS bucket

Create a GCS bucket with `Location type` set to `Region`, in the same region where your cluster runs. Follow the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#authentication) to configure access. This example uses the default Kubernetes service account in the default Kubernetes namespace.
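
For reference, a minimal sketch of this setup, assuming a hypothetical bucket name and project, the default Kubernetes service account in the default namespace, and Workload Identity Federation for GKE (adjust names, region, and role to your environment):

```bash
# Create a regional bucket in the same region as the cluster (bucket and project names are placeholders).
gcloud storage buckets create gs://my-fio-test-bucket \
  --project=my-project \
  --location=us-west1 \
  --uniform-bucket-level-access

# Grant the default/default Kubernetes service account read-write access to the bucket.
# PROJECT_NUMBER is the project's numeric ID; my-project is the project ID.
gcloud storage buckets add-iam-policy-binding gs://my-fio-test-bucket \
  --member "principal://iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/my-project.svc.id.goog/subject/ns/default/sa/default" \
  --role "roles/storage.objectAdmin"
```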

### Install Helm

The example uses Helm charts to manage the applications. Follow the [Helm documentation](https://helm.sh/docs/intro/install/#from-script) to install Helm.
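
For reference, the script-based installation from the linked documentation looks roughly like this:

```bash
# Download and run the official Helm installation script.
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh
```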

## FIO Loading Tests

Change the directory to `./examples/fio`, then use the following commands to run the loading tests. Each `helm install` command deploys a Pod that runs the test and uploads the logs to the bucket; a representative invocation is sketched below.
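
As an illustration only, a single deployment might look like the following sketch. The release-name pattern follows `delete_tests.py`, and the `--set` keys mirror the values consumed by the chart template in this commit; the bucket name and the `fio.filesPerThread` value are placeholders, and `run_tests.py` drives the actual test matrix.

```bash
# Hypothetical single invocation; run_tests.py generates the full set of releases.
helm install fio-loading-test-64k-read-gcsfuse-file-cache ./loading-test \
  --set scenario=gcsfuse-file-cache \
  --set bucketName=my-fio-test-bucket \
  --set fio.readType=read \
  --set fio.fileSize=64K \
  --set fio.blockSize=64K \
  --set fio.filesPerThread=20000
```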

### Run the tests

```bash
python ./run_tests.py
```

### Delete the tests

```bash
python ./delete_tests.py
```

## Parsing the test results

Run the following Python script to parse the logs. The results will be saved in `./examples/fio/output.csv`.

```bash
cd ./examples/fio
python ./parse_logs.py
```
23 changes: 23 additions & 0 deletions examples/fio/delete_tests.py
@@ -0,0 +1,23 @@
import subprocess

def run_command(command: str):
result = subprocess.run(command.split(" "), capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

fileSizes = ["64K", "128K", "1M", "100M", "200G"]
scenarios = ["gcsfuse-file-cache", "gcsfuse-no-file-cache", "local-ssd"]

for fileSize in fileSizes:
if fileSize in ["100M", "200G"]:
run_command("gcloud container clusters get-credentials --zone us-central1-a gcsfuse-csi-test-cluster")
else:
run_command("gcloud container clusters get-credentials --zone us-west1-c cluster-1-29-us-west1")

for readType in ["read", "randread"]:
for scenario in scenarios:
if readType == "randread" and fileSize in ["64K", "128K"]:
continue

helm_command = f"helm uninstall fio-loading-test-{fileSize.lower()}-{readType}-{scenario}"
run_command(helm_command)
23 changes: 23 additions & 0 deletions examples/fio/loading-test/.helmignore
@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
5 changes: 5 additions & 0 deletions examples/fio/loading-test/Chart.yaml
@@ -0,0 +1,5 @@
apiVersion: v2
name: fio-loading-test
description: A Helm chart for FIO loading test
type: application
version: 0.1.0
168 changes: 168 additions & 0 deletions examples/fio/loading-test/templates/fio-tester.yaml
@@ -0,0 +1,168 @@
apiVersion: v1
kind: Pod
metadata:
name: fio-tester-{{ .Values.fio.readType }}-{{ lower .Values.fio.fileSize }}-{{ lower .Values.fio.blockSize }}-{{ .Values.scenario }}
{{- if ne .Values.scenario "local-ssd" }}
annotations:
gke-gcsfuse/volumes: "true"
gke-gcsfuse/cpu-limit: "0"
gke-gcsfuse/memory-limit: "0"
gke-gcsfuse/ephemeral-storage-limit: "0"
{{- end }}
spec:
restartPolicy: Never
nodeSelector:
cloud.google.com/gke-ephemeral-storage-local-ssd: "true"
node.kubernetes.io/instance-type: {{ .Values.nodeType }}
containers:
- name: fio-tester
image: {{ .Values.image }}
securityContext: # for cache dropping in the benchmarking tests.
privileged: true
resources:
limits:
cpu: {{ .Values.resourceLimits.cpu }}
memory: {{ .Values.resourceLimits.memory }}
requests:
cpu: "30"
memory: 300Gi
ephemeral-storage: 7900Gi
command:
- "/bin/sh"
- "-c"
- |
echo "Install dependencies..."
apt-get update
apt-get install -y libaio-dev gcc make git time wget
{{ if eq .Values.scenario "local-ssd" }}
echo "Installing gsutil..."
apt-get install -y apt-transport-https ca-certificates gnupg curl
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg
echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
            apt-get update && apt-get install -y google-cloud-cli
gsutil -m cp -R gs://{{ .Values.bucketName }}{{- if and (ne .Values.fio.fileSize "100M") (ne .Values.fio.fileSize "200G") }}/{{ .Values.fio.fileSize }}{{- end }}/* /data
echo "Sleeping 5 minutes to wait for Local SSD RAID to populate data."
sleep 300
{{ end }}
# We are building fio from source because of the issue: https://github.com/axboe/fio/issues/1668.
# The sed command below is to address internal bug b/309563824.
            # As recorded in this bug, fio by default supports
            # clat percentile values to be calculated accurately up to only
            # 2^(FIO_IO_U_PLAT_GROUP_NR + 5) ns = 17.17 seconds
            # (with the default value of FIO_IO_U_PLAT_GROUP_NR = 29). This change increases it to 32,
            # to allow latencies up to 137.44s to be calculated accurately.
git clone -b fio-3.36 https://github.com/axboe/fio.git
cd fio
sed -i 's/define \+FIO_IO_U_PLAT_GROUP_NR \+\([0-9]\+\)/define FIO_IO_U_PLAT_GROUP_NR 32/g' stat.h
./configure && make && make install
cd ..
echo "Preparing fio config file..."
filename=/read_cache_load_test.fio
{{ if eq .Values.fio.fileSize "200G" }}
cat > $filename << EOF
[global]
ioengine=libaio
direct=1
fadvise_hint=0
iodepth=64
invalidate=1
nrfiles=1
thread=1
openfiles=1
group_reporting=1
create_serialize=0
allrandrepeat=1
numjobs=100
filename=/data/0
[Workload]
bs=1M
filesize=200G
size=2G
rw={{ .Values.fio.readType }}
offset=0
offset_increment=1%
EOF
{{ else }}
wget -O $filename https://raw.githubusercontent.com/GoogleCloudPlatform/gcsfuse/read_cache_release/perfmetrics/scripts/job_files/read_cache_load_test.fio
{{ end }}
echo "Setup default values..."
epoch=4
no_of_files_per_thread={{ .Values.fio.filesPerThread }}
read_type={{ .Values.fio.readType }}
pause_in_seconds=20
block_size={{ .Values.fio.blockSize }}
file_size={{ .Values.fio.fileSize }}
num_of_threads=50
workload_dir=/data
            # Clean the page cache, dentries, and inode cache before starting the workload.
echo "Drop page cache..."
echo 3 > /proc/sys/vm/drop_caches
            # Especially for the gcsfuse-mounted dir: the purpose of this approach is to efficiently
            # populate the gcsfuse metadata cache by using the list call, which internally
            # works like a bulk stat call rather than making individual stat calls.
            # To reduce log volume, the command's standard output is redirected to /dev/null.
echo "List workload dir..."
time ls -R $workload_dir 1> /dev/null
echo "Run fio tests..."
mkdir -p /data/fio-output/{{ .Values.scenario }}/$read_type
for i in $(seq $epoch); do
echo "[Epoch ${i}] start time:" `date +%s`
free -mh # Memory usage before workload start.
NUMJOBS=$num_of_threads NRFILES=$no_of_files_per_thread FILE_SIZE=$file_size BLOCK_SIZE=$block_size READ_TYPE=$read_type DIR=$workload_dir fio ${filename} --alloc-size=1048576 --output-format=json --output="/data/fio-output/{{ .Values.scenario }}/${read_type}/epoch${i}.json"
free -mh # Memory usage after workload completion.
echo "[Epoch ${i}] end time:" `date +%s`
            # Free the page cache.
            # Intentionally not clearing dentries and inodes: clearing them
            # would force the gcsfuse type cache to be repopulated from the 2nd epoch onwards.
            # Since we use "ls -R workload_dir" (a hack to fill the cache quickly and
            # efficiently) in the first epoch, it does not populate the negative
            # entries in the stat cache.
            # So, to avoid rerunning the "ls -R workload_dir" command at the start
            # of every epoch, we do not clear the inodes.
echo 1 > /proc/sys/vm/drop_caches
sleep $pause_in_seconds
done
{{ if eq .Values.scenario "local-ssd" }}
gsutil -m cp -R /data/fio-output/local-ssd gs://{{ .Values.bucketName }}{{- if and (ne .Values.fio.fileSize "100M") (ne .Values.fio.fileSize "200G") }}/{{ .Values.fio.fileSize }}{{- end }}/fio-output
{{ end }}
echo "fio job completed!"
volumeMounts:
- name: dshm
mountPath: /dev/shm
- name: data-vol
mountPath: /data
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: data-vol
{{- if eq .Values.scenario "local-ssd" }}
emptyDir: {}
{{- else if eq .Values.scenario "gcsfuse-file-cache" }}
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: {{ .Values.bucketName }}
mountOptions: implicit-dirs,{{- if and (ne .Values.fio.fileSize "100M") (ne .Values.fio.fileSize "200G") }}only-dir={{ .Values.fio.fileSize }},{{- end }}stat-cache-capacity={{ .Values.gcsfuse.statCacheCapacity }},metadata-cache:ttl-secs:{{ .Values.gcsfuse.metadataCacheTtlSecs }},file-cache:max-size-in-mb:{{ .Values.gcsfuse.fileCacheMaxSizeInMb }},file-cache:cache-file-for-range-read:{{ .Values.gcsfuse.fileCacheForRangeRead }}
{{- else }}
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: {{ .Values.bucketName }}
mountOptions: implicit-dirs,{{- if and (ne .Values.fio.fileSize "100M") (ne .Values.fio.fileSize "200G") }}only-dir={{ .Values.fio.fileSize }},{{- end }}stat-cache-capacity={{ .Values.gcsfuse.statCacheCapacity }},metadata-cache:ttl-secs:{{ .Values.gcsfuse.metadataCacheTtlSecs }}
{{- end }}
