diff --git a/core/common/constant.py b/core/common/constant.py index aa111ddb..95938014 100644 --- a/core/common/constant.py +++ b/core/common/constant.py @@ -26,6 +26,8 @@ class DatasetFormat(Enum): CSV = "csv" TXT = "txt" JSON = "json" + JSONL = "jsonl" + JSONFORLLM = "jsonforllm" class ParadigmType(Enum): @@ -39,6 +41,7 @@ class ParadigmType(Enum): LIFELONG_LEARNING = "lifelonglearning" FEDERATED_LEARNING = "federatedlearning" FEDERATED_CLASS_INCREMENTAL_LEARNING = "federatedclassincrementallearning" + JOINT_INFERENCE = "jointinference" class ModuleType(Enum): @@ -48,6 +51,13 @@ class ModuleType(Enum): BASEMODEL = "basemodel" + # JOINT INFERENCE + EDGEMODEL = "edgemodel" + CLOUDMODEL = "cloudmodel" + + # Dataset Preprocessor + DATA_PROCESSOR = "dataset_processor" + # HEM HARD_EXAMPLE_MINING = "hard_example_mining" diff --git a/core/common/utils.py b/core/common/utils.py index 38b60526..e7ad6522 100644 --- a/core/common/utils.py +++ b/core/common/utils.py @@ -36,8 +36,12 @@ def is_local_dir(url): def get_file_format(url): """Get file format of the url.""" - return os.path.splitext(url)[-1][1:] + # Check if the url + if os.path.basename(url) == "metadata.json": + return "jsonforllm" + # Check if the url + return os.path.splitext(url)[-1][1:] def parse_kwargs(func, **kwargs): """Get valid parameters of the func in kwargs.""" diff --git a/core/storymanager/rank/rank.py b/core/storymanager/rank/rank.py index 5a558692..ac985c88 100644 --- a/core/storymanager/rank/rank.py +++ b/core/storymanager/rank/rank.py @@ -138,7 +138,6 @@ def _sort_all_df(self, all_df, all_metric_names): if metric_name not in all_metric_names: continue - print(metric_name) sort_metric_list.append(metric_name) is_ascend_list.append(ele.get(metric_name) == "ascend") @@ -234,15 +233,12 @@ def _draw_pictures(self, test_cases, test_results): out_put = test_case.output_dir test_result = test_results[test_case.id][0] matrix = test_result.get("Matrix") - # print(out_put) for key in matrix.keys(): draw_heatmap_picture(out_put, key, matrix[key]) def _prepare(self, test_cases, test_results, output_dir): all_metric_names = self._get_all_metric_names(test_results) - print(f"in_prepare all_metric_names: {all_metric_names}") all_hps_names = self._get_all_hps_names(test_cases) - print(f"in_prepare all_hps_names: {all_hps_names}") all_module_types = self._get_all_module_types(test_cases) self.all_df_header = [ "algorithm", *all_metric_names, diff --git a/core/storymanager/visualization/visualization.py b/core/storymanager/visualization/visualization.py index 74cfa683..49a2c128 100644 --- a/core/storymanager/visualization/visualization.py +++ b/core/storymanager/visualization/visualization.py @@ -23,7 +23,7 @@ def print_table(rank_file): """ print rank of the test""" with open(rank_file, "r", encoding="utf-8") as file: - table = from_csv(file) + table = from_csv(file, delimiter=",") print(table) def draw_heatmap_picture(output, title, matrix): @@ -40,7 +40,6 @@ def draw_heatmap_picture(output, title, matrix): plt.title(title, fontsize=15) plt.colorbar(format='%.2f') output_dir = os.path.join(output, f"output/{title}-heatmap.png") - #print(output_dir) plt.savefig(output_dir) plt.show() diff --git a/core/testcasecontroller/algorithm/algorithm.py b/core/testcasecontroller/algorithm/algorithm.py index 5bad73a9..22955d48 100644 --- a/core/testcasecontroller/algorithm/algorithm.py +++ b/core/testcasecontroller/algorithm/algorithm.py @@ -25,7 +25,8 @@ MultiedgeInference, LifelongLearning, FederatedLearning, - FederatedClassIncrementalLearning + FederatedClassIncrementalLearning, + JointInference ) from core.testcasecontroller.generation_assistant import get_full_combinations @@ -120,6 +121,9 @@ def paradigm(self, workspace: str, **kwargs): if self.paradigm_type == ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value: return FederatedClassIncrementalLearning(workspace, **config) + if self.paradigm_type == ParadigmType.JOINT_INFERENCE.value: + return JointInference(workspace, **config) + return None def _check_fields(self): diff --git a/core/testcasecontroller/algorithm/module/module.py b/core/testcasecontroller/algorithm/module/module.py index 1772725e..464bfe56 100644 --- a/core/testcasecontroller/algorithm/module/module.py +++ b/core/testcasecontroller/algorithm/module/module.py @@ -72,6 +72,7 @@ def _check_fields(self): if not isinstance(self.url, str): raise ValueError(f"module url({self.url}) must be string type.") + #pylint: disable=too-many-branches def get_module_instance(self, module_type): """ get function of algorithm module by using module type @@ -86,7 +87,6 @@ def get_module_instance(self, module_type): function """ - print(f'hyperparameters_list: {self.hyperparameters_list}') class_factory_type = ClassType.GENERAL if module_type in [ModuleType.HARD_EXAMPLE_MINING.value]: class_factory_type = ClassType.HEM @@ -110,13 +110,11 @@ def get_module_instance(self, module_type): elif module_type in [ModuleType.AGGREGATION.value]: class_factory_type = ClassType.FL_AGG agg = None - print(self.url) if self.url : try: utils.load_module(self.url) agg = ClassFactory.get_cls( type_name=class_factory_type, t_cls_name=self.name)(**self.hyperparameters) - print(agg) except Exception as err: raise RuntimeError(f"module(type={module_type} loads class(name={self.name}) " f"failed, error: {err}.") from err @@ -125,10 +123,17 @@ def get_module_instance(self, module_type): if self.url: try: utils.load_module(self.url) - # pylint: disable=E1134 - func = ClassFactory.get_cls( - type_name=class_factory_type, t_cls_name=self.name)(**self.hyperparameters) + + if class_factory_type == ClassType.HEM: + func = {"method": self.name, "param":self.hyperparameters} + else: + func = ClassFactory.get_cls( + type_name=class_factory_type, + t_cls_name=self.name + )(**self.hyperparameters) + return func + except Exception as err: raise RuntimeError(f"module(type={module_type} loads class(name={self.name}) " f"failed, error: {err}.") from err diff --git a/core/testcasecontroller/algorithm/paradigm/__init__.py b/core/testcasecontroller/algorithm/paradigm/__init__.py index 5c50b243..68267798 100644 --- a/core/testcasecontroller/algorithm/paradigm/__init__.py +++ b/core/testcasecontroller/algorithm/paradigm/__init__.py @@ -18,3 +18,4 @@ from .multiedge_inference import MultiedgeInference from .lifelong_learning import LifelongLearning from .federated_learning import FederatedLearning, FederatedClassIncrementalLearning +from .joint_inference import JointInference diff --git a/core/testcasecontroller/algorithm/paradigm/base.py b/core/testcasecontroller/algorithm/paradigm/base.py index e5178e29..4b11236f 100644 --- a/core/testcasecontroller/algorithm/paradigm/base.py +++ b/core/testcasecontroller/algorithm/paradigm/base.py @@ -18,6 +18,7 @@ from sedna.core.incremental_learning import IncrementalLearning from sedna.core.lifelong_learning import LifelongLearning +from sedna.core.joint_inference import JointInference from core.common.constant import ModuleType, ParadigmType from .sedna_federated_learning import FederatedLearning @@ -76,6 +77,7 @@ def _get_module_instances(self): module_instances.update({module_type: func}) return module_instances + # pylint: disable=too-many-return-statements def build_paradigm_job(self, paradigm_type): """ build paradigm job instance according to paradigm type. @@ -103,7 +105,10 @@ def build_paradigm_job(self, paradigm_type): if paradigm_type == ParadigmType.LIFELONG_LEARNING.value: return LifelongLearning( - estimator=self.module_instances.get(ModuleType.BASEMODEL.value), + seen_estimator=self.module_instances.get( + ModuleType.BASEMODEL.value + ), + unseen_estimator=None, task_definition=self.module_instances.get( ModuleType.TASK_DEFINITION.value ), @@ -144,4 +149,15 @@ def build_paradigm_job(self, paradigm_type): estimator=self.module_instances.get(ModuleType.BASEMODEL.value) ) + if paradigm_type == ParadigmType.JOINT_INFERENCE.value: + return JointInference( + estimator=self.module_instances.get( + ModuleType.EDGEMODEL.value), + cloud=self.module_instances.get( + ModuleType.CLOUDMODEL.value), + hard_example_mining=self.module_instances.get( + ModuleType.HARD_EXAMPLE_MINING.value), + LCReporter_enable=False + ) + return None diff --git a/core/testcasecontroller/algorithm/paradigm/joint_inference/__init__.py b/core/testcasecontroller/algorithm/paradigm/joint_inference/__init__.py new file mode 100644 index 00000000..c61116ba --- /dev/null +++ b/core/testcasecontroller/algorithm/paradigm/joint_inference/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2022 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=missing-module-docstring +from .joint_inference import JointInference diff --git a/core/testcasecontroller/algorithm/paradigm/joint_inference/joint_inference.py b/core/testcasecontroller/algorithm/paradigm/joint_inference/joint_inference.py new file mode 100644 index 00000000..0a95b102 --- /dev/null +++ b/core/testcasecontroller/algorithm/paradigm/joint_inference/joint_inference.py @@ -0,0 +1,188 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Cloud-Edge Joint Inference""" + +import os +from tqdm import tqdm + +from core.common.log import LOGGER +from core.common.constant import ParadigmType +from core.testcasecontroller.algorithm.paradigm.base import ParadigmBase + +class JointInference(ParadigmBase): + """ + Cloud-Edge-JointInference: + provide the flow of multi-edge inference paradigm. + Notes: + 1. Ianvs serves as testing tools for test objects, e.g., algorithms. + 2. Ianvs does NOT include code directly on test object. + 3. Algorithms serve as typical test objects in Ianvs + and detailed algorithms are thus NOT included in this Ianvs python file. + 4. As for the details of example test objects, e.g., algorithms, + please refer to third party packages in Ianvs example. + For example, AI workflow and interface pls refer to sedna + (sedna docs: https://sedna.readthedocs.io/en/latest/api/lib/index.html), + and module implementation pls refer to `examples' test algorithms`, + e.g., basemodel.py, hard_example_mining.py. + + Parameters + --------- + workspace: string + the output required for multi-edge inference paradigm. + kwargs: dict + config required for the test process of joint inference paradigm, + e.g.: hard_example_mining_mode + + """ + + def __init__(self, workspace, **kwargs): + ParadigmBase.__init__(self, workspace, **kwargs) + self.inference_dataset = None + self.kwargs = kwargs + self.hard_example_mining_mode = kwargs.get( + "hard_example_mining_mode", + "mining-then-inference" + ) + + def set_config(self): + """ Set the configuration for the joint inference paradigm. + + Raises + ------ + KeyError + If required modules are not provided. + """ + + + inference_output_dir = os.path.dirname(self.workspace) + os.environ["RESULT_SAVED_URL"] = inference_output_dir + os.makedirs(inference_output_dir, exist_ok=True) + + LOGGER.info("Loading dataset") + + self.inference_dataset = self.dataset.load_data( + self.dataset.test_data_info, + "inference" + ) + + dataset_processor = self.module_instances.get("dataset_processor", None) + if callable(dataset_processor): + self.inference_dataset = dataset_processor(self.inference_dataset) + + # validate module instances + required_modules = {"edgemodel", "cloudmodel", "hard_example_mining"} + + if not required_modules.issubset(set(self.module_instances.keys())): + raise KeyError( + f"Required modules: {required_modules}, " + f"but got: {self.module_instances.keys()}" + ) + + # if hard example mining is OracleRouter, + # add the edgemodel and cloudmodel object to its kwargs so that it can use them. + mining = self.module_instances["hard_example_mining"] + param = mining.get("param") + if mining.get("method", None) == "OracleRouter": + param["edgemodel"] = self.module_instances["edgemodel"] + param["cloudmodel"] = self.module_instances["cloudmodel"] + + def run(self): + """ + run the test flow of joint inference paradigm. + + Returns + ------ + inference_result: list + system_metric_info: dict + information needed to compute system metrics. + + """ + self.set_config() + + job = self.build_paradigm_job(ParadigmType.JOINT_INFERENCE.value) + + inference_result = self._inference(job) + + self._cleanup(job) + + return inference_result, self.system_metric_info + + def _cleanup(self, job): + """Call module's cleanup method to release resources + + Parameters + ---------- + job : Sedna JointInference + Sedna JointInference API + """ + + LOGGER.info("Release models") + # release module resources + for module in self.module_instances.values(): + if hasattr(module, "cleanup"): + module.cleanup() + + # Special call is required for hard example mining module + # since it is instantiated within the job. + mining_instance = job.hard_example_mining_algorithm + if hasattr(mining_instance, "cleanup"): + mining_instance.cleanup() + + del job + + def _inference(self, job): + """Inference each data in Inference Dataset + + Parameters + ---------- + job : Sedna JointInference + Sedna JointInference API + + Returns + ------- + tuple + Inference Result with the format of `(is_hard_example, res, edge_result, cloud_result)` + """ + results = [] + + cloud_count, edge_count = 0,0 + + LOGGER.info("Inference Start") + + pbar = tqdm( + self.inference_dataset.x, + total=len(self.inference_dataset.x), + ncols=100 + ) + + for data in pbar: + # inference via sedna JointInference API + infer_res = job.inference( + data, + mining_mode=self.hard_example_mining_mode + ) + + if infer_res[2]: + edge_count += 1 + elif infer_res[3]: + cloud_count += 1 + + pbar.set_postfix({"Edge": edge_count, "Cloud": cloud_count}) + + results.append(infer_res) + + LOGGER.info("Inference Finished") + + return results diff --git a/core/testcasecontroller/metrics/metrics.py b/core/testcasecontroller/metrics/metrics.py index 8d3f9f52..bc9ed293 100644 --- a/core/testcasecontroller/metrics/metrics.py +++ b/core/testcasecontroller/metrics/metrics.py @@ -52,7 +52,6 @@ def compute(key, matrix): """ Compute BWT and FWT scores for a given matrix. """ - print(f"compute function: key={key}, matrix={matrix}, type(matrix)={type(matrix)}") length = len(matrix) accuracy = 0.0 @@ -147,7 +146,6 @@ def forget_rate_func(system_metric_info: dict): """ info = system_metric_info.get(SystemMetricType.FORGET_RATE.value) forget_rate = np.mean(info) - print(f"forget_rate: {forget_rate}") return round(forget_rate, 3) @@ -169,7 +167,6 @@ def get_metric_func(metric_dict: dict): name = metric_dict.get("name") url = metric_dict.get("url") - print(f"get metric func: name={name}, url={url}") if url: try: load_module(url) diff --git a/core/testcasecontroller/testcase/testcase.py b/core/testcasecontroller/testcase/testcase.py index 11622aef..b9e20154 100644 --- a/core/testcasecontroller/testcase/testcase.py +++ b/core/testcasecontroller/testcase/testcase.py @@ -66,7 +66,6 @@ def run(self, workspace): test_env_config = {} # pylint: disable=C0103 for k, v in self.test_env.__dict__.items(): - print(k,v) test_env_config[k] = v self.output_dir = self._get_output_dir(workspace) @@ -112,7 +111,6 @@ def compute_metrics(self, paradigm_result, dataset, **kwargs): metric_res = {} system_metric_types = [e.value for e in SystemMetricType.__members__.values()] for metric_name, metric_func in metric_funcs.items(): - #print(metric_name) if metric_name in system_metric_types: metric_res[metric_name] = metric_func(kwargs) else: diff --git a/core/testenvmanager/dataset/dataset.py b/core/testenvmanager/dataset/dataset.py index e07f5601..dffbf293 100644 --- a/core/testenvmanager/dataset/dataset.py +++ b/core/testenvmanager/dataset/dataset.py @@ -18,12 +18,18 @@ import tempfile import pandas as pd -from sedna.datasources import CSVDataParse, TxtDataParse, JSONDataParse +from sedna.datasources import ( + CSVDataParse, + TxtDataParse, + JSONDataParse, + JsonlDataParse, + JSONMetaDataParse +) from core.common import utils from core.common.constant import DatasetFormat - +# pylint: disable=too-many-instance-attributes class Dataset: """ Data: @@ -38,12 +44,28 @@ class Dataset: def __init__(self, config): self.train_url: str = "" self.test_url: str = "" + self.train_index: str = "" + self.test_index: str = "" + self.train_data: str = "" + self.test_data: str = "" + self.train_data_info: str = "" + self.test_data_info: str = "" self.label: str = "" self._parse_config(config) def _check_fields(self): - self._check_dataset_url(self.train_url) - self._check_dataset_url(self.test_url) + if self.train_index: + self._check_dataset_url(self.train_index) + if self.test_index: + self._check_dataset_url(self.test_index) + if self.train_data: + self._check_dataset_url(self.train_data) + if self.test_data: + self._check_dataset_url(self.test_data) + if self.train_data_info: + self._check_dataset_url(self.train_data_info) + if self.test_data_info: + self._check_dataset_url(self.test_data_info) def _parse_config(self, config): for attr, value in config.items(): @@ -108,6 +130,22 @@ def _process_index_file(self, file_url): return None + def _process_data_file(self, file_url): + file_format = utils.get_file_format(file_url) + if file_format == DatasetFormat.JSONL.value: + return file_url + + return None + + def _process_data_info_file(self, file_url): + file_format = utils.get_file_format(file_url) + if file_format == DatasetFormat.JSONFORLLM.value: + return file_url + raise ValueError( + f"The Data Info File must be named as `data_info.json`, " + f"but the current file is {file_url}." + ) + def process_dataset(self): """ process dataset: @@ -116,9 +154,26 @@ def process_dataset(self): in the index file(e.g.: txt index file). """ + if self.train_index: + self.train_url = self._process_index_file(self.train_index) + elif self.train_data: + self.train_url = self._process_data_file(self.train_data) + elif self.train_data_info: + self.train_url = self._process_data_info_file(self.train_data_info) + # raise NotImplementedError('to be done') + else: + raise NotImplementedError('not one of train_index/train_data/train_data_info') + + if self.test_index: + self.test_url = self._process_index_file(self.test_index) + elif self.test_data: + self.test_url = self._process_data_file(self.test_data) + elif self.test_data_info: + self.test_url = self._process_data_info_file(self.test_data_info) + # raise NotImplementedError('to be done') + else: + raise NotImplementedError('not one of test_index/test_data/test_data_info') - self.train_url = self._process_index_file(self.train_url) - self.test_url = self._process_index_file(self.test_url) # pylint: disable=too-many-arguments def split_dataset( @@ -489,9 +544,8 @@ def _hard_example_splitting( return data_files @classmethod - def load_data( - cls, file: str, data_type: str, label=None, use_raw=False, feature_process=None - ): + def load_data(cls, file: str, data_type: str, label=None, + use_raw=False, feature_process=None, **kwargs): """ load data @@ -523,11 +577,18 @@ def load_data( if data_format == DatasetFormat.TXT.value: data = TxtDataParse(data_type=data_type, func=feature_process) - # print(file) data.parse(file, use_raw=use_raw) if data_format == DatasetFormat.JSON.value: data = JSONDataParse(data_type=data_type, func=feature_process) data.parse(file) + if data_format == DatasetFormat.JSONL.value: + data = JsonlDataParse(data_type=data_type, func=feature_process) + data.parse(file) + + if data_format == DatasetFormat.JSONFORLLM.value: + data = JSONMetaDataParse(data_type=data_type, func=feature_process) + data.parse(file, **kwargs) + return data diff --git a/core/testenvmanager/testenv/testenv.py b/core/testenvmanager/testenv/testenv.py index d9916d1a..9960dab3 100644 --- a/core/testenvmanager/testenv/testenv.py +++ b/core/testenvmanager/testenv/testenv.py @@ -65,8 +65,7 @@ def _parse_config(self, config): if k == str.lower(Dataset.__name__): self.dataset = Dataset(v) else: - if k in self.__dict__: - self.__dict__[k] = v + self.__dict__[k] = v self._check_fields() diff --git a/examples/cloud-edge-collaborative-inference-for-llm/README.md b/examples/cloud-edge-collaborative-inference-for-llm/README.md new file mode 100644 index 00000000..85705406 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/README.md @@ -0,0 +1,337 @@ +# Quick Start + +## Introduction + +This example aims to implement benchmarks for **LLM in cloud-edge collaborative inference scenario**. + +### Why LLM need cloud-edge collaborative inference? + +Currently, such LLMs have billions or even trillions of parameters, requiring massive computing power for training and deployment. Therefore, they are often deployed in cloud computing centers and serving via APIs. However, such service paradigm faces many drawbacks. + +- Time to First Token(TTFT) is quite long, due to transmission delays from the distance to the data center. +- Uploading user data to the cloud may lead to additional privacy risks and retraining risks. +- Calling APIs of the most advanced models (GPT-4o *et.al*) is often very expensive. +- Not all tasks require high-performance models to complete. + +These issues can be addressed by introducing Edge Computing, which is an architecture featured by low-latency, privacy security, energy-efficient. + +By deploying small-scale LLMs on edge devices like mobile phones, PCs and communication base station, users will have low-latency and privacy-secure services. Empirically, models with fewer than 3B parameters are possible to be deployed on the aforementioned edge devices. However, due to Scaling Law, smaller models perform worse than larger models, so they can only maintain good performance on certain tasks. + +Thus, smaller models on edge should collaborate with larger models on cloud to achieve better performance on other tasks. + +### Possible Collaborative Inference Strategy + +There are several cloud-edge collaborative inference strategy, including: + +- Query Routing $^{[1, 2]}$ : route query to smaller-scale model on edge or larger-scale model on cloud based on its difficulty. +- Speculative Decoding $^{[3]}$ : smaller-scale models predicting future multiple words quickly during decoding followed by parallel validation via larger-scale models; if validation fails then re-generation by larger-scale occurs. +- Other possible ways. + +This example currently supports convenient benchmark testing for Query-Routing strategy. + +### Details of Design + +The overall design is shown in the figure below. + +![image-20240926143857223](./assets/image-20240926143857223.png) + +When Ianvs starts the benchmarking job, the Test Env Manager will first pass the data of the user-specified Dataset to the Test Case Controller for Joint Inference one by one. + +Joint Inference supports multiple modes, including `mining-then-inference`, `inference-then-mining`, and `self-design`. Among them, `mining-then-inference` is suitable for LLM scenarios, `inference-then-mining` is suitable for CV scenarios, and `self-design` allows you to implement more complex collaborative inference strategies on your own. + +In this example, we will rely on Ianvs' Joint Inference Paradigm using the `inference-then-mining` mode to implement a Query Routing strategy. First, we call your custom Hard Example Mining module to determine if it is a hard case. If it is, we call the inference interface of the Edge Model to complete the inference; if not, we call the inference interface of the Cloud Model to complete it. + +To save API calls during multi-round testing, this example has designed a result caching mechanism in both EdgeModel and Cloud Model. For questions that have already been tested, cached results will be read and returned. + +After all tests are completed, the Test Env Manager will calculate relevant metrics based on selected Metrics and hand over to Story Manager for printing test reports and generating Leader Board. + +## Required Resources + +Before using this example, you need to have the device ready: + +One machine is all you need, i.e., a laptop or a virtual machine is sufficient and a cluster is not necessary + +- 2 CPUs or more + +- 1 GPU with at least 6GB of memory, depends on the tested model + +- 4GB+ free memory, depends on algorithm and simulation setting + +- 10GB+ free disk space (depends on your model size) + +- Internet connection for GitHub, PyPI, HuggingFace, etc + +- Python 3.8+ environment + +## Step 1. Ianvs Preparation + +```bash +# Create a new conda environment with Python>=3.8 (venv users can do it in their own way). +conda create -n ianvs-experiment python=3.8 + +# Activate our environment +conda activate ianvs-experiment + +# Clone Ianvs Repo +git clone https://github.com/kubeedge/ianvs.git +cd ianvs + +# Install Sedna +pip install examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl + +# Install dependencies for this example. +pip install -r examples/cloud-edge-collaborative-inference-for-llm/requirements.txt + +# Install dependencies for Ianvs Core. +pip install -r requirements.txt + +# Install ianvs +python setup.py install +``` + +## Step 2. Dataset and Model Preparation + +### Dataset Configuration + +1. Download `mmlu-5-shot` from [Ianvs-MMLU-5-shot](https://huggingface.co/datasets/FuryMartin/Ianvs-MMLU-5-shot), which is a transformed MMLU-5-shot dataset formatted to fit Ianvs's requirements. + +2. Create a `dataset` folder in the root directory of Ianvs and move `mmlu-5-shot` into the `dataset` folder. + +3. Then, check the path of `train_data` and `test_dat` in +`examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml`. + + - If you created the `dataset` folder inside `ianvs/` as mentioned earlier, then the relative path is correct and does not need to be modified. + + - If your `dataset` is created in a different location, please use an absolute path, and using `~` to represent the home directory is not supported. + +#### Dataset Details + +If you want to construct your own dataset, please see the details below and follow the instruction. + +``` +. +├── dataset +│ └── mmlu-5-shot +│ ├── test_data +│ │ ├── data.jsonl +│ │ └── metadata.json +│ └── train_data +│ └── data.json +``` + +Leave `train_data/data.jsonl` as empty. + +The file `data.jsonl` stores the main content of the dataset. Each line contains must contain keys `query`, `response`, `explanation`,`level_1_dim`, `level_2_dim`, `level_3_dim`, `level_4_dim` + +Here is an example: + +```json +{"query": "Question: Find the degree for the given field extension Q(sqrt(2), sqrt(3), sqrt(18)) over Q.\nA. 0\nB. 4\nC. 2\nD. 6", "response": "B", "explanation": "", "level_1_dim": "single-modal", "level_2_dim": "text", "level_3_dim": "knowledge Q&A", "level_4_dim": "abstract_algebra"} +{"query": "Question: Let p = (1, 2, 5, 4)(2, 3) in S_5 . Find the index of
in S_5.\nA. 8\nB. 2\nC. 24\nD. 120", "response": "C", "explanation": "", "level_1_dim": "single-modal", "level_2_dim": "text", "level_3_dim": "knowledge Q&A", "level_4_dim": "abstract_algebra"} +``` + +The `metadata.jsonl` stores information about the data, including `dataset`, `description`, `level_1_dim`, `level_2_dim`, `level_3_dim`, `level_4_dim`. + +Here is an example: + +```json +{ + "dataset": "MMLU", + "description": "Measuring Massive Multitask Language Understanding by Dan Hendrycks, Collin Burns, Steven Basart, Andy Zou, Mantas Mazeika, Dawn Song, and Jacob Steinhardt (ICLR 2021).", + "level_1_dim": "single-modal", + "level_2_dim": "text", + "level_3_dim": "Q&A", + "level_4_dim": "general" +} +``` + + + +### Metric Configuration + +*Note: If you just want to run this example quickly, you can skip this step.* + +We have designed multiple metrics for edge-cloud collaborative inference, including: + +| Metric | Description | Unit | +| :---------------------- | :------------------------------------------------------ | ------- | +| Accuracy | Accuracy on the test Dataset | - | +| Edge Ratio | proportion of queries router to edge | - | +| Time to First Token | Time taken to generate the first token | s | +| Internal Token Latency | Time taken to generate each token | s | +| Throughput | Token generation speed | token/s | +| Cloud Prompt Tokens | Number of prompt tokens consumed by Cloud Model | - | +| Cloud Completion Tokens | Number of completion tokens generated by Cloud Model | - | +| Edge Prompt Tokens | Number of prompt tokens consumed by the Edge Model | - | +| Edge Completion Tokens | Number of completion tokens generated by the Edge Model | - | + +Each metric is calculated by a module in `examples/cloud-edge-collaborative-inference-for-llm/testenv`. For more details, please check the folder. + +You can select multiple metrics in `examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml`. + +### Model Configuration + +*Note: If you just want to run this example quickly, you can skip this step.* + +The models are configured in `examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml`. + +In the configuration file, there are two models available for configuration: `EdgeModel` and `CloudModel`. + +#### EdgeModel Configuration + +The `EdgeModel` is the model that will be deployed on your local machine, supporting `huggingface` and `vllm` as serving backends. + +For `EdgeModel`, the open parameters are: + +| Parameter Name | Type | Description | Defalut | +| ---------------------- | ----- | ------------------------------------------------------------ | ------------------------ | +| model | str | model name | Qwen/Qwen2-1.5B-Instruct | +| backend | str | model serving framework | huggingface | +| temperature | float | What sampling temperature to use, between 0 and 2 | 0.8 | +| top_p | float | nucleus sampling parameter | 0.8 | +| max_tokens | int | The maximum number of tokens that can be generated in the chat completion | 512 | +| repetition_penalty | float | The parameter for repetition penalty | 1.05 | +| tensor_parallel_size | int | The size of tensor parallelism (Used for vLLM) | 1 | +| gpu_memory_utilization | float | The percentage of GPU memory utilization (Used for vLLM) | 0.9 | + +#### CloudModel Configuration + +The `CloudModel` represents the model on cloud, it will call LLM API via OpenAI API format. You need to set your OPENAI_BASE_URL and OPENAI_API_KEY in the environment variables yourself, for example. + +```bash +export OPENAI_BASE_URL="https://api.openai.com/v1" +export OPENAI_API_KEY=sk_xxxxxxxx +``` + +For `CloudModel`, the open parameters are: + +| Parameter Name | Type | Description | Defalut | +| ------------------ | ---- | ------------------------------------------------------------ | ----------- | +| model | str | model name | gpt-4o-mini | +| temperature | float | What sampling temperature to use, between 0 and 2 | 0.8 | +| top_p | float | nucleus sampling parameter | 0.8 | +| max_tokens | int | The maximum number of tokens that can be generated in the chat completion | 512 | +| repetition_penalty | float | The parameter for repetition penalty | 1.05 | + +#### Router Configuration + +Router is a component that routes the query to the edge or cloud model. The router is configured by `hard_example_mining` in `examples/cloud-edge-collaborative-inference-for-llm/testrouters/query-routing/test_queryrouting.yaml`. + +Currently, supported routers include: + +| Router Type | Description | Parameters | +| ------------ | ------------------------------------------------------------ | ---------------- | +| EdgeOnly | Route all queries to the edge model. | - | +| CloudOnly | Route all queries to the cloud model. | - | +| OracleRouter | Optimal Router | | +| BERTRouter | Use a BERT classifier to route the query to the edge or cloud model. | model, threshold | +| RandomRouter | Route the query to the edge or cloud model randomly. | threshold | + +You can modify the `router` parameter in `test_queryrouting.yaml` to select the router you want to use. + +For BERT router, you can use [routellm/bert](https://huggingface.co/routellm/bert) or [routellm/bert_mmlu_augmented](https://huggingface.co/routellm/bert_mmlu_augmented) or your own BERT model/ + +#### Data Processor Configuration +The Data Processor allows you to custom your own data format after the dataset loaded. + +Currently, supported routers include: + +| Data Processor | Description | Parameters | +| ------------ | ------------------------------------------------------------ | ---------------- | +| OracleRouterDatasetProcessor | Expose `gold` label to OracleRouter | - | + +## Step 3. Run Ianvs + +### Provided Response Cache +The testing process may take much time, depending on the number of test cases and the inference speed of the model. + +To enable you directly get the results, here we provide a workspace folder with cached results of `Qwen/Qwen2.5-1.5B-Instruct`, `Qwen/Qwen2.5-3B-Instruct`,`Qwen/Qwen2.5-7B-Instruct` and `gpt-4o-mini`. + +You can download `workspace-mmlu` folder from [Ianvs-MMLU-5-shot](https://huggingface.co/datasets/FuryMartin/Ianvs-MMLU-5-shot) and put it under your `ianvs` folder. + +### Run Joint Inference example + +Run the following command: + +`ianvs -f examples/cloud-edge-collaborative-inference-for-llm/benchmarkingjob.yaml` + +After the process finished, you will see output like this: + +```bash +[2024-10-28 18:03:37,314] edge_model.py(43) [INFO] - {'model': 'Qwen/Qwen2.5-1.5B-Instruct', 'backend': 'vllm', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'tensor_parallel_size': 4, 'gpu_memory_utilization': 0.9, 'use_cache': True} +[2024-10-28 18:03:37,314] cloud_model.py(34) [INFO] - {'model': 'gpt-4o-mini', 'temperature': 0, 'top_p': 0.8, 'max_tokens': 512, 'repetition_penalty': 1.05, 'use_cache': True} +[2024-10-28 18:03:37,850] joint_inference.py(73) [INFO] - Loading dataset +[2024-10-28 18:03:38,703] hard_sample_mining.py(30) [INFO] - USING EdgeOnlyFilter +[2024-10-28 18:03:38,704] joint_inference.py(162) [INFO] - Inference Start +100%|██████████████████████████████████| 14042/14042 [00:02<00:00, 6182.92it/s, Edge=14042, Cloud=0] +[2024-10-28 18:03:40,975] joint_inference.py(186) [INFO] - Inference Finished +[2024-10-28 18:03:40,976] joint_inference.py(131) [INFO] - Release models +``` + +### Results + +Change the Router type to `EdgeOnly`, `CloudOnly`, `OracleRouter` (or another router) will yield better results. + +The recommend testing order is `EdgeOnly`, `CloudOnly`, `OracleRouter`, `BERTRouter`, `RandomRouter`. + +By changing different models and Router parameters, you may see output like: + +```bash ++------+---------------+----------+------------+---------------------+------------+------------------------+---------------------+-------------------------+--------------------+------------------------+----------------+---------------------+----------------------------+-------------------+------------------+---------------------+-------------------------------------------------------------------------------------+ +| rank | algorithm | Accuracy | Edge Ratio | Time to First Token | Throughput | Internal Token Latency | Cloud Prompt Tokens | Cloud Completion Tokens | Edge Prompt Tokens | Edge Completion Tokens | paradigm | hard_example_mining | edgemodel-model | edgemodel-backend | cloudmodel-model | time | url | ++------+---------------+----------+------------+---------------------+------------+------------------------+---------------------+-------------------------+--------------------+------------------------+----------------+---------------------+----------------------------+-------------------+------------------+---------------------+-------------------------------------------------------------------------------------+ +| 1 | query-routing | 84.22 | 87.62 | 0.347 | 179.28 | 0.006 | 1560307 | 20339 | 10695142 | 30104 | jointinference | OracleRouter | Qwen/Qwen2.5-7B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:58:30 | ./workspace-mmlu/benchmarkingjob/query-routing/b8eb2606-950a-11ef-8cbc-c97e05df5d14 | +| 2 | query-routing | 82.75 | 77.55 | 0.316 | 216.72 | 0.005 | 2727792 | 18177 | 9470276 | 291364 | jointinference | OracleRouter | Qwen/Qwen2.5-3B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:58:19 | ./workspace-mmlu/benchmarkingjob/query-routing/b8eb2605-950a-11ef-8cbc-c97e05df5d14 | +| 3 | query-routing | 82.22 | 76.12 | 0.256 | 320.39 | 0.003 | 2978026 | 23254 | 9209538 | 29126 | jointinference | OracleRouter | Qwen/Qwen2.5-1.5B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:58:09 | ./workspace-mmlu/benchmarkingjob/query-routing/b8eb2604-950a-11ef-8cbc-c97e05df5d14 | +| 4 | query-routing | 75.99 | 0.0 | 0.691 | 698.83 | 0.001 | 11739216 | 79115 | 0 | 0 | jointinference | CloudOnly | Qwen/Qwen2.5-1.5B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:57:43 | ./workspace-mmlu/benchmarkingjob/query-routing/abe4062e-950a-11ef-8cbc-c97e05df5d14 | +| 5 | query-routing | 71.84 | 100.0 | 0.301 | 164.34 | 0.006 | 0 | 0 | 12335559 | 34817 | jointinference | EdgeOnly | Qwen/Qwen2.5-7B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:57:30 | ./workspace-mmlu/benchmarkingjob/query-routing/9b726328-950a-11ef-8cbc-c97e05df5d14 | +| 6 | query-routing | 60.3 | 100.0 | 0.206 | 176.71 | 0.006 | 0 | 0 | 12335559 | 397386 | jointinference | EdgeOnly | Qwen/Qwen2.5-3B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:57:23 | ./workspace-mmlu/benchmarkingjob/query-routing/9b726327-950a-11ef-8cbc-c97e05df5d14 | +| 7 | query-routing | 58.35 | 100.0 | 0.123 | 271.81 | 0.004 | 0 | 0 | 12335559 | 38982 | jointinference | EdgeOnly | Qwen/Qwen2.5-1.5B-Instruct | vllm | gpt-4o-mini | 2024-10-28 16:57:16 | ./workspace-mmlu/benchmarkingjob/query-routing/9b726326-950a-11ef-8cbc-c97e05df5d14 | ++------+---------------+----------+------------+---------------------+------------+------------------------+---------------------+-------------------------+--------------------+------------------------+----------------+---------------------+----------------------------+-------------------+------------------+---------------------+-------------------------------------------------------------------------------------+ +``` + +Ianvs will output a `rank.csv` and `selected_rank.csv` in `ianvs/workspace`, which will record the test results of each test. + +You can modify the relevant model parameters in `examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml`, conduct multiple tests, and compare the results of different configurations. + +## Discussion + +### Query Routing's Application Scenario + +Query Routing is a very useful cloud-edge collaboration strategy based on two facts: + +- Calling top-tier large language models is expensive: For GPT-4o, the pricing is \$5.00 / 1M input tokens and \$15.00 / 1M output tokens. + +- Not all tasks require calling top-tier models: For tasks like translation, organization, summarization, data formatting,and casual conversation, small models with 3B parameters or less can achieve satisfactory results. + +These two facts suggest that if we can call different models based on the difficulty of the task, it will help save unnecessary API calls and thus reduce costs. Additionally, if edge device prformance is sufficient, locally deployed small models can also demonstrate excellent latency and throughput metrics, further enhancing user experience. + +Our Oracle Router is the ideal router that can route problems where the actual performance of edge small models outperforms that of cloud large models to the edge. Experiments have shown that when Qwen2.5-7B-Instruct collaborates with gpt-4o-mini, the accuracy on the MMLU (5-shot) dataset is +12.38% compared to pure edge and +8.23% absolute accuracy compared to pure cloud, with 87.62% of queries routed to edge. + +![](./assets/Oracle%20Router%20Demo.png) + +You can modify and run `performance-cost-plot.py` to get your Performance-Cost figure. + +Some related research $^{[1]}$ has trained pratical routers that can save up to 40% of GPT-4 API calls while maintaining essentially unchanged accuracy on the test set. + + +## Future + +This example builds an architecture for testing query routing strategies, but the provided dataset has some drawbacks such as being one-sided and singular, making it difficult to reflect effects in real-world scenarios. + +Besides, Speculative Decoding is another promising cloud-edge collaborative inference strategy, we should also implement it. + +Thus, the future tasks of this example include: + +- Build a more comprehensive dataset for better router evaluation +- Implement a Speculative Decoding example + + + +**Reference** + +[1] Ding, Dujian, et al. "Hybrid LLM: Cost-efficient and quality-aware query routing." *arXiv preprint arXiv:2404.14618* (2024). + +[2] Ong, Isaac, et al. "Routellm: Learning to route llms with preference data." *arXiv preprint arXiv:2406.18665* (2024). + +[3] Xia, Heming, et al. "Unlocking efficiency in large language model inference: A comprehensive survey of speculative decoding." *arXiv preprint arXiv:2401.07851* (2024). \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/assets/Oracle Router Demo.png b/examples/cloud-edge-collaborative-inference-for-llm/assets/Oracle Router Demo.png new file mode 100644 index 00000000..dac855a7 Binary files /dev/null and b/examples/cloud-edge-collaborative-inference-for-llm/assets/Oracle Router Demo.png differ diff --git a/examples/cloud-edge-collaborative-inference-for-llm/assets/image-20240926143857223.png b/examples/cloud-edge-collaborative-inference-for-llm/assets/image-20240926143857223.png new file mode 100644 index 00000000..7b28a63b Binary files /dev/null and b/examples/cloud-edge-collaborative-inference-for-llm/assets/image-20240926143857223.png differ diff --git a/examples/cloud-edge-collaborative-inference-for-llm/benchmarkingjob.yaml b/examples/cloud-edge-collaborative-inference-for-llm/benchmarkingjob.yaml new file mode 100755 index 00000000..d86f9beb --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/benchmarkingjob.yaml @@ -0,0 +1,70 @@ +benchmarkingjob: + # job name of bechmarking; string type; + name: "benchmarkingjob" + # the url address of job workspace that will reserve the output of tests; string type; + # "~/" cannot be identified, so must be relative path or absoulute path + workspace: "./workspace-mmlu" + + hard_example_mining_mode: "mining-then-inference" + + # the url address of test environment configuration file; string type; + # the file format supports yaml/yml; + testenv: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml" + + # the configuration of test object + test_object: + # test type; string type; + # currently the option of value is "algorithms",the others will be added in succession. + type: "algorithms" + # test algorithm configuration files; list type; + algorithms: + # algorithm name; string type; + - name: "query-routing" + # the url address of test algorithm configuration file; string type; + # the file format supports yaml/yml; + url: "./examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml" + + # the configuration of ranking leaderboard + rank: + # rank leaderboard with metric of test case's evaluation and order ; list type; + # the sorting priority is based on the sequence of metrics in the list from front to back; + sort_by: [ { "Accuracy": "descend" } ] + + # visualization configuration + visualization: + # mode of visualization in the leaderboard; string type; + # There are quite a few possible dataitems in the leaderboard. Not all of them can be shown simultaneously on the screen. + # In the leaderboard, we provide the "selected_only" mode for the user to configure what is shown or is not shown. + mode: "selected_only" + # method of visualization for selected dataitems; string type; + # currently the options of value are as follows: + # 1> "print_table": print selected dataitems; + method: "print_table" + + # selected dataitem configuration + # The user can add his/her interested dataitems in terms of "paradigms", "modules", "hyperparameters" and "metrics", + # so that the selected columns will be shown. + selected_dataitem: + # currently the options of value are as follows: + # 1> "all": select all paradigms in the leaderboard; + # 2> paradigms in the leaderboard, e.g., "singletasklearning" + paradigms: [ "all" ] + # currently the options of value are as follows: + # 1> "all": select all modules in the leaderboard; + # 2> modules in the leaderboard, e.g., "basemodel" + modules: [ "hard_example_mining" ] + # currently the options of value are as follows: + # 1> "all": select all hyperparameters in the leaderboard; + # 2> hyperparameters in the leaderboard, e.g., "momentum" + hyperparameters: [ "edgemodel-model", "edgemodel-backend", "cloudmodel-model"] + # currently the options of value are as follows: + # 1> "all": select all metrics in the leaderboard; + # 2> metrics in the leaderboard, e.g., "f1_score" + # metrics: [ "acc" , "edge-rate", "cloud-prompt", "cloud-completion", "edge-prompt", "edge-completion", "input-throughput", "output-throughput", "latency"] + metrics: ["Accuracy", "Edge Ratio", "Time to First Token", "Throughput", "Internal Token Latency", "Cloud Prompt Tokens", "Cloud Completion Tokens", "Edge Prompt Tokens", "Edge Completion Tokens"] + + # model of save selected and all dataitems in workspace; string type; + # currently the options of value are as follows: + # 1> "selected_and_all": save selected and all dataitems; + # 2> "selected_only": save selected dataitems; + save_mode: "selected_and_all" diff --git a/examples/cloud-edge-collaborative-inference-for-llm/performance-cost-plot.py b/examples/cloud-edge-collaborative-inference-for-llm/performance-cost-plot.py new file mode 100644 index 00000000..4f97358c --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/performance-cost-plot.py @@ -0,0 +1,61 @@ +import numpy as np + +import matplotlib.pyplot as plt +from scipy.optimize import curve_fit + +colors = plt.cm.Paired.colors # Set1 调色板 +plt.rcParams["axes.prop_cycle"] = plt.cycler("color", colors) + +# a sigmoid function to fit non-oracle models' performance vs cost +def sigmoid_fit(x, L, k, x0): + return L / (1 + np.exp(-k * (x - x0))) + +def plot_accuracy_cost(models, costs, accuracy, non_oracle_costs, non_oracle_accuracy): + # Fit the sigmoid model + params_sigmoid, _ = curve_fit(sigmoid_fit, non_oracle_costs, non_oracle_accuracy, p0=[100, 1, 0.2]) + + # Generate points for the sigmoid fitted curve + curve_x_sigmoid = np.linspace(min(non_oracle_costs), max(non_oracle_costs), 100) + curve_y_sigmoid = sigmoid_fit(curve_x_sigmoid, *params_sigmoid) + + plt.figure(figsize=(10, 6)) + + # Plot all models + for i in range(len(models)): + if "Oracle" in models[i]: + marker = '^' # Triangle marker for Oracle models + else: + marker = 'o' # Circle marker for non-Oracle models + plt.scatter(costs[i], accuracy[i], label=models[i], marker=marker) + + # Plot the sigmoid fitted curve + plt.plot(curve_x_sigmoid, curve_y_sigmoid, 'gray', linestyle='dashed') # Gray dashed line for the curve + + plt.title('Model Performance vs Cost') + plt.xlabel('Cost($/M token)') + plt.ylabel('Accuracy (%)') + plt.legend(title='Model Name') + plt.grid(True) + plt.savefig('model_performance_sigmoid_fitted_curve.png', dpi=300) + plt.show() + +if __name__ == '__main__': + models = [ + "Oracle-Qwen2.5-7b-instruct + gpt-4o-mini", + "Oracle-Qwen2.5-1.5b-instruct + gpt-4o-mini", + "Oracle-Qwen2.5-3b-instruct + gpt-4o-mini", + "gpt-4o-mini", + "Qwen2.5-7B-Instruct", + "Qwen2.5-3B-Instruct", + "Qwen2.5-1.5B-Instruct" + ] + # The Oracle Routed Model's cost is an average weighted by the Edge Ratio between edge model costs and cloud model costs. + # The edge model’s cost is estimated based on its parameter size. + costs = [0.16, 0.18, 0.17, 0.60, 0.10, 0.08, 0.05] + accuracy = [84.22, 82.75, 82.22, 75.99, 71.84, 60.3, 58.35] + + # Non Oracle Models: gpt-4o-mini, Qwen2.5-7B-Instruct, Qwen2.5-3B-Instruct, Qwen2.5-1.5B-Instruct + non_oracle_costs = costs[-4:] # Costs in $/M token + non_oracle_accuracy = accuracy[-4:] # Accuracies in % + + plot_accuracy_cost(models, costs, accuracy, non_oracle_costs, non_oracle_accuracy) \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt b/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt new file mode 100644 index 00000000..58fc617d --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/requirements.txt @@ -0,0 +1,5 @@ +vllm +transformers +openai +accelerate +datamodel_code_generator \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py new file mode 100644 index 00000000..f466b367 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py @@ -0,0 +1,71 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +import os + +from core.common.log import LOGGER +from sedna.common.class_factory import ClassType, ClassFactory +from models import APIBasedLLM + +os.environ['BACKEND_TYPE'] = 'TORCH' + +__all__ = ["BaseModel"] + +@ClassFactory.register(ClassType.GENERAL, alias="CloudModel") +class CloudModel: + """Models being deployed on the Cloud + """ + def __init__(self, **kwargs): + """Initialize the CloudModel. See `APIBasedLLM` for details about `kwargs`. + """ + LOGGER.info(kwargs) + self.model = APIBasedLLM(**kwargs) + self.load(kwargs.get("model", "gpt-4o-mini")) + + def load(self, model): + """Set the model. + + Parameters + ---------- + model : str + Existing model from your OpenAI provider. Example: `gpt-4o-mini` + """ + self.model._load(model = model) + + def inference(self, data, **kwargs): + """Inference the model with the given data. + + Parameters + ---------- + data : dict + The data to be used for inference. See format at BaseLLM's `inference()`. + kwargs : dict + To Align with Sedna's JointInference interface. + + Returns + ------- + dict + Formatted Response. See `model._format_response()` for more details. + """ + + return self.model.inference(data) + + def cleanup(self): + """Save the cache and cleanup the model. + """ + + self.model.save_cache() + self.model.cleanup() \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py new file mode 100644 index 00000000..ee29ed40 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py @@ -0,0 +1,39 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from sedna.common.class_factory import ClassFactory, ClassType +from sedna.datasources import BaseDataSource + +@ClassFactory.register(ClassType.GENERAL, alias="OracleRouterDatasetProcessor") +class OracleRouterDatasetProcessor: + """ A Customized Dataset Processor for Oracle Router""" + def __init__(self, **kwargs): + pass + + def __call__(self, dataset): + """Transform the dataset to another format for Oracle Router + + Parameters + ---------- + dataset : sedna.datasources.BaseDataSource + The dataset loaded by Sedna + + Returns + ------- + sedna.datasources.BaseDataSource + Transformed dataset + """ + dataset.x = [{"query": x, "gold": y} for x,y in zip(dataset.x, dataset.y)] + return dataset \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py new file mode 100644 index 00000000..42ca2542 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py @@ -0,0 +1,97 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +import os + +from core.common.log import LOGGER +from sedna.common.class_factory import ClassType, ClassFactory +from models import HuggingfaceLLM, APIBasedLLM, VllmLLM + +os.environ['BACKEND_TYPE'] = 'TORCH' + +__all__ = ["BaseModel"] + +@ClassFactory.register(ClassType.GENERAL, alias="EdgeModel") +class EdgeModel: + """Models being deployed on the Edge + """ + def __init__(self, **kwargs): + """Initialize the CloudModel. + + Parameters + ---------- + kwargs : dict + Parameters that are passed to the model. Details can be found in the `VllmLLM`, `HuggingfaceLLM`, `APIBasedLLM` class. + + Special keys: + - `backend`: str, default "huggingface". The serving framework to be used. + """ + + LOGGER.info(kwargs) + self.kwargs = kwargs + self.model_name = kwargs.get("model", None) + self.backend = kwargs.get("backend", "huggingface") + self._set_config() + + def _set_config(self): + """Set the model path in our environment variables due to Sedna’s [check](https://github.com/kubeedge/sedna/blob/ac623ab32dc37caa04b9e8480dbe1a8c41c4a6c2/lib/sedna/core/base.py#L132). + """ + # + os.environ["model_path"] = self.model_name + + def load(self, **kwargs): + """Set the model backend to be used. Will be called by Sedna's JointInference interface. + + Raises + ------ + Exception + When the backend is not supported. + """ + if self.backend == "huggingface": + self.model = HuggingfaceLLM(**self.kwargs) + elif self.backend == "vllm": + self.model = VllmLLM(**self.kwargs) + elif self.backend == "api": + self.model = APIBasedLLM(**self.kwargs) + else: + raise Exception(f"Backend {self.backend} is not supported. Please use 'huggingface', 'vllm', or `api` ") + + def predict(self, data, **kwargs): + """Inference the model with the given data. + + Parameters + ---------- + data : dict + The data to be used for inference. See format at BaseLLM's `inference()`. + kwargs : dict + To Align with Sedna's JointInference interface. + + Returns + ------- + dict + Formatted Response. See `model._format_response()` for more details. + """ + + answer = self.model.inference(data) + + return answer + + def cleanup(self): + """Save the cache and cleanup the model. + """ + + self.model.save_cache() + self.model.cleanup() diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py new file mode 100644 index 00000000..469c9f3e --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py @@ -0,0 +1,253 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hard Example Mining Algorithms""" + +import abc +import random +from transformers import pipeline +from sedna.common.class_factory import ClassFactory, ClassType +from core.common.log import LOGGER + +__all__ = ('BERTFilter', 'EdgeOnlyFilter', 'CloudOnlyFilter', + 'RandomRouterFilter', 'OracleRouterFilter') + +class BaseFilter(metaclass=abc.ABCMeta): + """The base class to define unified interface.""" + + def __init__(self, **kwargs): + LOGGER.info(f"USING {self.__class__.__name__}") + + def __call__(self, infer_result=None): + """ + predict function, judge the sample is hard or not. + + Parameters + ---------- + infer_result : array_like + prediction result + + Returns + ------- + is_hard_sample : bool + `True` means hard sample, `False` means not. + """ + raise NotImplementedError + + @classmethod + def data_check(cls, data): + """Check the data in [0,1].""" + return 0 <= float(data) <= 1 + + +@ClassFactory.register(ClassType.HEM, alias="BERTRouter") +class BERTFilter(BaseFilter, abc.ABC): + def __init__(self, **kwargs): + """Initialize the BERTFilter. + + Parameters + ---------- + kwargs: dict + Possible kwargs are: + - `model`: str, default "routellm/bert". The model to be used. + - `task`: str, default "text-classification". The task to be used. + - `max_length`: int, default 512. The maximum length of the input. + """ + super().__init__(**kwargs) + + self.kwargs = kwargs + LOGGER.info(kwargs) + + self.model = kwargs.get("model", "routellm/bert") + self.task = kwargs.get("task", "text-classification") + self.max_length = kwargs.get("max_length", 512) + + self.classifier = pipeline(self.task, model=self.model, device="cuda") + + def _text_classification_postprocess(self, result): + """Postprocess the text classification result + + Parameters + ---------- + result : list + The result from the classifier. Example: + ``` + [{"label": "LABEL_0", "score": 0.5}, + {"label": "LABEL_1", "score": 0.4}, + {"label": "LABEL_2", "score": 0.1}] + + Returns + ------- + bool + `True` means hard sample, `False` means not. + """ + + res = {item["label"]:item["score"] for item in result} + scaled_score = res["LABEL_0"] / (res["LABEL_0"] + res["LABEL_1"]) + + thresold = self.kwargs.get("threshold", 0.5) + label = "LABEL_0" if scaled_score >= thresold else "LABEL_1" + return False if label == "LABEL_0" else True + + def _predict(self, data): + """Predict the data label + + Parameters + ---------- + data : dict + See format at BaseLLM's `inference()`. + + Returns + ------- + bool + `True` means hard sample, `False` means not. + + Raises + ------ + NotImplementedError + If the task is not supported + """ + + if self.task == "text-classification": + result = self.classifier(data, top_k=None) + is_hard_sample = self._text_classification_postprocess(result) + else: + raise NotImplementedError + + return is_hard_sample + + def _preprocess(self, data): + """Preprocess the data + + Parameters + ---------- + data : dict + See format at BaseLLM's `inference()`. + + Returns + ------- + str + query string + """ + query = data.get("query") + if "query" in query: + return query["query"][:self.max_length] + else: + return query[:self.max_length] + + + def cleanup(self): + """Release the classifier model + """ + del self.classifier + + def __call__(self, data=None) -> bool: + data = self._preprocess(data) + return self._predict(data) + +@ClassFactory.register(ClassType.HEM, alias="EdgeOnly") +class EdgeOnlyFilter(BaseFilter, abc.ABC): + """Route all queries to edge. + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def __call__(self, data=None) -> bool: + return False + +@ClassFactory.register(ClassType.HEM, alias="CloudOnly") +class CloudOnlyFilter(BaseFilter, abc.ABC): + """Route all queries to cloud. + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def __call__(self, data=None) -> bool: + return True + +@ClassFactory.register(ClassType.HEM, alias="RandomRouter") +class RandomRouterFilter(BaseFilter, abc.ABC): + """Randomly route the queries to edge or cloud. + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.threshold = kwargs.get("threshold", 0) + + def __call__(self, data=None) -> bool: + return False if random.random() < self.threshold else True + +@ClassFactory.register(ClassType.HEM, alias="OracleRouter") +class OracleRouterFilter(BaseFilter, abc.ABC): + """The Opitmal Router, which routes the queries to edge or cloud based on the models' prediction. + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.edge_better = 0 + self.cloud_better = 0 + self.both_right = 0 + self.both_wrong = 0 + + self.edge_model = kwargs.get("edgemodel") + self.cloud_model = kwargs.get("cloudmodel") + + def __call__(self, data=None): + """Route the query to edge or cloud based on the models' prediction. + + Parameters + ---------- + data : dict + See format at BaseLLM's `inference()`. + + Returns + ------- + bool + `True` means hard sample, `False` means not. + """ + gold = data.get("gold", None) + + edge_result = self.edge_model.predict(data).get("prediction") + cloud_result = self.cloud_model.inference(data).get("prediction") + + both_right = edge_result == gold and cloud_result == gold + both_wrong = edge_result != gold and cloud_result != gold + edge_better = edge_result == gold and cloud_result != gold + cloud_better = edge_result != gold and cloud_result == gold + + if both_right: + self.both_right +=1 + elif both_wrong: + self.both_wrong += 1 + elif edge_better: + self.edge_better += 1 + elif cloud_better: + self.cloud_better += 1 + + if cloud_better: + # cloud is better than edge, hard sample + return True + else: + # both correct + both wrong + edge_better, easy sample + return False + + def cleanup(self): + """Leverage the `cleanup()` interface to print the statistics. + """ + message = [ + f"OracleRouter Statistics: \n", + f"Both Wrong: {self.both_wrong}, ", + f"Both Correct: {self.both_right}, ", + f"Edge Better: {self.edge_better}, ", + f"Cloud Better: {self.cloud_better}" + ] + LOGGER.info("".join(message)) diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/__init__.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/__init__.py new file mode 100644 index 00000000..548b62e7 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/__init__.py @@ -0,0 +1,4 @@ +from .api_llm import APIBasedLLM +from .huggingface_llm import HuggingfaceLLM +from .vllm_llm import VllmLLM +from .base_llm import BaseLLM \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py new file mode 100644 index 00000000..96ce42a4 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/api_llm.py @@ -0,0 +1,107 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +from openai import OpenAI +from models.base_llm import BaseLLM + +class APIBasedLLM(BaseLLM): + def __init__(self, **kwargs) -> None: + """ Initialize the APIBasedLLM class + """ + BaseLLM.__init__(self, **kwargs) + + api_key=os.environ.get("OPENAI_API_KEY") + base_url=os.environ.get("OPENAI_BASE_URL") + + self.client = OpenAI( + api_key=api_key, + base_url=base_url + ) + + def _load(self, model): + """Set the model to be used. + + Parameters + ---------- + model : str + Existing model from your OpenAI provider. Example: `gpt-4o-mini` + """ + + self.model = model + + def _infer(self, messages): + """Call the OpenAI API to get the response + + Parameters + ---------- + messages : list + OpenAI style message chain. Example: + ``` + [{"role": "user", "content": "Hello, how are you?"}] + ``` + + Returns + ------- + dict + Formatted Response. See `_format_response()` for more details. + """ + + time_to_first_token = 0.0 + internal_token_latency = [] + st = time.perf_counter() + most_recent_timestamp = st + generated_text = "" + + stream = self.client.chat.completions.create( + messages = messages, + model=self.model, + temperature=self.temperature, + max_tokens=self.max_tokens, + top_p=self.top_p, + frequency_penalty=self.repetition_penalty, + stream=True, + stream_options={"include_usage":True} + ) + + for chunk in stream: + timestamp = time.perf_counter() + if time_to_first_token == 0.0: + time_to_first_token = time.perf_counter() - st + else: + internal_token_latency.append(timestamp - most_recent_timestamp) + most_recent_timestamp = timestamp + if chunk.choices: + generated_text += chunk.choices[0].delta.content or "" + if chunk.usage: + usage = chunk.usage + + text = generated_text + prompt_tokens = usage.prompt_tokens + completion_tokens = usage.completion_tokens + internal_token_latency = sum(internal_token_latency) / len(internal_token_latency) + throughput = 1 / internal_token_latency + + response = self._format_response( + text, + prompt_tokens, + completion_tokens, + time_to_first_token, + internal_token_latency, + throughput + ) + + return response diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py new file mode 100644 index 00000000..f80bb0eb --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/base_llm.py @@ -0,0 +1,321 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import json + +def extract_prediction(input_string): + """Extract the prediction from the completion. This function is used when caching the responses. + """ + if not input_string or not any(char.isalpha() for char in input_string): + return None + # Find the last letter in the string + for char in reversed(input_string): + if 'A' <= char <= 'D': + return char + return None + + +class BaseLLM: + def __init__(self, **kwargs) -> None: + """ Initialize the BaseLLM class + + Parameters + ---------- + kwargs : dict + Parameters that are passed to the model. For details, see `_parse_kwargs()` + """ + self.config = kwargs + self._parse_kwargs(**kwargs) + self.is_cache_loaded = False + self.model_loaded = False + + def _load(self): + """Interface for Model Loading + + Raises + ------ + NotImplementedError + When the method is not implemented + """ + raise NotImplementedError + + + def _infer(self, messages): + """Interface for Model Inference + + Parameters + ---------- + messages : list + OpenAI style message chain. Example: + ``` + [{"role": "user", "content": "Hello, how are you?"}] + ``` + + Raises + ------ + NotImplementedError + When the method is not implemented + """ + raise NotImplementedError + + def _parse_kwargs(self, **kwargs): + """Parse the kwargs and set the attributes + + Parameters + ---------- + kwargs : dict + Parameters that are passed to the model. Possible keys are: + - `model`: str, default None. Model name + - `temperature`: float, default 0.8. Temperature for sampling + - `top_p`: float, default 0.8. Top p for sampling + - `repetition_penalty`: float, default 1.05. Repetition penalty + - `max_tokens`: int, default 512. Maximum tokens to generate + - `use_cache`: bool, default True. Whether to use reponse cache + """ + + self.model_name = kwargs.get("model", None) + self.temperature = kwargs.get("temperature", 0.8) + self.top_p = kwargs.get("top_p", 0.8) + self.repetition_penalty = kwargs.get("repetition_penalty", 1.05) + self.max_tokens = kwargs.get("max_tokens", 512) + self.use_cache = kwargs.get("use_cache", True) + + def inference(self, data): + """Inference the model + + Parameters + ---------- + data : dict + The input data. Example: + ``` + # With Gold Answer (For special uses like OracleRouter) + {"query": "What is the capital of China?", "gold": "A"} + # Without Gold Answer + {"query": "What is the capital of China?"} + ``` + + Returns + ------- + dict + Formatted Response. See `_format_response()` for more details. + + Raises + ------ + ValueError + If the data is not a dict + """ + + if isinstance(data, dict): + gold = data.get("gold", None) + query = data.get("query") + + messages = self.get_message_chain(query) + question = messages[-1]["content"] + + if self.use_cache: + response = self._try_cache(question) + if response is not None: + return response + + if not self.model_loaded: + self._load(self.model_name) + self.model_loaded = True + + response = self._infer(messages) + + prediction = extract_prediction(response.get("completion")) + + response["prediction"] = prediction + + if self.use_cache: + self._update_cache(question, response, prediction, gold) + + return response + + else: + raise ValueError(f"DataType {type(data)} is not supported, it must be `dict`") + + def get_message_chain(self, question, system = None): + """Get the OpenAI Chat style message chain + + Parameters + ---------- + question : str + User prompt. + system : str, optional + System Prompt, by default None + + Returns + ------- + list + OpenAI Chat style message chain. + """ + + if system: + messages = [ + {"role": "system", "content": system}, + {"role": "user", "content": question} + ] + else: + messages = [ + {"role": "user", "content": question} + ] + + return messages + + + def _format_response(self, text, prompt_tokens, completion_tokens, time_to_first_token, internal_token_latency, throughput): + """Format the response + + Parameters + ---------- + text : str + The completion text + prompt_tokens : int + The number of tokens in the prompt + completion_tokens : int + The number of tokens in the completion + time_to_first_token : float + The time consumed to generate the first token. Unit: s(seconds) + internal_token_latency : float + The average time consumed to generate a token. Unit: s(seconds) + throughput : float + The throughput of the completion. Unit: tokens/s + + Returns + ------- + dict + Example: + ``` + { + "completion": "A", + "usage": { + "prompt_tokens": 505, + "completion_tokens": 1, + "total_tokens": 506 + }, + "perf": { + "time_to_first_token": 0.6393, + "internal_token_latency": 0.0005, + "throughput": 1750.6698 + } + } + ``` + """ + + total_tokens = prompt_tokens + completion_tokens + + usage = { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens + } + perf = { + "time_to_first_token": time_to_first_token, + "internal_token_latency": internal_token_latency, + "throughput": throughput + } + + resposne = { + "completion": text, + "usage":usage, + "perf":perf + } + + return resposne + + def _load_cache(self): + """Load cached Responses from `$RESULT_SAVED_URL/cache.json`. + """ + self.cache = None + self.cache_hash = {} + self.cache_models = [] + + cache_file = os.path.join(os.environ["RESULT_SAVED_URL"], "cache.json") + if os.path.exists(cache_file): + with open(cache_file, "r", encoding="utf-8") as f: + self.cache_models = json.load(f) + for cache in self.cache_models: + if cache["config"] == self.config: + self.cache = cache + self.cache_hash = {item["query"]:item['response'] for item in cache["result"]} + self.is_cache_loaded = True + + def _try_cache(self, question): + """Try to get the response from cache + + Parameters + ---------- + question : str + User prompt + + Returns + ------- + dict + If the question is found in cache, return the Formatted Response. Otherwise, return None. + """ + + if not self.is_cache_loaded: + self._load_cache() + + return self.cache_hash.get(question, None) + + def _update_cache(self, question, response, prediction, gold): + """Update the cache with the new item + + Parameters + ---------- + question : str + User prompt + response : dict + Formatted Response. See `_format_response()` for more details. + prediction : str + The prediction extracted from the response + gold : str + The gold answer for the question + """ + + if not self.is_cache_loaded: + self._load_cache() + + new_item = { + "query": question, + "response": response, + "prediction": prediction, + "gold": gold + } + + self.cache_hash[question] = response + + if self.cache is not None: + self.cache["result"].append(new_item) + else: + self.cache = {"config": self.config, "result": [new_item]} + self.cache_models.append(self.cache) + + def save_cache(self): + """Save the cache to `$RESULT_SAVED_URL/cache.json`. + """ + + cache_file = os.path.join(os.environ["RESULT_SAVED_URL"], "cache.json") + + if self.is_cache_loaded: + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(self.cache_models, f, indent=4) + + def cleanup(self): + """Default Cleanup Method to release resources + """ + pass diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py new file mode 100644 index 00000000..8bf87385 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/huggingface_llm.py @@ -0,0 +1,133 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +from threading import Thread + +from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer +from models.base_llm import BaseLLM + +device = "cuda" +os.environ["TOKENIZERS_PARALLELISM"] = "true" + +class HuggingfaceLLM(BaseLLM): + def __init__(self, **kwargs) -> None: + """ Initialize the HuggingfaceLLM class + + Parameters + ---------- + kwargs : dict + Parameters that are passed to the model. Details can be found in the BaseLLM class. + """ + BaseLLM.__init__(self, **kwargs) + + def _load(self, model): + """Load the model via Hugging Face API + + Parameters + ---------- + model : str + Hugging Face style model name. Example: `Qwen/Qwen2.5-0.5B-Instruct` + """ + self.model = AutoModelForCausalLM.from_pretrained( + model, + torch_dtype="auto", + device_map="auto", + trust_remote_code=True + ) + self.tokenizer = AutoTokenizer.from_pretrained( + model, + trust_remote_code=True + ) + + def _infer(self, messages): + """Call the transformers inference API to get the response + + Parameters + ---------- + messages : list + OpenAI style message chain. Example: + ``` + [{"role": "user", "content": "Hello, how are you?"}] + ``` + + Returns + ------- + dict + Formatted Response. See `_format_response()` for more details. + """ + + st = time.perf_counter() + most_recent_timestamp = st + + # messages = self.get_message_chain(question, system_prompt) + + streamer = TextIteratorStreamer(self.tokenizer) + + text = self.tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True + ) + + model_inputs = self.tokenizer([text], return_tensors="pt").to(device) + + streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True) + + generation_kwargs = dict( + model_inputs, + streamer=streamer, + max_new_tokens=self.max_tokens, + temperature=self.temperature, + top_p=self.top_p, + repetition_penalty=self.repetition_penalty, + ) + + thread = Thread( + target=self.model.generate, + kwargs=generation_kwargs + ) + + thread.start() + time_to_first_token = 0 + internal_token_latency = [] + generated_text = "" + completion_tokens = 0 + + for chunk in streamer: + timestamp = time.perf_counter() + if time_to_first_token == 0: + time_to_first_token = time.perf_counter() - st + else: + internal_token_latency.append(timestamp - most_recent_timestamp) + most_recent_timestamp = timestamp + generated_text += chunk + completion_tokens += 1 + + text = generated_text.replace("<|im_end|>", "") + prompt_tokens = len(model_inputs.input_ids[0]) + internal_token_latency = sum(internal_token_latency) / len(internal_token_latency) + throughput = 1 / internal_token_latency + + response = self._format_response( + text, + prompt_tokens, + completion_tokens, + time_to_first_token, + internal_token_latency, + throughput + ) + + return response diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py new file mode 100644 index 00000000..5d572306 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/models/vllm_llm.py @@ -0,0 +1,140 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from vllm import LLM, SamplingParams +from vllm.distributed.parallel_state import destroy_model_parallel, destroy_distributed_environment +from models.base_llm import BaseLLM + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" +os.environ["TOKENIZERS_PARALLELISM"] = "true" +os.environ["VLLM_LOGGING_LEVEL"] = "ERROR" + +device = "cuda" + +class VllmLLM(BaseLLM): + def __init__(self, **kwargs) -> None: + """ Initialize the VllmLLM class + + Parameters + ---------- + kwargs : dict + Parameters that are passed to the model. Details can be found in the BaseLLM class. + + Special keys: + - `tensor_parallel_size`: int, default 1. Number of tensor parallelism. + - `gpu_memory_utilization`: float, default 0.8. GPU memory utilization. + + See details about special parameters in [vLLM's Named Arguments](https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html). + """ + + BaseLLM.__init__(self, **kwargs) + + self.tensor_parallel_size = kwargs.get("tensor_parallel_size", 1) + self.gpu_memory_utilization = kwargs.get("gpu_memory_utilization", 0.8) + + def _load(self, model): + """Load the model via vLLM API + + Parameters + ---------- + model : str + Hugging Face style model name. Example: `Qwen/Qwen2.5-0.5B-Instruct` + """ + self.model = LLM( + model=model, + trust_remote_code=True, + dtype="float16", + tensor_parallel_size=self.tensor_parallel_size, + gpu_memory_utilization=self.gpu_memory_utilization, + max_model_len = 8192 + #quantization=self.quantization # TODO need to align with vllm API + ) + + self.sampling_params = SamplingParams( + temperature=self.temperature, + top_p=self.top_p, + repetition_penalty=self.repetition_penalty, + max_tokens=self.max_tokens + ) + + # Warmup to make metrics more accurate + self.warmup() + + def warmup(self): + """Warm up the Model for more accurate performance metrics + """ + + self.model.chat( + [{"role": "user", "content": "Hello"}], + self.sampling_params, + use_tqdm=False + ) + + def _infer(self, messages): + """Call the vLLM Offline Inference API to get the response + + Parameters + ---------- + messages : list + OpenAI style message chain. Example: + ``` + [{"role": "user", "content": "Hello, how are you?"}] + ``` + + Returns + ------- + dict + Formatted Response. See `_format_response()` for more details. + """ + + outputs = self.model.chat( + messages=messages, + sampling_params=self.sampling_params, + use_tqdm=False + ) + metrics = outputs[0].metrics + # Completion Text + text = outputs[0].outputs[0].text + # Prompt Token Count + prompt_tokens = len(outputs[0].prompt_token_ids) + # Completion Token Count + completion_tokens = len(outputs[0].outputs[0].token_ids) + # Time to First Token (s) + time_to_first_token = metrics.first_token_time - metrics.arrival_time + # Internal Token Latency (s) + internal_token_latency = (metrics.finished_time - metrics.first_token_time) / completion_tokens + # Completion Throughput (Token/s) + throughput = 1 / internal_token_latency + + response = self._format_response( + text, + prompt_tokens, + completion_tokens, + time_to_first_token, + internal_token_latency, + throughput + ) + + return response + + def cleanup(self): + """Release the model from GPU + """ + destroy_model_parallel() + destroy_distributed_environment() + + if hasattr(self, "model"): + del self.model.llm_engine.model_executor diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml new file mode 100644 index 00000000..a3926146 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/test_queryrouting.yaml @@ -0,0 +1,97 @@ +algorithm: + # paradigm name; string type; + paradigm_type: "jointinference" + + # algorithm module configuration in the paradigm; list type; + modules: + # kind of algorithm module; string type; + - type: "dataset_processor" + # name of custom dataset processor; string type; + name: "OracleRouterDatasetProcessor" + # the url address of custom dataset processor; string type; + url: "./examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/data_processor.py" + + - type: "edgemodel" + # name of edge model module; string type; + name: "EdgeModel" + # the url address of edge model module; string type; + url: "./examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/edge_model.py" + + hyperparameters: + # name of the hyperparameter; string type; + - model: + values: + - "Qwen/Qwen2.5-1.5B-Instruct" + - "Qwen/Qwen2.5-3B-Instruct" + - "Qwen/Qwen2.5-7B-Instruct" + - backend: + # backend; string type; + # currently the options of value are as follows: + # 1> "huggingface": transformers backend; + # 2> "vllm": vLLM backend; + # 3> "api": OpenAI API backend; + values: + - "vllm" + - temperature: + # What sampling temperature to use, between 0 and 2; float type; + # For reproducable results, the temperature should be set to 0; + values: + - 0 + - top_p: + # nucleus sampling parameter; float type; + values: + - 0.8 + - max_tokens: + # The maximum number of tokens that can be generated in the chat completion; int type; + values: + - 512 + - repetition_penalty: + # The parameter for repetition penalty; float type; + values: + - 1.05 + - tensor_parallel_size: + # The size of tensor parallelism (Used for vLLM) + values: + - 4 + - gpu_memory_utilization: + # The percentage of GPU memory utilization (Used for vLLM) + values: + - 0.9 + - use_cache: + # Whether to use reponse cache; boolean type; + values: + - true + + - type: "cloudmodel" + # name of python module; string type; + name: "CloudModel" + # the url address of python module; string type; + url: "./examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/cloud_model.py" + + hyperparameters: + # name of the hyperparameter; string type; + - model: + values: + - "gpt-4o-mini" + - temperature: + values: + - 0 + - top_p: + values: + - 0.8 + - max_tokens: + values: + - 512 + - repetition_penalty: + values: + - 1.05 + - use_cache: + values: + - true + + - type: "hard_example_mining" + # name of Router module; string type; + # BERTRouter, EdgeOnly, CloudOnly, RandomRouter, OracleRouter + name: "EdgeOnly" + # the url address of python module; string type; + url: "./examples/cloud-edge-collaborative-inference-for-llm/testalgorithms/query-routing/hard_sample_mining.py" diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/accuracy.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/accuracy.py new file mode 100644 index 00000000..a3dc088f --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/accuracy.py @@ -0,0 +1,59 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +__all__ = ["acc"] + +def get_last_letter(input_string): + """Extract the prediction from the completion. This function is used when caching the responses. + """ + if not input_string or not any(char.isalpha() for char in input_string): + return None + # Find the last letter in the string + for char in reversed(input_string): + if 'A' <= char <= 'D': + return char + return None + +@ClassFactory.register(ClassType.GENERAL, alias="Accuracy") +def acc(y_true, y_pred): + """Calculate the accuracy. + + Parameters + ---------- + y_true : list + Ground truth + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + float + The accuracy (%) + """ + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + y_pred = [get_last_letter(pred.result.completion) for pred in infer_res] + y_true = [get_last_letter(y) for y in y_true] + + # 使用列表推导来比较两个列表中的元素是否相同 + same_elements = [y_pred[i] == y_true[i] for i in range(len(y_pred))] + + # 计算相同元素的数量 + acc = sum(same_elements) / len(same_elements) + + return round(acc * 100, 2) diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_completion_tokens.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_completion_tokens.py new file mode 100644 index 00000000..06c09b78 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_completion_tokens.py @@ -0,0 +1,39 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Cloud Completion Tokens") +def cloud_completion_tokens(_, y_pred): + """Calculate the number of completion tokens generated by the cloud model. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + int + Number of completion tokens generated by the cloud model + """ + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + cloud_completion_tokens = sum([pred.cloud_result.completion_tokens for pred in infer_res]) + + return cloud_completion_tokens \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_prompt_tokens.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_prompt_tokens.py new file mode 100644 index 00000000..f3aed044 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_prompt_tokens.py @@ -0,0 +1,39 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Cloud Prompt Tokens") +def cloud_prompt_tokens(_, y_pred): + """Calculate the number of prompt tokens generated by the cloud model. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + int + Number of prompt tokens generated by the cloud model + """ + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + cloud_prompt_tokens = sum([pred.cloud_result.prompt_tokens for pred in infer_res]) + + return cloud_prompt_tokens \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_completion_tokens.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_completion_tokens.py new file mode 100644 index 00000000..05af423f --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_completion_tokens.py @@ -0,0 +1,40 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Edge Completion Tokens") +def edge_completion_tokens(_, y_pred): + """Calculate the number of completion tokens generated by the edge model. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + int + Number of completion tokens generated by the edge model + """ + + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + edge_completion_tokens = sum([pred.edge_result.completion_tokens for pred in infer_res]) + + return edge_completion_tokens \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_prompt_tokens.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_prompt_tokens.py new file mode 100644 index 00000000..f2a30b3b --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_prompt_tokens.py @@ -0,0 +1,40 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Edge Prompt Tokens") +def edge_prompt_tokens(_, y_pred): + """Calculate the number of prompt tokens generated by the edge model. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + int + Number of prompt tokens generated by the edge model + """ + + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + edge_prompt_tokens = sum([pred.edge_result.prompt_tokens for pred in infer_res]) + + return edge_prompt_tokens \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_ratio.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_ratio.py new file mode 100644 index 00000000..6ae5a1f5 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_ratio.py @@ -0,0 +1,42 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Edge Ratio") +def edge_ratio(_, y_pred): + """Calculate the ratio of of queries routed to EdgeModel. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + int + The ratio of queries routed to EdgeModel (%) + """ + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + y_pred = [pred.is_hard_example for pred in infer_res] + + edge_ratio = 1 - sum(y_pred) / len(y_pred) + + return round(edge_ratio * 100,2) + diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/internal_token_latency.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/internal_token_latency.py new file mode 100644 index 00000000..9024a2b3 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/internal_token_latency.py @@ -0,0 +1,41 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Internal Token Latency") +def internal_token_latency(_, y_pred): + """Calculate the Internal Token Latency of the system. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + float + Average Internal Token Latency (s) of the system + """ + + + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + average_itl = sum([pred.result.internal_token_latency for pred in infer_res]) / len(infer_res) + + return round(average_itl,3) \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/result_parser.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/result_parser.py new file mode 100644 index 00000000..bbaba90e --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/result_parser.py @@ -0,0 +1,92 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import TypedDict + +@dataclass +class Response: + """Formatted Response Parser""" + + completion: str + prompt_tokens : int + completion_tokens : int + total_tokens : int + time_to_first_token: float + internal_token_latency: float + throughput: float + + @classmethod + def from_dict(cls, response): + """Create a Response object from a dictionary + + Parameters + ---------- + response : dict + Formatted Response, See `BaseLLM._format_response()` for more details. + + Returns + ------- + Response + `Response` Object + """ + + if response: + return cls( + response["completion"], + response["usage"]["prompt_tokens"], + response["usage"]["completion_tokens"], + response["usage"]["total_tokens"], + response["perf"]["time_to_first_token"], + response["perf"]["internal_token_latency"], + response["perf"]["throughput"] + ) + else: + return cls("", 0, 0, 0, 0, 0, 0) + +@dataclass +class JointInferenceResult: + """Joint Inference Result Parser""" + is_hard_example : bool + result : Response + edge_result: Response + cloud_result: Response + + @classmethod + def from_list(cls, is_hard_example, result, edge_result, cloud_reslut): + """Create a JointInferenceResult object from a list + + Parameters + ---------- + is_hard_example : bool + Whter the example is hard or not + result : dict + Formatted Response. See `BaseLLM._format_response()` for more details. + edge_result : dict + Formatted Response from the Edge Model. See `BaseLLM._format_response()` for more details. + cloud_reslut : dict + Formatted Response from the Cloud Model. See `BaseLLM._format_response()` for more details. + + Returns + ------- + _type_ + _description_ + """ + + return cls( + is_hard_example, + Response.from_dict(result), + Response.from_dict(edge_result), + Response.from_dict(cloud_reslut), + ) diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml b/examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml new file mode 100644 index 00000000..18510b64 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/testenv.yaml @@ -0,0 +1,39 @@ +testenv: + # dataset configuration + dataset: + # the url address of train dataset index; string type; + train_data: "./dataset/mmlu-5-shot/train_data/data.json" + # the url address of test dataset index; string type; + test_data_info: "./dataset/mmlu-5-shot/test_data/metadata.json" + + # metrics configuration for test case's evaluation; list type; + metrics: + # metric name; string type; + - name: "Accuracy" + # the url address of python file + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/accuracy.py" + + - name: "Edge Ratio" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_ratio.py" + + - name: "Cloud Prompt Tokens" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_prompt_tokens.py" + + - name: "Cloud Completion Tokens" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/cloud_completion_tokens.py" + + - name: "Edge Prompt Tokens" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_prompt_tokens.py" + + - name: "Edge Completion Tokens" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/edge_completion_tokens.py" + + - name: "Time to First Token" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/time_to_first_token.py" + + - name: "Throughput" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/throughput.py" + + - name: "Internal Token Latency" + url: "./examples/cloud-edge-collaborative-inference-for-llm/testenv/internal_token_latency.py" + diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/throughput.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/throughput.py new file mode 100644 index 00000000..6a2ccc61 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/throughput.py @@ -0,0 +1,41 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Throughput") +def throughput(_, y_pred): + """Calculate the Throughput of the system. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + float + Average Throughput (token/s) of the system + """ + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + average_itl = sum([pred.result.internal_token_latency for pred in infer_res]) / len(infer_res) + + average_throughput = 1 / average_itl + + return round(average_throughput,2) \ No newline at end of file diff --git a/examples/cloud-edge-collaborative-inference-for-llm/testenv/time_to_first_token.py b/examples/cloud-edge-collaborative-inference-for-llm/testenv/time_to_first_token.py new file mode 100644 index 00000000..0f0964e7 --- /dev/null +++ b/examples/cloud-edge-collaborative-inference-for-llm/testenv/time_to_first_token.py @@ -0,0 +1,39 @@ +# Copyright 2024 The KubeEdge Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sedna.common.class_factory import ClassType, ClassFactory +from result_parser import JointInferenceResult + +@ClassFactory.register(ClassType.GENERAL, alias="Time to First Token") +def time_to_first_token(_, y_pred): + """Calculate the Time to First Token of the system. + + Parameters + ---------- + _ : + Ignored + y_pred : list + List of predictions from the JointInference paradigm + + Returns + ------- + float + Average Time to First Token (s) of the system + """ + + infer_res = [JointInferenceResult.from_list(*pred) for pred in y_pred] + + average_ttft = sum([pred.result.time_to_first_token for pred in infer_res]) / len(infer_res) + + return round(average_ttft, 3) \ No newline at end of file diff --git a/examples/resources/third_party/sedna-0.4.1-py3-none-any.whl b/examples/resources/third_party-bk/sedna-0.4.1-py3-none-any.whl similarity index 100% rename from examples/resources/third_party/sedna-0.4.1-py3-none-any.whl rename to examples/resources/third_party-bk/sedna-0.4.1-py3-none-any.whl diff --git a/examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl b/examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl new file mode 100644 index 00000000..3a50150b Binary files /dev/null and b/examples/resources/third_party/sedna-0.6.0.1-py3-none-any.whl differ diff --git a/requirements.txt b/requirements.txt index 9f7cec56..8ecae08a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,5 @@ -prettytable~=2.5.0 # BSD \ No newline at end of file +prettytable~=2.5.0 # BSD +scikit-learn +numpy +pandas +tqdm \ No newline at end of file