Skip to content

Commit

Permalink
add Impl for llm edge benchmark suite
Browse files Browse the repository at this point in the history
Signed-off-by: yexiaochuan <[email protected]>

adapt Impl in core for llm edge benchmark suite

Signed-off-by: yexiaochuan <[email protected]>

Fix impl on singletast_learning with compression

Signed-off-by: yexiaochuan <[email protected]>

chore: trigger CI

Signed-off-by: yexiaochuan <[email protected]>

CI: fix pylint warnings

Signed-off-by: yexiaochuan <[email protected]>

CI: fix pylint warnings

Signed-off-by: yexiaochuan <[email protected]>

fix: update comments and configuration parameters

Signed-off-by: yexiaochuan <[email protected]>
  • Loading branch information
yexiaochuan committed Oct 30, 2024
1 parent 4de73b2 commit aeaa8e5
Show file tree
Hide file tree
Showing 24 changed files with 701 additions and 2 deletions.
3 changes: 3 additions & 0 deletions core/testcasecontroller/algorithm/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def __init__(self, name, config):
self.initial_model_url: str = ""
self.modules: list = []
self.modules_list = None
self.mode: str = ""
self.quantization_type: str = ""
self.llama_quantize_path: str = ""
self._parse_config(config)
self._load_third_party_packages()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Single Task Learning Paradigm"""

import os

import subprocess
from core.common.constant import ParadigmType
from core.testcasecontroller.algorithm.paradigm.base import ParadigmBase

Expand Down Expand Up @@ -49,6 +49,11 @@ class SingleTaskLearning(ParadigmBase):
def __init__(self, workspace, **kwargs):
ParadigmBase.__init__(self, workspace, **kwargs)
self.initial_model = kwargs.get("initial_model_url")
self.mode = kwargs.get("mode")
self.quantization_type = kwargs.get("quantization_type")
self.llama_quantize_path = kwargs.get("llama_quantize_path")
if kwargs.get("use_gpu", True):
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

def run(self):
"""
Expand All @@ -66,10 +71,43 @@ def run(self):

trained_model = self._train(job, self.initial_model)

if trained_model is None:
trained_model = self.initial_model

if self.mode == 'with_compression':
trained_model = self._compress(trained_model)

inference_result = self._inference(job, trained_model)

return inference_result, self.system_metric_info


def _compress(self, trained_model):
if not os.path.exists(trained_model):
return None

if self.llama_quantize_path is None or not os.path.exists(self.llama_quantize_path):
return None

if self.quantization_type is None:
return None

compressed_model = trained_model.replace('.gguf', f'_{self.quantization_type}.gguf')

command = [
self.llama_quantize_path,
trained_model,
compressed_model,
self.quantization_type
]

try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as _:
return trained_model

return compressed_model

def _train(self, job, initial_model):
train_output_dir = os.path.join(self.workspace, "output/train/")
os.environ["BASE_MODEL_URL"] = initial_model
Expand All @@ -84,5 +122,8 @@ def _inference(self, job, trained_model):
inference_output_dir = os.path.join(self.workspace, "output/inference/")
os.environ["RESULT_SAVED_URL"] = inference_output_dir
job.load(trained_model)
infer_res = job.predict(inference_dataset.x)
if hasattr(inference_dataset, 'need_other_info'):
infer_res = job.predict(inference_dataset)
else:
infer_res = job.predict(inference_dataset.x)
return infer_res
3 changes: 3 additions & 0 deletions core/testenvmanager/testenv/testenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, config):
self.metrics = []
self.incremental_rounds = 2
self.dataset = None
self.use_gpu = False # default false
self._parse_config(config)

def _check_fields(self):
Expand All @@ -60,6 +61,8 @@ def _parse_config(self, config):
for k, v in config_dict.items():
if k == str.lower(Dataset.__name__):
self.dataset = Dataset(v)
elif k == 'use_gpu':
self.use_gpu = bool(v)
else:
if k in self.__dict__:
self.__dict__[k] = v
Expand Down
41 changes: 41 additions & 0 deletions examples/llm-edge-benchmark-suite/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Large Language Model Edge Benchmark Suite: Implementation on KubeEdge-lanvs


## dataset

### Prepare Data

The data of llm-edge-benchmark-suite example structure is:

```
.
├── test_data
│ └── data.jsonl
└── train_data
└── data.jsonl
```

`train_data/data.jsonl` is empty, and the `test_data/data.jsonl` is as follows:

```
{"question": "Which of the following numbers is the smallest prime number?\nA. 0\nB. 1\nC. 2\nD. 4", "answer": "C"}
```
### prepare env

```shell
python setup.py install
```

### Run Ianvs



```shell
ianvs -f examples/llm-edge-benchmark-suite/single_task_bench/benchmarkingjob.yaml
```


```shell
ianvs -f examples/llm-edge-benchmark-suite/single_task_bench_with_compression/benchmarkingjob.yaml
```

2 changes: 2 additions & 0 deletions examples/llm-edge-benchmark-suite/single_task_bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Large Language Model Edge Benchmark Suite: Implementation on KubeEdge-lanvs

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
benchmarkingjob:
name: "benchmarkingjob"
workspace: "./workspace"

testenv: "./examples/llm-edge-benchmark-suite/single_task_bench_with_compression/testenv/testenv.yaml"

test_object:
type: "algorithms"
algorithms:
- name: "llama-cpp"
url: "./examples/llm-edge-benchmark-suite/single_task_bench_with_compression/testalgorithms/algorithm.yaml"

rank:
sort_by:
- { "latency": "descend" }
- { "throughput": "ascend" }
- { "mem_usage": "ascend" }
- { "prefill_latency": "ascend"}

