feat: engines endpoint and cortex.python
vansangpfiev committed May 21, 2024
1 parent 48dcae3 commit c08043f
Showing 11 changed files with 253 additions and 70 deletions.
Empty file.
Empty file.
1 change: 1 addition & 0 deletions cortex-cpp/CMakeLists.txt
@@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.5)
 project(cortex-cpp C CXX)
 
 include(engines/cortex.llamacpp/engine.cmake)
+include(engines/cortex.python/engine.cmake)
 include(CheckIncludeFileCXX)
 
 check_include_file_cxx(any HAS_ANY)
11 changes: 9 additions & 2 deletions cortex-cpp/common/base.h
@@ -8,14 +8,21 @@ class BaseModel {
   virtual ~BaseModel() {}
 
   // Model management
-  virtual void LoadModel(const HttpRequestPtr& req,
-                         std::function<void(const HttpResponsePtr&)>&& callback) = 0;
+  virtual void LoadModel(
+      const HttpRequestPtr& req,
+      std::function<void(const HttpResponsePtr&)>&& callback) = 0;
   virtual void UnloadModel(
       const HttpRequestPtr& req,
       std::function<void(const HttpResponsePtr&)>&& callback) = 0;
   virtual void ModelStatus(
       const HttpRequestPtr& req,
       std::function<void(const HttpResponsePtr&)>&& callback) = 0;
+  virtual void GetEngines(
+      const HttpRequestPtr& req,
+      std::function<void(const HttpResponsePtr&)>&& callback) = 0;
+  virtual void FineTuning(
+      const HttpRequestPtr& req,
+      std::function<void(const HttpResponsePtr&)>&& callback) = 0;
 };
 
 class BaseChatCompletion {
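Note on the interface change above: GetEngines and FineTuning are pure virtual, so every concrete controller deriving from BaseModel must now override them alongside the existing model-management hooks. A minimal sketch of a conforming subclass (the class name and the empty bodies are hypothetical; only the signatures come from base.h):

#include <functional>

#include "common/base.h"  // assumed include path; presumed to bring in the
                          // HttpRequestPtr/HttpResponsePtr aliases used below

class MyModel : public BaseModel {  // MyModel is a hypothetical example class
 public:
  void LoadModel(const HttpRequestPtr& req,
                 std::function<void(const HttpResponsePtr&)>&& callback) override {}
  void UnloadModel(const HttpRequestPtr& req,
                   std::function<void(const HttpResponsePtr&)>&& callback) override {}
  void ModelStatus(const HttpRequestPtr& req,
                   std::function<void(const HttpResponsePtr&)>&& callback) override {}
  // New in this commit: engine enumeration and fine-tuning entry points.
  void GetEngines(const HttpRequestPtr& req,
                  std::function<void(const HttpResponsePtr&)>&& callback) override {}
  void FineTuning(const HttpRequestPtr& req,
                  std::function<void(const HttpResponsePtr&)>&& callback) override {}
};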
187 changes: 131 additions & 56 deletions cortex-cpp/controllers/server.cc
@@ -5,32 +5,33 @@
 #include <iostream>
 
 #include "trantor/utils/Logger.h"
-#include "utils/logging_utils.h"
 #include "utils/cortex_utils.h"
+#include "utils/logging_utils.h"
 
-using namespace inferences;
 using json = nlohmann::json;
+namespace inferences {
 namespace {
 constexpr static auto kLlamaEngine = "cortex.llamacpp";
-constexpr static auto kLlamaLibPath = "./engines/cortex.llamacpp";
+constexpr static auto kPythonRuntimeEngine = "cortex.python";
 }  // namespace
 
-server::server()
-    : engine_{nullptr} {
+server::server(){
 
-  // Some default values for now below
-  // log_disable();  // Disable the log to file feature, reduce bloat for
-  // target
-  // system ()
-};
+  // Some default values for now below
+  // log_disable();  // Disable the log to file feature, reduce bloat for
+  // target
+  // system ()
+};
 
 server::~server() {}
 
 void server::ChatCompletion(
     const HttpRequestPtr& req,
     std::function<void(const HttpResponsePtr&)>&& callback) {
-  if (!IsEngineLoaded()) {
+  auto engine_type =
+      (*(req->getJsonObject())).get("engine", kLlamaEngine).asString();
+  if (!IsEngineLoaded(engine_type)) {
     Json::Value res;
     res["message"] = "Engine is not loaded yet";
     auto resp = cortex_utils::nitroHttpJsonResponse(res);
@@ -44,10 +45,10 @@ void server::ChatCompletion(
   auto json_body = req->getJsonObject();
   bool is_stream = (*json_body).get("stream", false).asBool();
   auto q = std::make_shared<SyncQueue>();
-  engine_->HandleChatCompletion(json_body,
-                                [q](Json::Value status, Json::Value res) {
-                                  q->push(std::make_pair(status, res));
-                                });
+  std::get<EngineI*>(engines_[engine_type].engine)->HandleChatCompletion(
+      json_body, [q](Json::Value status, Json::Value res) {
+        q->push(std::make_pair(status, res));
+      });
   LOG_TRACE << "Wait to chat completion responses";
   if (is_stream) {
     ProcessStreamRes(std::move(callback), q);
@@ -57,10 +58,11 @@
   LOG_TRACE << "Done chat completion";
 }
 
-void server::Embedding(
-    const HttpRequestPtr& req,
-    std::function<void(const HttpResponsePtr&)>&& callback) {
-  if (!IsEngineLoaded()) {
+void server::Embedding(const HttpRequestPtr& req,
+                       std::function<void(const HttpResponsePtr&)>&& callback) {
+  auto engine_type =
+      (*(req->getJsonObject())).get("engine", kLlamaEngine).asString();
+  if (!IsEngineLoaded(engine_type)) {
     Json::Value res;
     res["message"] = "Engine is not loaded yet";
     auto resp = cortex_utils::nitroHttpJsonResponse(res);
@@ -72,10 +74,10 @@ void server::Embedding(
 
   LOG_TRACE << "Start embedding";
   SyncQueue q;
-  engine_->HandleEmbedding(req->getJsonObject(),
-                           [&q](Json::Value status, Json::Value res) {
-                             q.push(std::make_pair(status, res));
-                           });
+  std::get<EngineI*>(engines_[engine_type].engine)->HandleEmbedding(
+      req->getJsonObject(), [&q](Json::Value status, Json::Value res) {
+        q.push(std::make_pair(status, res));
+      });
   LOG_TRACE << "Wait to embedding";
   ProcessNonStreamRes(std::move(callback), q);
   LOG_TRACE << "Done embedding";
@@ -84,7 +86,9 @@ void server::Embedding(
 void server::UnloadModel(
     const HttpRequestPtr& req,
     std::function<void(const HttpResponsePtr&)>&& callback) {
-  if (!IsEngineLoaded()) {
+  auto engine_type =
+      (*(req->getJsonObject())).get("engine", kLlamaEngine).asString();
+  if (!IsEngineLoaded(engine_type)) {
     Json::Value res;
     res["message"] = "Engine is not loaded yet";
     auto resp = cortex_utils::nitroHttpJsonResponse(res);
@@ -94,7 +98,7 @@ void server::UnloadModel(
     return;
   }
   LOG_TRACE << "Start unload model";
-  engine_->UnloadModel(
+  std::get<EngineI*>(engines_[engine_type].engine)->UnloadModel(
       req->getJsonObject(),
       [cb = std::move(callback)](Json::Value status, Json::Value res) {
         auto resp = cortex_utils::nitroHttpJsonResponse(res);
@@ -108,7 +112,9 @@ void server::UnloadModel(
 void server::ModelStatus(
     const HttpRequestPtr& req,
     std::function<void(const HttpResponsePtr&)>&& callback) {
-  if (!IsEngineLoaded()) {
+  auto engine_type =
+      (*(req->getJsonObject())).get("engine", kLlamaEngine).asString();
+  if (!IsEngineLoaded(engine_type)) {
     Json::Value res;
     res["message"] = "Engine is not loaded yet";
     auto resp = cortex_utils::nitroHttpJsonResponse(res);
@@ -119,7 +125,7 @@ void server::ModelStatus(
   }
 
   LOG_TRACE << "Start to get model status";
-  engine_->GetModelStatus(
+  std::get<EngineI*>(engines_[engine_type].engine)->GetModelStatus(
      req->getJsonObject(),
      [cb = std::move(callback)](Json::Value status, Json::Value res) {
        auto resp = cortex_utils::nitroHttpJsonResponse(res);
@@ -130,57 +136,126 @@ void server::ModelStatus(
   LOG_TRACE << "Done get model status";
 }
 
-void server::LoadModel(
+void server::GetEngines(
     const HttpRequestPtr& req,
     std::function<void(const HttpResponsePtr&)>&& callback) {
+  Json::Value res;
+  Json::Value engine_array(Json::arrayValue);
+  for (const auto& [s, _] : engines_) {
+    Json::Value val;
+    val["id"] = s;
+    val["object"] = "engine";
+    engine_array.append(val);
+  }
+
+  res["object"] = "list";
+  res["data"] = engine_array;
+
+  auto resp = cortex_utils::nitroHttpJsonResponse(res);
+  callback(resp);
+}
+
+void server::FineTuning(
+    const HttpRequestPtr& req,
+    std::function<void(const HttpResponsePtr&)>&& callback) {
+  auto engine_type =
+      (*(req->getJsonObject())).get("engine", kPythonRuntimeEngine).asString();
+
+  if (engines_.find(engine_type) == engines_.end()) {
+    try {
+      engines_[engine_type].dl = std::make_unique<dylib>(
+          cortex_utils::kPythonRuntimeLibPath, "engine");
+    } catch (const dylib::load_error& e) {
+      LOG_ERROR << "Could not load engine: " << e.what();
+      engines_.erase(engine_type);
+
+      Json::Value res;
+      res["message"] = "Could not load engine " + engine_type;
+      auto resp = cortex_utils::nitroHttpJsonResponse(res);
+      resp->setStatusCode(k500InternalServerError);
+      callback(resp);
+      return;
+    }
+
+    auto func =
+        engines_[engine_type].dl->get_function<CortexPythonEngineI*()>("get_engine");
+    engines_[engine_type].engine = func();
+    LOG_INFO << "Loaded engine: " << engine_type;
+  }
+
+  LOG_TRACE << "Start to fine-tuning";
+  auto& en = std::get<CortexPythonEngineI*>(engines_[engine_type].engine);
+  if (en->IsSupported("HandlePythonFileExecutionRequest")) {
+    en->HandlePythonFileExecutionRequest(
+        req->getJsonObject(),
+        [cb = std::move(callback)](Json::Value status, Json::Value res) {
+          auto resp = cortex_utils::nitroHttpJsonResponse(res);
+          resp->setStatusCode(static_cast<drogon::HttpStatusCode>(
+              status["status_code"].asInt()));
+          cb(resp);
+        });
+  } else {
+    Json::Value res;
+    res["message"] = "Method is not supported yet";
+    auto resp = cortex_utils::nitroHttpJsonResponse(res);
+    resp->setStatusCode(k500InternalServerError);
+    callback(resp);
+    LOG_WARN << "Method is not supported yet";
+  }
+  LOG_TRACE << "Done fine-tuning";
+}
+
+void server::LoadModel(const HttpRequestPtr& req,
+                       std::function<void(const HttpResponsePtr&)>&& callback) {
   auto engine_type =
       (*(req->getJsonObject())).get("engine", kLlamaEngine).asString();
-  if (!dylib_ || engine_type != cur_engine_name_) {
-    cur_engine_name_ = engine_type;
-    // TODO: change this when we get more engines
 
+  // We have not loaded engine yet, should load it before using it
+  if (engines_.find(engine_type) == engines_.end()) {
+    // TODO(sang) we cannot run cortex.llamacpp and cortex.tensorrt-llm at the same time.
+    // So need an unload engine mechanism to handle.
     auto get_engine_path = [](std::string_view e) {
       if (e == kLlamaEngine) {
-        return kLlamaLibPath;
+        return cortex_utils::kLlamaLibPath;
       }
-      return kLlamaLibPath;
+      return cortex_utils::kLlamaLibPath;
     };
 
     try {
-      dylib_ =
-          std::make_unique<dylib>(get_engine_path(cur_engine_name_), "engine");
+      engines_[engine_type].dl =
+          std::make_unique<dylib>(get_engine_path(engine_type), "engine");
     } catch (const dylib::load_error& e) {
       LOG_ERROR << "Could not load engine: " << e.what();
-      dylib_.reset();
-      engine_ = nullptr;
-    }
+      engines_.erase(engine_type);
 
-    if (!dylib_) {
       Json::Value res;
-      res["message"] = "Could not load engine " + cur_engine_name_;
+      res["message"] = "Could not load engine " + engine_type;
       auto resp = cortex_utils::nitroHttpJsonResponse(res);
       resp->setStatusCode(k500InternalServerError);
       callback(resp);
       return;
     }
-    auto func = dylib_->get_function<EngineI*()>("get_engine");
-    engine_ = func();
-    LOG_INFO << "Loaded engine: " << cur_engine_name_;
+
+    auto func =
+        engines_[engine_type].dl->get_function<EngineI*()>("get_engine");
+    engines_[engine_type].engine = func();
+    LOG_INFO << "Loaded engine: " << engine_type;
   }
 
   LOG_TRACE << "Load model";
-  engine_->LoadModel(
-      req->getJsonObject(),
-      [cb = std::move(callback)](Json::Value status, Json::Value res) {
-        auto resp = cortex_utils::nitroHttpJsonResponse(res);
-        resp->setStatusCode(
-            static_cast<drogon::HttpStatusCode>(status["status_code"].asInt()));
-        cb(resp);
-      });
+  auto& en = std::get<EngineI*>(engines_[engine_type].engine);
+  en->LoadModel(req->getJsonObject(), [cb = std::move(callback)](
+                                          Json::Value status, Json::Value res) {
+    auto resp = cortex_utils::nitroHttpJsonResponse(res);
+    resp->setStatusCode(
+        static_cast<drogon::HttpStatusCode>(status["status_code"].asInt()));
+    cb(resp);
+  });
   LOG_TRACE << "Done load model";
 }
 
 void server::ProcessStreamRes(std::function<void(const HttpResponsePtr&)> cb,
-                             std::shared_ptr<SyncQueue> q) {
+                              std::shared_ptr<SyncQueue> q) {
   auto err_or_done = std::make_shared<std::atomic_bool>(false);
   auto chunked_content_provider =
       [q, err_or_done](char* buf, std::size_t buf_size) -> std::size_t {
@@ -209,21 +284,21 @@ void server::ProcessStreamRes(std::function<void(const HttpResponsePtr&)> cb,
   };
 
   auto resp = cortex_utils::nitroStreamResponse(chunked_content_provider,
-                                              "chat_completions.txt");
+                                                "chat_completions.txt");
   cb(resp);
 }
 
-void server::ProcessNonStreamRes(
-    std::function<void(const HttpResponsePtr&)> cb, SyncQueue& q) {
+void server::ProcessNonStreamRes(std::function<void(const HttpResponsePtr&)> cb,
+                                 SyncQueue& q) {
   auto [status, res] = q.wait_and_pop();
   auto resp = cortex_utils::nitroHttpJsonResponse(res);
   resp->setStatusCode(
       static_cast<drogon::HttpStatusCode>(status["status_code"].asInt()));
   cb(resp);
 }
 
-bool server::IsEngineLoaded() {
-  return !!engine_;
+bool server::IsEngineLoaded(const std::string& e) {
+  return engines_.find(e) != engines_.end();
 }
 
+}  // namespace inferences
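The corresponding server.h change is not rendered in this excerpt. From the way server.cc indexes engines_ (engines_[engine_type].dl, std::get<EngineI*>(engines_[engine_type].engine)), the old single-engine members (engine_, dylib_, cur_engine_name_) appear to have been replaced by a per-engine map. A rough sketch of that bookkeeping; every name and type below is inferred from usage, not taken from the actual header:

// Sketch only: inferred from usage in server.cc; the real declaration
// lives in server.h, which is not shown in this diff.
#include <memory>
#include <string>
#include <unordered_map>
#include <variant>

#include "dylib.hpp"  // assumed header name for the dylib loader used above

class EngineI;              // inference engine interface (e.g. cortex.llamacpp)
class CortexPythonEngineI;  // python-runtime engine interface (cortex.python)

struct EngineEntry {                                    // hypothetical name
  std::unique_ptr<dylib> dl;                            // keeps the shared library loaded
  std::variant<EngineI*, CortexPythonEngineI*> engine;  // handle from get_engine()
};

std::unordered_map<std::string, EngineEntry> engines_;  // keyed by engine id

With a map like this, GetEngines simply enumerates whatever has been loaded, so a server running only cortex.llamacpp would answer the engines endpoint with {"object": "list", "data": [{"id": "cortex.llamacpp", "object": "engine"}]}.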