diff --git a/.github/workflows/cortex-cpp-quality-gate.yml b/.github/workflows/cortex-cpp-quality-gate.yml index 316160ce5..8a76e4669 100644 --- a/.github/workflows/cortex-cpp-quality-gate.yml +++ b/.github/workflows/cortex-cpp-quality-gate.yml @@ -34,7 +34,7 @@ jobs: ccache-dir: "" - os: "mac" name: "arm64" - runs-on: "macos-silicon" + runs-on: "macos-selfhosted-12-arm64" cmake-flags: "-DCORTEX_CPP_VERSION=${{github.event.pull_request.head.sha}} -DCMAKE_BUILD_TEST=ON -DMAC_ARM64=ON -DCMAKE_TOOLCHAIN_FILE=vcpkg/scripts/buildsystems/vcpkg.cmake" build-deps-cmake-flags: "" ccache-dir: "" diff --git a/.github/workflows/template-build-macos.yml b/.github/workflows/template-build-macos.yml index 371468dfb..ae10fb675 100644 --- a/.github/workflows/template-build-macos.yml +++ b/.github/workflows/template-build-macos.yml @@ -82,7 +82,7 @@ jobs: matrix: include: - arch: 'arm64' - runs-on: 'macos-silicon' + runs-on: 'macos-selfhosted-12-arm64' extra-cmake-flags: "-DMAC_ARM64=ON" - arch: 'amd64' diff --git a/engine/common/file.h b/engine/common/file.h new file mode 100644 index 000000000..3096023c5 --- /dev/null +++ b/engine/common/file.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include "common/json_serializable.h" + +namespace OpenAi { +/** + * The File object represents a document that has been uploaded to OpenAI. + */ +struct File : public JsonSerializable { + /** + * The file identifier, which can be referenced in the API endpoints. + */ + std::string id; + + /** + * The object type, which is always file. + */ + std::string object = "file"; + + /** + * The size of the file, in bytes. + */ + uint64_t bytes; + + /** + * The Unix timestamp (in seconds) for when the file was created. + */ + uint32_t created_at; + + /** + * The name of the file. + */ + std::string filename; + + /** + * The intended purpose of the file. Supported values are assistants, + * assistants_output, batch, batch_output, fine-tune, fine-tune-results + * and vision. 
/**
 * Derives a file id from a path by taking the final path component and
 * cutting it at its first '.'.
 *
 * Both '/' and '\\' count as separators. An empty string is returned when the
 * path contains no separator at all, or when the final component contains no
 * '.'.
 *
 * @param path Path whose last component is expected to look like "<id>.<ext>".
 * @return The part of the last component before its first '.', or "".
 */
inline std::string ExtractFileId(const std::string& path) {
  const std::string::size_type separator = path.find_last_of("/\\");
  if (separator != std::string::npos) {
    // Search for the dot only inside the last component.
    const std::string::size_type name_begin = separator + 1;
    const std::string::size_type first_dot = path.find('.', name_begin);
    if (first_dot != std::string::npos) {
      return path.substr(name_begin, first_dot - name_begin);
    }
  }
  return std::string{};
}
remove in the future + std::optional attach_filename; + std::optional size; + std::optional rel_path; + // end deprecated + static cpp::result FromJsonString( std::string&& json_str) { Json::Value root; @@ -98,7 +118,6 @@ struct Message : JsonSerializable { message.completed_at = root["completed_at"].asUInt(); message.incomplete_at = root["incomplete_at"].asUInt(); message.role = RoleFromString(std::move(root["role"].asString())); - message.content = ParseContents(std::move(root["content"])).value(); message.assistant_id = std::move(root["assistant_id"].asString()); message.run_id = std::move(root["run_id"].asString()); @@ -114,6 +133,54 @@ struct Message : JsonSerializable { } } + if (root.isMember("content")) { + if (root["content"].isArray() && !root["content"].empty()) { + if (root["content"][0]["type"].asString() == "text") { + message.content = ParseContents(std::move(root["content"])).value(); + } else { + // deprecated, for supporting jan and should be removed in the future + // check if annotations is empty + if (!root["content"][0]["text"]["annotations"].empty()) { + // parse attachment + Json::Value attachments_json_array{Json::arrayValue}; + Json::Value attachment; + attachment["file_id"] = ExtractFileId( + root["content"][0]["text"]["annotations"][0].asString()); + + Json::Value tools_json_array{Json::arrayValue}; + Json::Value tool; + tool["type"] = "file_search"; + tools_json_array.append(tool); + + attachment["tools"] = tools_json_array; + attachment["file_id"] = attachments_json_array.append(attachment); + + message.attachments = + ParseAttachments(std::move(attachments_json_array)).value(); + + message.attach_filename = + root["content"][0]["text"]["name"].asString(); + message.size = root["content"][0]["text"]["size"].asUInt64(); + message.rel_path = + root["content"][0]["text"]["annotations"][0].asString(); + } + + // parse content + Json::Value contents_json_array{Json::arrayValue}; + Json::Value content; + Json::Value content_text; + 
Json::Value empty_annotations{Json::arrayValue}; + content["type"] = "text"; + content_text["value"] = root["content"][0]["text"]["value"]; + content_text["annotations"] = empty_annotations; + content["text"] = content_text; + contents_json_array.append(content); + message.content = + ParseContents(std::move(contents_json_array)).value(); + } + } + } + return message; } catch (const std::exception& e) { return cpp::fail(std::string("FromJsonString failed: ") + e.what()); diff --git a/engine/common/repository/file_repository.h b/engine/common/repository/file_repository.h new file mode 100644 index 000000000..f574b76d0 --- /dev/null +++ b/engine/common/repository/file_repository.h @@ -0,0 +1,29 @@ +#pragma once + +#include "common/file.h" +#include "utils/result.hpp" + +class FileRepository { + public: + virtual cpp::result StoreFile(OpenAi::File& file_metadata, + const char* content, + uint64_t length) = 0; + + virtual cpp::result, std::string> ListFiles( + const std::string& purpose, uint8_t limit, const std::string& order, + const std::string& after) const = 0; + + virtual cpp::result RetrieveFile( + const std::string file_id) const = 0; + + virtual cpp::result, size_t>, std::string> + RetrieveFileContent(const std::string& file_id) const = 0; + + virtual cpp::result, size_t>, std::string> + RetrieveFileContentByPath(const std::string& path) const = 0; + + virtual cpp::result DeleteFileLocal( + const std::string& file_id) = 0; + + virtual ~FileRepository() = default; +}; diff --git a/engine/controllers/files.cc b/engine/controllers/files.cc new file mode 100644 index 000000000..e0cd502f4 --- /dev/null +++ b/engine/controllers/files.cc @@ -0,0 +1,269 @@ +#include "files.h" +#include "common/api-dto/delete_success_response.h" +#include "utils/cortex_utils.h" +#include "utils/logging_utils.h" + +void Files::UploadFile(const HttpRequestPtr& req, + std::function&& callback) { + MultiPartParser parser; + if (parser.parse(req) != 0 || parser.getFiles().size() != 1) { + 
Json::Value root; + root["message"] = "Must only be one file"; + auto response = cortex_utils::CreateCortexHttpJsonResponse(root); + response->setStatusCode(k400BadRequest); + callback(response); + return; + } + + auto params = parser.getParameters(); + if (params.find("purpose") == params.end()) { + Json::Value root; + root["message"] = "purpose is mandatory"; + auto response = cortex_utils::CreateCortexHttpJsonResponse(root); + response->setStatusCode(k400BadRequest); + callback(response); + return; + } + + auto purpose = params["purpose"]; + if (std::find(file_service_->kSupportedPurposes.begin(), + file_service_->kSupportedPurposes.end(), + purpose) == file_service_->kSupportedPurposes.end()) { + Json::Value root; + root["message"] = + "purpose is not supported. Purpose can only one of these types: " + "assistants, vision, batch or fine-tune"; + auto response = cortex_utils::CreateCortexHttpJsonResponse(root); + response->setStatusCode(k400BadRequest); + callback(response); + return; + } + + const auto& file = parser.getFiles()[0]; + auto result = + file_service_->UploadFile(file.getFileName(), purpose, + file.fileContent().data(), file.fileLength()); + + if (result.has_error()) { + Json::Value ret; + ret["message"] = result.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + } else { + auto resp = + cortex_utils::CreateCortexHttpJsonResponse(result->ToJson().value()); + resp->setStatusCode(k200OK); + callback(resp); + } +} + +void Files::ListFiles(const HttpRequestPtr& req, + std::function&& callback, + std::optional purpose, + std::optional limit, + std::optional order, + std::optional after) const { + auto res = file_service_->ListFiles( + purpose.value_or(""), std::stoi(limit.value_or("20")), + order.value_or("desc"), after.value_or("")); + if (res.has_error()) { + Json::Value root; + root["message"] = res.error(); + auto response = 
cortex_utils::CreateCortexHttpJsonResponse(root); + response->setStatusCode(k400BadRequest); + callback(response); + return; + } + + Json::Value msg_arr(Json::arrayValue); + for (auto& msg : res.value()) { + if (auto it = msg.ToJson(); it.has_value()) { + msg_arr.append(it.value()); + } else { + CTL_WRN("Failed to convert message to json: " + it.error()); + } + } + + Json::Value root; + root["object"] = "list"; + root["data"] = msg_arr; + auto response = cortex_utils::CreateCortexHttpJsonResponse(root); + response->setStatusCode(k200OK); + callback(response); +} + +void Files::RetrieveFile(const HttpRequestPtr& req, + std::function&& callback, + const std::string& file_id, + std::optional thread_id) const { + // this code part is for backward compatible. remove it later on + if (thread_id.has_value()) { + auto msg_res = + message_service_->RetrieveMessage(thread_id.value(), file_id); + if (msg_res.has_error()) { + Json::Value ret; + ret["message"] = msg_res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + if (msg_res->attachments->empty()) { + auto res = file_service_->RetrieveFile(file_id); + if (res.has_error()) { + Json::Value ret; + ret["message"] = res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + auto resp = + cortex_utils::CreateCortexHttpJsonResponse(res->ToJson().value()); + resp->setStatusCode(k200OK); + callback(resp); + return; + } else { + if (!msg_res->attach_filename.has_value() || !msg_res->size.has_value()) { + Json::Value ret; + ret["message"] = "File not found or had been removed!"; + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k404NotFound); + callback(resp); + return; + } + + Json::Value ret; + ret["object"] = "file"; + ret["created_at"] = msg_res->created_at; + ret["filename"] = 
msg_res->attach_filename.value(); + ret["bytes"] = msg_res->size.value(); + ret["id"] = msg_res->id; + ret["purpose"] = "assistants"; + + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k200OK); + callback(resp); + return; + } + } + + auto res = file_service_->RetrieveFile(file_id); + if (res.has_error()) { + Json::Value ret; + ret["message"] = res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + auto resp = cortex_utils::CreateCortexHttpJsonResponse(res->ToJson().value()); + resp->setStatusCode(k200OK); + callback(resp); +} + +void Files::DeleteFile(const HttpRequestPtr& req, + std::function&& callback, + const std::string& file_id) { + auto res = file_service_->DeleteFileLocal(file_id); + if (res.has_error()) { + Json::Value ret; + ret["message"] = res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + api_response::DeleteSuccessResponse response; + response.id = file_id; + response.object = "file"; + response.deleted = true; + auto resp = + cortex_utils::CreateCortexHttpJsonResponse(response.ToJson().value()); + resp->setStatusCode(k200OK); + callback(resp); +} + +void Files::RetrieveFileContent( + const HttpRequestPtr& req, + std::function&& callback, + const std::string& file_id, std::optional thread_id) { + if (thread_id.has_value()) { + auto msg_res = + message_service_->RetrieveMessage(thread_id.value(), file_id); + if (msg_res.has_error()) { + Json::Value ret; + ret["message"] = msg_res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + if (msg_res->attachments->empty()) { + auto res = file_service_->RetrieveFileContent(file_id); + if (res.has_error()) { + Json::Value ret; + ret["message"] = res.error(); + auto resp = 
cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + auto [buffer, size] = std::move(res.value()); + auto resp = HttpResponse::newHttpResponse(); + resp->setBody(std::string(buffer.get(), size)); + resp->setContentTypeCode(CT_APPLICATION_OCTET_STREAM); + callback(resp); + } else { + if (!msg_res->rel_path.has_value()) { + Json::Value ret; + ret["message"] = "File not found or had been removed"; + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + auto content_res = + file_service_->RetrieveFileContentByPath(msg_res->rel_path.value()); + + if (content_res.has_error()) { + Json::Value ret; + ret["message"] = content_res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + auto [buffer, size] = std::move(content_res.value()); + auto resp = HttpResponse::newHttpResponse(); + resp->setBody(std::string(buffer.get(), size)); + resp->setContentTypeCode(CT_APPLICATION_OCTET_STREAM); + callback(resp); + } + } + + auto res = file_service_->RetrieveFileContent(file_id); + if (res.has_error()) { + Json::Value ret; + ret["message"] = res.error(); + auto resp = cortex_utils::CreateCortexHttpJsonResponse(ret); + resp->setStatusCode(k400BadRequest); + callback(resp); + return; + } + + auto [buffer, size] = std::move(res.value()); + auto resp = HttpResponse::newHttpResponse(); + resp->setBody(std::string(buffer.get(), size)); + resp->setContentTypeCode(CT_APPLICATION_OCTET_STREAM); + callback(resp); +} diff --git a/engine/controllers/files.h b/engine/controllers/files.h new file mode 100644 index 000000000..efd7f6d93 --- /dev/null +++ b/engine/controllers/files.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include +#include "services/file_service.h" +#include "services/message_service.h" + +using namespace drogon; + +class 
Files : public drogon::HttpController { + public: + METHOD_LIST_BEGIN + ADD_METHOD_TO(Files::UploadFile, "/v1/files", Options, Post); + + ADD_METHOD_TO(Files::RetrieveFile, "/v1/files/{file_id}?thread={thread_id}", + Get); + + ADD_METHOD_TO( + Files::ListFiles, + "/v1/files?purpose={purpose}&limit={limit}&order={order}&after={after}", + Get); + + ADD_METHOD_TO(Files::DeleteFile, "/v1/files/{file_id}", Options, Delete); + + ADD_METHOD_TO(Files::RetrieveFileContent, + "/v1/files/{file_id}/content?thread={thread_id}", Get); + + METHOD_LIST_END + + explicit Files(std::shared_ptr file_service, + std::shared_ptr msg_service) + : file_service_{file_service}, message_service_{msg_service} {} + + void UploadFile(const HttpRequestPtr& req, + std::function&& callback); + + void ListFiles(const HttpRequestPtr& req, + std::function&& callback, + std::optional purpose, + std::optional limit, + std::optional order, + std::optional after) const; + + void RetrieveFile(const HttpRequestPtr& req, + std::function&& callback, + const std::string& file_id, + std::optional thread_id) const; + + void DeleteFile(const HttpRequestPtr& req, + std::function&& callback, + const std::string& file_id); + + void RetrieveFileContent( + const HttpRequestPtr& req, + std::function&& callback, + const std::string& file_id, std::optional thread_id); + + private: + std::shared_ptr file_service_; + std::shared_ptr message_service_; +}; diff --git a/engine/database/file.cc b/engine/database/file.cc new file mode 100644 index 000000000..3f9a37b98 --- /dev/null +++ b/engine/database/file.cc @@ -0,0 +1,96 @@ +#include "file.h" +#include "utils/logging_utils.h" +#include "utils/scope_exit.h" + +namespace cortex::db { + +cpp::result, std::string> File::GetFileList() const { + try { + db_.exec("BEGIN TRANSACTION;"); + cortex::utils::ScopeExit se([this] { db_.exec("COMMIT;"); }); + std::vector entries; + SQLite::Statement query(db_, + "SELECT id, object, " + "purpose, filename, created_at, bytes FROM files"); + 
+ while (query.executeStep()) { + OpenAi::File entry; + entry.id = query.getColumn(0).getString(); + entry.object = query.getColumn(1).getString(); + entry.purpose = query.getColumn(2).getString(); + entry.filename = query.getColumn(3).getString(); + entry.created_at = query.getColumn(4).getInt(); + entry.bytes = query.getColumn(5).getInt(); + entries.push_back(entry); + } + return entries; + } catch (const std::exception& e) { + CTL_WRN(e.what()); + return cpp::fail(e.what()); + } +} + +cpp::result File::GetFileById( + const std::string& file_id) const { + try { + SQLite::Statement query(db_, + "SELECT id, object, " + "purpose, filename, created_at, bytes FROM files " + "WHERE id = ?"); + + query.bind(1, file_id); + if (query.executeStep()) { + OpenAi::File entry; + entry.id = query.getColumn(0).getString(); + entry.object = query.getColumn(1).getString(); + entry.purpose = query.getColumn(2).getString(); + entry.filename = query.getColumn(3).getString(); + entry.created_at = query.getColumn(4).getInt(); + entry.bytes = query.getColumn(5).getInt64(); + return entry; + } else { + return cpp::fail("File not found: " + file_id); + } + } catch (const std::exception& e) { + return cpp::fail(e.what()); + } +} + +cpp::result File::AddFileEntry(OpenAi::File& file) { + try { + SQLite::Statement insert( + db_, + "INSERT INTO files (id, object, " + "purpose, filename, created_at, bytes) VALUES (?, ?, " + "?, ?, ?, ?)"); + insert.bind(1, file.id); + insert.bind(2, file.object); + insert.bind(3, file.purpose); + insert.bind(4, file.filename); + insert.bind(5, std::to_string(file.created_at)); + insert.bind(6, std::to_string(file.bytes)); + insert.exec(); + + CTL_INF("Inserted: " << file.ToJson()->toStyledString()); + return {}; + } catch (const std::exception& e) { + CTL_WRN(e.what()); + return cpp::fail(e.what()); + } +} + +cpp::result File::DeleteFileEntry( + const std::string& file_id) { + try { + SQLite::Statement del(db_, "DELETE from files WHERE id = ?"); + del.bind(1, 
file_id); + if (del.exec() == 1) { + CTL_INF("Deleted: " << file_id); + return {}; + } + return {}; + } catch (const std::exception& e) { + return cpp::fail(e.what()); + } +} +} // namespace cortex::db diff --git a/engine/database/file.h b/engine/database/file.h new file mode 100644 index 000000000..be976ecce --- /dev/null +++ b/engine/database/file.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include +#include "common/file.h" +#include "database.h" +#include "utils/result.hpp" + +namespace cortex::db { +class File { + SQLite::Database& db_; + + public: + File(SQLite::Database& db) : db_{db} {}; + + File() : db_(cortex::db::Database::GetInstance().db()) {} + + ~File() {} + + cpp::result, std::string> GetFileList() const; + + cpp::result GetFileById( + const std::string& file_id) const; + + cpp::result AddFileEntry(OpenAi::File& file); + + cpp::result DeleteFileEntry(const std::string& file_id); +}; +} // namespace cortex::db diff --git a/engine/database/models.h b/engine/database/models.h index 74d3937c8..b0c4bc258 100644 --- a/engine/database/models.h +++ b/engine/database/models.h @@ -10,6 +10,7 @@ namespace cortex::db { enum class ModelStatus { Remote, Downloaded, Downloadable }; + struct ModelEntry { std::string model; std::string author_repo_id; @@ -61,4 +62,4 @@ class Models { const std::string& model_src) const; }; -} // namespace cortex::db \ No newline at end of file +} // namespace cortex::db diff --git a/engine/main.cc b/engine/main.cc index 044de87ac..13583dc00 100644 --- a/engine/main.cc +++ b/engine/main.cc @@ -5,6 +5,7 @@ #include "controllers/configs.h" #include "controllers/engines.h" #include "controllers/events.h" +#include "controllers/files.h" #include "controllers/hardware.h" #include "controllers/messages.h" #include "controllers/models.h" @@ -13,6 +14,7 @@ #include "controllers/threads.h" #include "database/database.h" #include "migrations/migration_manager.h" +#include "repositories/file_fs_repository.h" #include 
"repositories/message_fs_repository.h" #include "repositories/thread_fs_repository.h" #include "services/assistant_service.h" @@ -122,11 +124,13 @@ void RunServer(std::optional port, bool ignore_cout) { auto event_queue_ptr = std::make_shared(); cortex::event::EventProcessor event_processor(event_queue_ptr); - auto msg_repo = std::make_shared( - file_manager_utils::GetCortexDataPath()); - auto thread_repo = std::make_shared( - file_manager_utils::GetCortexDataPath()); + auto data_folder_path = file_manager_utils::GetCortexDataPath(); + auto file_repo = std::make_shared(data_folder_path); + auto msg_repo = std::make_shared(data_folder_path); + auto thread_repo = std::make_shared(data_folder_path); + + auto file_srv = std::make_shared(file_repo); auto assistant_srv = std::make_shared(thread_repo); auto thread_srv = std::make_shared(thread_repo); auto message_srv = std::make_shared(msg_repo); @@ -147,6 +151,7 @@ void RunServer(std::optional port, bool ignore_cout) { file_watcher_srv->start(); // initialize custom controllers + auto file_ctl = std::make_shared(file_srv, message_srv); auto assistant_ctl = std::make_shared(assistant_srv); auto thread_ctl = std::make_shared(thread_srv, message_srv); auto message_ctl = std::make_shared(message_srv); @@ -160,6 +165,7 @@ void RunServer(std::optional port, bool ignore_cout) { std::make_shared(inference_svc, engine_service); auto config_ctl = std::make_shared(config_service); + drogon::app().registerController(file_ctl); drogon::app().registerController(assistant_ctl); drogon::app().registerController(thread_ctl); drogon::app().registerController(message_ctl); @@ -171,9 +177,6 @@ void RunServer(std::optional port, bool ignore_cout) { drogon::app().registerController(hw_ctl); drogon::app().registerController(config_ctl); - auto upload_path = std::filesystem::temp_directory_path() / "cortex-uploads"; - drogon::app().setUploadPath(upload_path.string()); - LOG_INFO << "Server started, listening at: " << config.apiServerHost << ":" 
<< config.apiServerPort; LOG_INFO << "Please load your model"; @@ -188,6 +191,12 @@ void RunServer(std::optional port, bool ignore_cout) { LOG_INFO << "Number of thread is:" << drogon::app().getThreadNum(); drogon::app().disableSigtermHandling(); + // file upload + drogon::app() + .enableCompressedRequest(true) + .setClientMaxBodySize(256 * 1024 * 1024) // Max 256MiB body size + .setClientMaxMemoryBodySize(1024 * 1024); // 1MiB before writing to disk + // CORS drogon::app().registerPostHandlingAdvice( [config_service](const drogon::HttpRequestPtr& req, diff --git a/engine/migrations/migration_manager.cc b/engine/migrations/migration_manager.cc index 6936f45a0..26197115d 100644 --- a/engine/migrations/migration_manager.cc +++ b/engine/migrations/migration_manager.cc @@ -8,6 +8,8 @@ #include "v0/migration.h" #include "v1/migration.h" #include "v2/migration.h" +#include "v3/migration.h" + namespace cortex::migr { namespace { @@ -145,8 +147,8 @@ cpp::result MigrationManager::DoUpFolderStructure( return v1::MigrateFolderStructureUp(); case 2: return v2::MigrateFolderStructureUp(); - - break; + case 3: + return v3::MigrateFolderStructureUp(); default: return true; @@ -161,7 +163,8 @@ cpp::result MigrationManager::DoDownFolderStructure( return v1::MigrateFolderStructureDown(); case 2: return v2::MigrateFolderStructureDown(); - break; + case 3: + return v3::MigrateFolderStructureDown(); default: return true; @@ -198,7 +201,8 @@ cpp::result MigrationManager::DoUpDB(int version) { return v1::MigrateDBUp(db_); case 2: return v2::MigrateDBUp(db_); - break; + case 3: + return v3::MigrateDBUp(db_); default: return true; @@ -213,7 +217,8 @@ cpp::result MigrationManager::DoDownDB(int version) { return v1::MigrateDBDown(db_); case 2: return v2::MigrateDBDown(db_); - break; + case 3: + return v3::MigrateDBDown(db_); default: return true; @@ -247,4 +252,4 @@ cpp::result MigrationManager::UpdateSchemaVersion( return cpp::fail(e.what()); } } -} // namespace cortex::migr \ No newline at 
end of file +} // namespace cortex::migr diff --git a/engine/migrations/migration_manager.h b/engine/migrations/migration_manager.h index b05a76c26..05fc42693 100644 --- a/engine/migrations/migration_manager.h +++ b/engine/migrations/migration_manager.h @@ -1,6 +1,6 @@ #pragma once + #include "migration_helper.h" -#include "v0/migration.h" namespace cortex::migr { class MigrationManager { @@ -28,4 +28,4 @@ class MigrationManager { MigrationHelper mgr_helper_; SQLite::Database& db_; }; -} // namespace cortex::migr \ No newline at end of file +} // namespace cortex::migr diff --git a/engine/migrations/schema_version.h b/engine/migrations/schema_version.h index 5739040d0..619f3054d 100644 --- a/engine/migrations/schema_version.h +++ b/engine/migrations/schema_version.h @@ -1,5 +1,4 @@ #pragma once //Track the current schema version -#define SCHEMA_VERSION 2 - +#define SCHEMA_VERSION 3 diff --git a/engine/migrations/v3/migration.h b/engine/migrations/v3/migration.h new file mode 100644 index 000000000..3bed802fb --- /dev/null +++ b/engine/migrations/v3/migration.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include "utils/logging_utils.h" +#include "utils/result.hpp" + +namespace cortex::migr::v3 { +inline cpp::result MigrateFolderStructureUp() { + return true; +} + +inline cpp::result MigrateFolderStructureDown() { + // CTL_INF("Folder structure already up to date!"); + return true; +} + +// Database +inline cpp::result MigrateDBUp(SQLite::Database& db) { + try { + db.exec( + "CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER PRIMARY " + "KEY);"); + + // files + { + // Check if the table exists + SQLite::Statement query(db, + "SELECT name FROM sqlite_master WHERE " + "type='table' AND name='files'"); + auto table_exists = query.executeStep(); + + if (!table_exists) { + // Create new table + db.exec( + "CREATE TABLE files (" + "id TEXT PRIMARY KEY," + "object TEXT," + "purpose TEXT," + "filename TEXT," + "created_at INTEGER," + "bytes INTEGER" + 
")"); + } + } + + return true; + } catch (const std::exception& e) { + CTL_WRN("Migration up failed: " << e.what()); + return cpp::fail(e.what()); + } +}; + +inline cpp::result MigrateDBDown(SQLite::Database& db) { + try { + // hardware + { + SQLite::Statement query(db, + "SELECT name FROM sqlite_master WHERE " + "type='table' AND name='hardware'"); + auto table_exists = query.executeStep(); + if (table_exists) { + db.exec("DROP TABLE files"); + } + } + + return true; + } catch (const std::exception& e) { + CTL_WRN("Migration down failed: " << e.what()); + return cpp::fail(e.what()); + } +} +}; // namespace cortex::migr::v3 diff --git a/engine/repositories/file_fs_repository.cc b/engine/repositories/file_fs_repository.cc new file mode 100644 index 000000000..b9ab4fec6 --- /dev/null +++ b/engine/repositories/file_fs_repository.cc @@ -0,0 +1,169 @@ +#include "file_fs_repository.h" +#include +#include +#include +#include "database/file.h" +#include "utils/logging_utils.h" +#include "utils/result.hpp" + +std::filesystem::path FileFsRepository::GetFilePath() const { + return data_folder_path_ / kFileContainerFolderName; +} + +cpp::result FileFsRepository::StoreFile( + OpenAi::File& file_metadata, const char* content, uint64_t length) { + auto file_container_path = GetFilePath(); + if (!std::filesystem::exists(file_container_path)) { + std::filesystem::create_directories(file_container_path); + } + + cortex::db::File db; + auto file_full_path = file_container_path / file_metadata.filename; + if (std::filesystem::exists(file_full_path)) { + return cpp::fail("File already exists: " + file_full_path.string()); + } + + try { + std::ofstream file(file_full_path, std::ios::binary); + if (!file) { + return cpp::fail("Failed to open file for writing: " + + file_full_path.string()); + } + + file.write(content, length); + file.flush(); + file.close(); + + auto result = db.AddFileEntry(file_metadata); + if (result.has_error()) { + std::filesystem::remove(file_full_path); + return 
/**
 * Lists stored file records, ordered by id.
 *
 * @param purpose When non-empty, only records with exactly this purpose are
 *                returned.
 * @param limit   Maximum number of records; 0 means no limit.
 * @param order   "desc" sorts by id descending; any other value sorts
 *                ascending.
 * @param after   Pagination cursor. When non-empty, only records that come
 *                strictly after this id in the requested order are returned.
 * @return The selected records, or the database error message.
 */
cpp::result<std::vector<OpenAi::File>, std::string> FileFsRepository::ListFiles(
    const std::string& purpose, uint8_t limit, const std::string& order,
    const std::string& after) const {
  cortex::db::File db;
  auto res = db.GetFileList();
  if (res.has_error()) {
    return cpp::fail(res.error());
  }
  auto files = std::move(res.value());

  const bool descending = (order == "desc");

  // Apply the purpose filter and the cursor before sorting and truncating,
  // so `limit` counts only matching records.
  files.erase(
      std::remove_if(files.begin(), files.end(),
                     [&](const OpenAi::File& file) {
                       if (!purpose.empty() && file.purpose != purpose) {
                         return true;
                       }
                       if (after.empty()) {
                         return false;
                       }
                       // Keep only ids that follow the cursor in sort order.
                       return descending ? !(file.id < after)
                                         : !(file.id > after);
                     }),
      files.end());

  std::sort(files.begin(), files.end(),
            [descending](const OpenAi::File& a, const OpenAi::File& b) {
              return descending ? a.id > b.id : a.id < b.id;
            });

  if (limit > 0 && files.size() > limit) {
    files.resize(limit);
  }

  return files;
}
std::make_pair(std::move(buffer), size); +} + +cpp::result, size_t>, std::string> +FileFsRepository::RetrieveFileContentByPath(const std::string& path) const { + auto file_path = data_folder_path_ / path; + if (!std::filesystem::exists(file_path)) { + return cpp::fail("File not found: " + path); + } + + try { + size_t size = std::filesystem::file_size(file_path); + auto buffer = std::make_unique(size); + + std::ifstream file(file_path, std::ios::binary); + if (!file.read(buffer.get(), size)) { + return cpp::fail("Failed to read file: " + file_path.string()); + } + + return std::make_pair(std::move(buffer), size); + } catch (const std::exception& e) { + CTL_ERR("Failed to retrieve file content: " << e.what()); + return cpp::fail("Failed to retrieve file content"); + } +} + +cpp::result FileFsRepository::DeleteFileLocal( + const std::string& file_id) { + CTL_INF("Deleting file: " + file_id); + auto file_container_path = GetFilePath(); + cortex::db::File db; + auto file_metadata = db.GetFileById(file_id); + if (file_metadata.has_error()) { + return cpp::fail(file_metadata.error()); + } + + auto file_path = file_container_path / file_metadata->filename; + + auto res = db.DeleteFileEntry(file_id); + if (res.has_error()) { + CTL_ERR("Failed to delete file entry: " << res.error()); + return cpp::fail(res.error()); + } + + if (!std::filesystem::exists(file_path)) { + CTL_INF("File not found: " + file_path.string()); + return {}; + } + + try { + std::filesystem::remove_all(file_path); + return {}; + } catch (const std::exception& e) { + CTL_ERR("Failed to delete file: " << e.what()); + return cpp::fail("Failed to delete file: " + file_container_path.string() + + ", error: " + e.what()); + } +} diff --git a/engine/repositories/file_fs_repository.h b/engine/repositories/file_fs_repository.h new file mode 100644 index 000000000..974e81fa4 --- /dev/null +++ b/engine/repositories/file_fs_repository.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include 
"common/repository/file_repository.h" +#include "utils/logging_utils.h" + +class FileFsRepository : public FileRepository { + public: + constexpr static auto kFileContainerFolderName = "files"; + + cpp::result StoreFile(OpenAi::File& file_metadata, + const char* content, + uint64_t length) override; + + cpp::result, std::string> ListFiles( + const std::string& purpose, uint8_t limit, const std::string& order, + const std::string& after) const override; + + cpp::result RetrieveFile( + const std::string file_id) const override; + + cpp::result, size_t>, std::string> + RetrieveFileContent(const std::string& file_id) const override; + + cpp::result, size_t>, std::string> + RetrieveFileContentByPath(const std::string& path) const override; + + cpp::result DeleteFileLocal( + const std::string& file_id) override; + + explicit FileFsRepository(std::filesystem::path data_folder_path) + : data_folder_path_{data_folder_path} { + CTL_INF("Constructing FileFsRepository.."); + auto file_container_path = data_folder_path_ / kFileContainerFolderName; + + if (!std::filesystem::exists(file_container_path)) { + std::filesystem::create_directories(file_container_path); + } + } + + ~FileFsRepository() = default; + + private: + std::filesystem::path GetFilePath() const; + + /** + * The path to the data folder. 
+ */ + std::filesystem::path data_folder_path_; +}; diff --git a/engine/services/file_service.cc b/engine/services/file_service.cc new file mode 100644 index 000000000..f2514fbfb --- /dev/null +++ b/engine/services/file_service.cc @@ -0,0 +1,55 @@ +#include "file_service.h" +#include +#include "utils/ulid/ulid.hh" + +cpp::result FileService::UploadFile( + const std::string& filename, const std::string& purpose, + const char* content, uint64_t content_length) { + + auto seconds_since_epoch = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + auto file_id{"file-" + ulid::Marshal(ulid::CreateNowRand())}; + OpenAi::File file; + file.id = file_id; + file.object = "file"; + file.bytes = content_length; + file.created_at = seconds_since_epoch; + file.filename = filename; + file.purpose = purpose; + + auto res = file_repository_->StoreFile(file, content, content_length); + if (res.has_error()) { + return cpp::fail(res.error()); + } + + return file; +} + +cpp::result, std::string> FileService::ListFiles( + const std::string& purpose, uint8_t limit, const std::string& order, + const std::string& after) const { + return file_repository_->ListFiles(purpose, limit, order, after); +} + +cpp::result FileService::RetrieveFile( + const std::string& file_id) const { + return file_repository_->RetrieveFile(file_id); +} + +cpp::result FileService::DeleteFileLocal( + const std::string& file_id) { + return file_repository_->DeleteFileLocal(file_id); +} + +cpp::result, size_t>, std::string> +FileService::RetrieveFileContent(const std::string& file_id) const { + return file_repository_->RetrieveFileContent(file_id); +} + +cpp::result, size_t>, std::string> +FileService::RetrieveFileContentByPath(const std::string& path) const { + return file_repository_->RetrieveFileContentByPath(path); +} diff --git a/engine/services/file_service.h b/engine/services/file_service.h new file mode 100644 index 000000000..397feda20 --- /dev/null +++ 
b/engine/services/file_service.h @@ -0,0 +1,40 @@ +#pragma once + +#include "common/file.h" +#include "common/repository/file_repository.h" +#include "utils/result.hpp" + +class FileService { + public: + const std::vector kSupportedPurposes{"assistants", "vision", + "batch", "fine-tune"}; + + cpp::result UploadFile(const std::string& filename, + const std::string& purpose, + const char* content, + uint64_t content_length); + + cpp::result, std::string> ListFiles( + const std::string& purpose, uint8_t limit, const std::string& order, + const std::string& after) const; + + cpp::result RetrieveFile( + const std::string& file_id) const; + + cpp::result DeleteFileLocal(const std::string& file_id); + + cpp::result, size_t>, std::string> + RetrieveFileContent(const std::string& file_id) const; + + /** + * For getting file content by **relative** path. + */ + cpp::result, size_t>, std::string> + RetrieveFileContentByPath(const std::string& path) const; + + explicit FileService(std::shared_ptr file_repository) + : file_repository_{file_repository} {} + + private: + std::shared_ptr file_repository_; +};