visualization:
mode: "selected_only"
method: "print_table"

selected_dataitem:
paradigms: [ "all" ]
modules: [ "all" ]
hyperparameters: [ "all" ]
metrics: [ "latency", "throughput", "prefill_latency" ]

save_mode: "selected_and_all"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
algorithm:
paradigm_type: "singletasklearningwithcompression"

initial_model_url: "models/qwen/qwen_1_5_0_5b.gguf"

modules:
- type: "basemodel"
name: "LlamaCppModel"
url: "./examples/llm-edge-benchmark-suite/single_task_bench_with_compression/testalgorithms/basemodel.py"
hyperparameters:
- model_path:
values:
- "models/qwen/qwen_1_5_0_5b.gguf"
- n_ctx:
values:
- 2048
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from sedna.common.class_factory import ClassFactory, ClassType
from llama_cpp import Llama
from contextlib import redirect_stderr
import os
import psutil
import time
import io
import statistics
import logging

logging.getLogger().setLevel(logging.INFO)

@ClassFactory.register(ClassType.GENERAL, alias="LlamaCppModel")
class LlamaCppModel:
def __init__(self, **kwargs):
"""
init llama-cpp
"""
model_path = kwargs.get("model_path")
if not model_path:
raise ValueError("Model path is required.")
quantization_type = kwargs.get("quantization_type", None)
if quantization_type:
logging.info(f"Using quantization type: {quantization_type}")
# Init LLM model
self.model = Llama(
model_path=model_path,
n_ctx=kwargs.get("n_ctx", 512),
n_gpu_layers=kwargs.get("n_gpu_layers", 0),
seed=kwargs.get("seed", -1),
f16_kv=kwargs.get("f16_kv", True),
logits_all=kwargs.get("logits_all", False),
vocab_only=kwargs.get("vocab_only", False),
use_mlock=kwargs.get("use_mlock", False),
embedding=kwargs.get("embedding", False),
)

def predict(self, data, input_shape=None, **kwargs):
data = data[:10]
process = psutil.Process(os.getpid())
start_time = time.time()

results = []
total_times = []
prefill_latencies = []
mem_usages = []

for prompt in data:
prompt_start_time = time.time()

f = io.StringIO()
with redirect_stderr(f):
output = self.model(
prompt=prompt,
max_tokens=kwargs.get("max_tokens", 32),
stop=kwargs.get("stop", ["Q:", "\n"]),
echo=kwargs.get("echo", True),
temperature=kwargs.get("temperature", 0.8),
top_p=kwargs.get("top_p", 0.95),
top_k=kwargs.get("top_k", 40),
repeat_penalty=kwargs.get("repeat_penalty", 1.1),
)
stdout_output = f.getvalue()

# parse timing info
timings = self._parse_timings(stdout_output)
prefill_latency = timings.get('prompt_eval_time', 0.0) # ms
generated_text = output['choices'][0]['text']

prompt_end_time = time.time()
prompt_total_time = (prompt_end_time - prompt_start_time) * 1000 # convert to ms

result_with_time = {
"generated_text": generated_text,
"total_time": prompt_total_time,
"prefill_latency": prefill_latency,
"mem_usage":process.memory_info().rss,
}

results.append(result_with_time)

predict_dict = {
"results": results,
}

return predict_dict

def _parse_timings(self, stdout_output):
import re
timings = {}
for line in stdout_output.split('\n'):
match = re.match(r'llama_print_timings:\s*(.+?)\s*=\s*([0-9\.]+)\s*ms', line)
if match:
key = match.group(1).strip()
value = float(match.group(2))

key = key.lower().replace(' ', '_')
timings[key] = value

return timings

def evaluate(self, data, model_path=None, **kwargs):
"""
evaluate model
"""
if data is None or data.x is None:
raise ValueError("Evaluation data is None.")

if model_path:
self.load(model_path)

# do predict
predict_dict = self.predict(data.x, **kwargs)

# compute metrics
metric = kwargs.get("metric")
if metric is None:
raise ValueError("No metric provided in kwargs.")

metric_name, metric_func = metric

if callable(metric_func):
metric_value = metric_func(None, predict_dict["results"])
return {metric_name: metric_value}
else:
raise ValueError(f"Metric function {metric_name} is not callable or not provided.")

def save(self, model_path):
pass

def load(self, model_url):
pass

def train(self, train_data, valid_data=None, **kwargs):
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import argparse
import logging
from modelscope import snapshot_download

logging.getLogger().setLevel(logging.INFO)

def download_model(model_id, revision, local_dir):
try:
model_dir = snapshot_download(model_id, revision=revision, cache_dir=local_dir)
logging.info(f"Model successfully downloaded to: {model_dir}")
return model_dir
except Exception as e:
logging.info(f"Error downloading model: {str(e)}")
return None

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Download a model from ModelScope")
parser.add_argument("--model_id", type=str, required=True, help="ModelScope model ID")
parser.add_argument("--revision", type=str, default="master", help="Model revision")
parser.add_argument("--local_dir", type=str, required=True, help="Local directory to save the model")

args = parser.parse_args()

download_model(args.model_id, args.revision, args.local_dir)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2023 The KubeEdge Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sedna.common.class_factory import ClassType, ClassFactory
import statistics

__all__ = ["latency"]


@ClassFactory.register(ClassType.GENERAL, alias="latency")
def latency(y_true, y_pred):
results_list = y_pred.get('results', [])
num_requests = len(results_list)
total_latency = 0.0
for result in results_list:
# print(result)
total_latency += result['total_time']
average_latency = total_latency / num_requests
return average_latency
Loading

0 comments on commit aeaa8e5

Please sign in to comment.