diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 1e8b6bc..d12734e 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -27,6 +27,7 @@ jobs: - uses: xmake-io/github-action-setup-xmake@v1 with: xmake-version: 2.9.3 + actions-cache-folder: '.xmake-cache' - name: configure shell: cmd @@ -38,7 +39,3 @@ jobs: run: | xmake -b unit-test - - name: test - shell: cmd - run: | - xmake r unit-test diff --git a/config.h.in b/config.h.in index 798409e..74ddc3d 100644 --- a/config.h.in +++ b/config.h.in @@ -40,6 +40,8 @@ ${define HKU_ENABLE_HTTP_CLIENT} ${define HKU_ENABLE_HTTP_CLIENT_SSL} ${define HKU_ENABLE_HTTP_CLIENT_ZIP} +${define HKU_ENABLE_NODE} + // clang-format on #endif /* HKU_UTILS_CONFIG_H_*/ \ No newline at end of file diff --git a/copy_dependents.lua b/copy_dependents.lua index 48b6fc5..146fd46 100644 --- a/copy_dependents.lua +++ b/copy_dependents.lua @@ -39,7 +39,7 @@ task("copy_dependents") end elseif type(pkg_path) == 'table' then for i=1, #pkg_path do - local pos = string.find(pkg_path[i], "yh_utils") + local pos = string.find(pkg_path[i], "hku_utils") if pos == nil then pos = string.find(pkg_path[i], "opencv") if pos == nil then @@ -49,11 +49,11 @@ task("copy_dependents") end else for _, filedir in ipairs(os.dirs(pkg_path[i] .. "/*")) do - local pos = string.find(filedir, "yihua") + local pos = string.find(filedir, "hikyuu") if pos == nil then os.trycp(filedir, destpath .. "/include") else - os.trycp(filedir .. "/utils", destpath .. "/include/yihua") + os.trycp(filedir .. "/utilites", destpath .. "/include/hikyuu") end end end @@ -64,7 +64,8 @@ task("copy_dependents") -- 拷贝依赖的库文件 os.trycp(pkg:installdir() .. "/lib/*", libdir) if is_plat("windows") then - os.trycp(pkg:installdir() .. "/bin/*.dll", libdir) + -- os.trycp(pkg:installdir() .. "/bin/*.dll", libdir) + os.trycp(pkg:installdir() .. "/lib/*.dll", destpath .. '/bin') end :: continue :: diff --git a/hikyuu/utilities/http_client/HttpClient.cpp b/hikyuu/utilities/http_client/HttpClient.cpp index ae5ed00..1f5fb19 100644 --- a/hikyuu/utilities/http_client/HttpClient.cpp +++ b/hikyuu/utilities/http_client/HttpClient.cpp @@ -5,8 +5,6 @@ * Author: fasiondog */ -#include "hikyuu/utilities/Log.h" -#include "hikyuu/utilities/os.h" #include "HttpClient.h" #if HKU_ENABLE_HTTP_CLIENT_ZIP @@ -14,6 +12,11 @@ #include "gzip/decompress.hpp" #endif +#include +#include "hikyuu/utilities/Log.h" +#include "hikyuu/utilities/os.h" +#include "url.h" + namespace hku { HttpResponse::HttpResponse() { @@ -31,6 +34,7 @@ void HttpResponse::reset() { nng_http_res_free(m_res); NNG_CHECK(nng_http_res_alloc(&m_res)); } + m_body.clear(); } HttpResponse::HttpResponse(HttpResponse&& rhs) : m_res(rhs.m_res), m_body(std::move(rhs.m_body)) { @@ -99,121 +103,105 @@ void HttpClient::_connect() { } HttpResponse HttpClient::request(const std::string& method, const std::string& path, - const HttpHeaders& headers, const char* body, size_t len, + const HttpParams& params, const HttpHeaders& headers, + const char* body, size_t body_len, const std::string& content_type) { HKU_CHECK(m_url.valid(), "Invalid url: {}", m_url.raw_url()); HttpResponse res; try { - nng::http_req req(m_url); - req.set_method(method).set_uri(path).add_headers(m_default_headers).add_headers(headers); - if (body != nullptr) { - HKU_CHECK(len > 0, "Body is not null, but len is zero!"); - req.add_header("Content-Type", content_type); - -#if HKU_ENABLE_HTTP_CLIENT_ZIP - if (req.get_header("Content-Encoding") == "gzip") { - gzip::Compressor comp(Z_DEFAULT_COMPRESSION); - std::string output; - comp.compress(output, body, len); - req.copy_data(output.data(), output.size()); + std::ostringstream buf; + bool first = true; + for (auto iter = params.cbegin(); iter != params.cend(); ++iter) { + if (first) { + buf << "?"; } else { - req.set_data(body, len); + buf << "&"; } -#else - req.del_header("Content-Encoding").set_data(body, len); -#endif + buf << iter->first << "=" << iter->second; } - int count = 0; - while (count < 2) { - count++; - _connect(); - - // Send the request, and wait for that to finish. - m_conn.write_req(req, m_aio); - int rv = m_aio.wait().result(); - if (NNG_ECLOSED == rv || NNG_ECONNSHUT == rv || NNG_ECONNREFUSED == rv) { - // HKU_DEBUG("rv: {}", nng_strerror(rv)); - reset(); - res.reset(); - continue; - } else if (NNG_ETIMEDOUT == rv) { - throw HttpTimeoutException(); - } else if (0 != rv) { - HKU_THROW("[NNG_ERROR] {} ", nng_strerror(rv)); - } + std::string uri = buf.str(); + uri = uri.empty() ? path : fmt::format("{}{}", path, uri); + res = _readResChunk(method, uri, headers, body, body_len, content_type); - m_conn.read_res(res.get(), m_aio); - rv = m_aio.wait().result(); - if (0 == rv) { - break; - } else if (NNG_ETIMEDOUT == rv) { - throw HttpTimeoutException(); - } else if (NNG_ECLOSED == rv || NNG_ECONNSHUT == rv || NNG_ECONNREFUSED == rv) { - // HKU_DEBUG("rv: {}", nng_strerror(rv)); - reset(); - res.reset(); - } else { - HKU_THROW("[NNG_ERROR] {} ", nng_strerror(rv)); - } + if (res.getHeader("Connection") == "close") { + HKU_WARN("Connect closed"); + reset(); } - HKU_IF_RETURN(res.status() != NNG_HTTP_STATUS_OK, res); - - std::string hdr = res.getHeader("Content-Length"); - HKU_WARN_IF_RETURN(hdr.empty(), res, "Missing Content-Length header."); + } catch (const std::exception&) { + reset(); + throw; + } catch (...) { + reset(); + HKU_THROW_UNKNOWN; + } + return res; +} - size_t len = std::stoi(hdr); - HKU_IF_RETURN(len == 0, res); +HttpResponse HttpClient::_readResChunk(const std::string& method, const std::string& uri, + const HttpHeaders& headers, const char* body, + size_t body_len, const std::string& content_type) { + HttpResponse res; + nng::http_req req(m_url); + req.set_method(method).set_uri(uri).add_headers(m_default_headers).add_headers(headers); + if (body != nullptr) { + HKU_CHECK(body_len > 0, "Body is not null, but len is zero!"); + req.add_header("Content-Type", content_type); #if HKU_ENABLE_HTTP_CLIENT_ZIP - if (res.getHeader("Content-Encoding") == "gzip") { - nng_iov iov; - auto data = std::unique_ptr(new char[len]); - iov.iov_len = len; - iov.iov_buf = data.get(); - m_aio.set_iov(1, &iov); - m_conn.read_all(m_aio); - NNG_CHECK(m_aio.wait().result()); - - gzip::Decompressor decomp; - decomp.decompress(res.m_body, data.get(), len); - + if (req.get_header("Content-Encoding") == "gzip") { + gzip::Compressor comp(Z_DEFAULT_COMPRESSION); + std::string output; + comp.compress(output, body, body_len); + req.copy_data(output.data(), output.size()); } else { - res._resizeBody(len); - nng_iov iov; - iov.iov_len = len; - iov.iov_buf = res.m_body.data(); - m_aio.set_iov(1, &iov); - m_conn.read_all(m_aio); - NNG_CHECK(m_aio.wait().result()); + req.set_data(body, body_len); } #else - HKU_WARN_IF( - res.getHeader("Content-Encoding") == "gzip", - "Automatic decompression is not supported. You need to decompress it yourself!"); - res._resizeBody(len); - nng_iov iov; - iov.iov_len = len; - iov.iov_buf = res.m_body.data(); - m_aio.set_iov(1, &iov); - m_conn.read_all(m_aio); - NNG_CHECK(m_aio.wait().result()); - -#endif // #if HKU_ENABLE_HTTP_CLIENT_ZIP + req.del_header("Content-Encoding").set_data(body, body_len); +#endif + } - if (res.getHeader("Connection") == "close") { - HKU_WARN("Connect closed"); + int count = 0; + while (count < 2) { + count++; + _connect(); + + m_conn.transact(req.get(), res.get(), m_aio); + int rv = m_aio.wait().result(); + if (0 == rv) { + break; + } else if (NNG_ETIMEDOUT == rv) { + throw HttpTimeoutException(); + } else if (NNG_ECLOSED == rv || NNG_ECONNSHUT == rv || NNG_ECONNREFUSED == rv) { + // HKU_DEBUG("rv: {}", nng_strerror(rv)); reset(); + res.reset(); + } else { + HKU_THROW("[NNG_ERROR] {} ", nng_strerror(rv)); } - } catch (const std::exception&) { - reset(); - throw; - } catch (...) { - reset(); - HKU_THROW_UNKNOWN; } + + HKU_IF_RETURN(res.status() != NNG_HTTP_STATUS_OK, res); + + void* data; + size_t len; + nng_http_res_get_data(res.get(), &data, &len); + +#if HKU_ENABLE_HTTP_CLIENT_ZIP + if (res.getHeader("Content-Encoding") == "gzip") { + res.m_body = gzip::decompress((const char*)data, len); + } else { + res._resizeBody(len); + memcpy(res.m_body.data(), data, len); + } +#else + res._resizeBody(len); + memcpy(res.m_body.data(), data, len); +#endif + return res; } diff --git a/hikyuu/utilities/http_client/HttpClient.h b/hikyuu/utilities/http_client/HttpClient.h index cd01b4f..3ecba6f 100644 --- a/hikyuu/utilities/http_client/HttpClient.h +++ b/hikyuu/utilities/http_client/HttpClient.h @@ -125,29 +125,52 @@ class HKU_UTILS_API HttpClient { void reset(); HttpResponse request(const std::string& method, const std::string& path, - const HttpHeaders& headers, const char* body, size_t len, - const std::string& content_type); + const HttpParams& params, const HttpHeaders& headers, const char* body, + size_t body_len, const std::string& content_type); - HttpResponse get(const std::string& path, const HttpHeaders& headers = {}) { - return request("GET", path, headers, nullptr, 0, ""); + HttpResponse get(const std::string& path, const HttpHeaders& headers = HttpHeaders()) { + return request("GET", path, HttpParams(), headers, nullptr, 0, ""); + } + + HttpResponse get(const std::string& path, const HttpParams& params, + const HttpHeaders& headers) { + return request("GET", path, params, headers, nullptr, 0, ""); + } + + HttpResponse post(const std::string& path, const HttpParams& params, const HttpHeaders& headers, + const char* body, size_t len, const std::string& content_type) { + return request("POST", path, params, headers, body, len, content_type); } HttpResponse post(const std::string& path, const HttpHeaders& headers, const char* body, size_t len, const std::string& content_type) { - return request("POST", path, headers, body, len, content_type); + return request("POST", path, HttpParams(), headers, body, len, content_type); + } + + HttpResponse post(const std::string& path, const HttpParams& params, const HttpHeaders& headers, + const std::string& content, const std::string& content_type = "text/plaint") { + return post(path, params, headers, content.data(), content.size(), content_type); } HttpResponse post(const std::string& path, const HttpHeaders& headers, const std::string& content, const std::string& content_type = "text/plaint") { - return post(path, headers, content.data(), content.size(), content_type); + return post(path, HttpParams(), headers, content, content_type); + } + + HttpResponse post(const std::string& path, const HttpParams& params, const HttpHeaders& headers, + const json& body) { + return post(path, params, headers, body.dump(), "application/json"); } HttpResponse post(const std::string& path, const HttpHeaders& headers, const json& body) { - return post(path, headers, body.dump(), "application/json"); + return post(path, HttpParams(), headers, body); } private: void _connect(); + HttpResponse _readResChunk(const std::string& method, const std::string& uri, + const HttpHeaders& headers, const char* body, size_t body_len, + const std::string& content_type); private: HttpHeaders m_default_headers; diff --git a/hikyuu/utilities/http_client/nng_wrap.h b/hikyuu/utilities/http_client/nng_wrap.h index 1529a5e..722e8d1 100644 --- a/hikyuu/utilities/http_client/nng_wrap.h +++ b/hikyuu/utilities/http_client/nng_wrap.h @@ -27,6 +27,7 @@ struct HttpTimeoutException : hku::exception { }; using HttpHeaders = std::map; +using HttpParams = std::map; } // namespace hku @@ -463,6 +464,10 @@ class http_conn final { nng_http_conn_read_all(m_conn, aio.get()); } + void transact(nng_http_req* req, nng_http_res* res, const aio& aio) { + nng_http_conn_transact(m_conn, req, res, aio.get()); + } + private: nng_http_conn* m_conn{nullptr}; }; diff --git a/hikyuu/utilities/http_client/url.cpp b/hikyuu/utilities/http_client/url.cpp new file mode 100644 index 0000000..4052ff9 --- /dev/null +++ b/hikyuu/utilities/http_client/url.cpp @@ -0,0 +1,56 @@ +/* + * Copyright(C) 2021 hikyuu.org + * + * Create on: 2021-03-07 + * Author: fasiondog + */ + +#include "url.h" + +namespace hku { + +#define IS_ALPHA(c) (((c) >= 'a' && (c) <= 'z') || ((c) >= 'A' && (c) <= 'Z')) +#define IS_NUM(c) ((c) >= '0' && (c) <= '9') +#define IS_ALPHANUM(c) (IS_ALPHA(c) || IS_NUM(c)) +#define IS_HEX(c) (IS_NUM(c) || ((c) >= 'a' && (c) <= 'f') || ((c) >= 'A' && (c) <= 'F')) + +static inline bool is_unambiguous(char c) { + return IS_ALPHANUM(c) || c == '-' || c == '_' || c == '.' || c == '~'; +} + +static inline unsigned char hex2i(char hex) { + return hex <= '9' ? hex - '0' : hex <= 'F' ? hex - 'A' + 10 : hex - 'a' + 10; +} + +std::string url_escape(const char* istr) { + std::string ostr; + const char* p = istr; + char szHex[4] = {0}; + while (*p != '\0') { + if (is_unambiguous(*p)) { + ostr += *p; + } else { + sprintf(szHex, "%%%02X", *p); + ostr += szHex; + } + ++p; + } + return ostr; +} + +std::string url_unescape(const char* istr) { + std::string ostr; + const char* p = istr; + while (*p != '\0') { + if (*p == '%' && IS_HEX(p[1]) && IS_HEX(p[2])) { + ostr += ((hex2i(p[1]) << 4) | hex2i(p[2])); + p += 3; + } else { + ostr += *p; + ++p; + } + } + return ostr; +} + +} // namespace hku \ No newline at end of file diff --git a/hikyuu/utilities/http_client/url.h b/hikyuu/utilities/http_client/url.h new file mode 100644 index 0000000..aa43206 --- /dev/null +++ b/hikyuu/utilities/http_client/url.h @@ -0,0 +1,25 @@ +/* + * Copyright(C) 2021 hikyuu.org + * + * Create on: 2021-03-07 + * Author: fasiondog + */ + +#pragma once +#ifndef HKU_UTILS_URL_H +#define HKU_UTILS_URL_H + +#include + +#ifndef HKU_UTILS_API +#define HKU_UTILS_API +#endif + +namespace hku { + +std::string HKU_UTILS_API url_escape(const char* istr); +std::string HKU_UTILS_API url_unescape(const char* istr); + +} // namespace hku + +#endif \ No newline at end of file diff --git a/hikyuu/utilities/node/NodeClient.h b/hikyuu/utilities/node/NodeClient.h new file mode 100644 index 0000000..f42d5b6 --- /dev/null +++ b/hikyuu/utilities/node/NodeClient.h @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2022 hikyuu.org + * + * Created on: 2022-04-25 + * Author: fasiondog + */ + +#pragma once + +#include "hikyuu/utilities/config.h" +#if !HKU_ENABLE_NODE +#error "Don't enable node client, please config with --node=y" +#endif + +#include +#include +#include +#include "hikyuu/utilities/datetime/Datetime.h" +#include "NodeMessage.h" + +namespace hku { + +class NodeClient { +public: + NodeClient() = default; + + explicit NodeClient(const std::string& serverAddr) : m_server_addr(serverAddr) {} + + virtual ~NodeClient() { + close(); + } + + /** 设置服务端地址 */ + void setServerAddr(const std::string& serverAddr) { + m_server_addr = serverAddr; + } + + /** 连接服务器 */ + bool dial() noexcept { + std::lock_guard lock(m_mutex); + close(); + // HKU_TRACE("dial: {}", m_server_addr); + int rv = nng_req0_open(&m_socket); + // HKU_ERROR_IF_RETURN(rv != 0, false, "Failed open req socket! {}", nng_strerror(rv)); + HKU_IF_RETURN(rv != 0, false); + m_connected = true; + + try { + // 设置发送结果 socket 连接参数 + rv = nng_socket_set_ms(m_socket, NNG_OPT_RECONNMINT, 10); + NODE_NNG_CHECK(rv, "Failed nng_socket_set_ms!"); + + rv = nng_socket_set_ms(m_socket, NNG_OPT_RECONNMAXT, 15000); + NODE_NNG_CHECK(rv, "Failed nng_socket_set_ms!"); + + rv = nng_socket_set_ms(m_socket, NNG_OPT_SENDTIMEO, 10000); + NODE_NNG_CHECK(rv, "Failed nng_socket_set_ms!"); + + rv = nng_socket_set_ms(m_socket, NNG_OPT_RECVTIMEO, 10000); + NODE_NNG_CHECK(rv, "Failed nng_socket_set_ms!"); + + rv = nng_dial(m_socket, m_server_addr.c_str(), NULL, 0); + NODE_NNG_CHECK(rv, "Failed dial server: {}!", m_server_addr); + + return true; + + } catch (const std::exception& e) { + HKU_ERROR_IF(m_show_log, "Failed dail server: {}! {}", m_server_addr, e.what()); + } catch (...) { + HKU_ERROR_IF(m_show_log, "Failed dail server: {}! Unknown error!", m_server_addr); + } + + m_connected = false; + nng_close(m_socket); + return false; + } + + /** 关闭连接 */ + void close() noexcept { + if (m_connected) { + nng_close(m_socket); + m_connected = false; + } + } + + /** 当前连接状态 */ + bool connected() const { + return m_connected; + } + + /** 获取最后一次接收到服务端响应的时间 */ + Datetime getLastAckTime() const { + return m_last_ack_time; + } + + /** + * 发送消息 + * @param req 发送请求消息 + * @param res 返回响应 + */ + bool post(const json& req, json& res) noexcept { + // 保证和服务器的通信必须是 req/res 模式 + std::lock_guard lock(m_mutex); + return _send(req) && _recv(res); + } + + void showLog(bool show) { + m_show_log = show; + } + +private: + bool _send(const json& req) const noexcept { + bool success = false; + // HKU_ERROR_IF_RETURN(!m_connected, success, "Not connected!"); + HKU_IF_RETURN(!m_connected, success); + + nng_msg* msg = nullptr; + int rv = nng_msg_alloc(&msg, 0); + // HKU_ERROR_IF_RETURN(rv != 0, success, "Failed nng_msg_alloc! {}", nng_strerror(rv)); + HKU_IF_RETURN(rv != 0, success); + + try { + encodeMsg(msg, req); + rv = nng_sendmsg(m_socket, msg, 0); + NODE_NNG_CHECK(rv, "Failed nng_sendmsg!"); + success = true; + + } catch (const std::exception& e) { + HKU_ERROR_IF(m_show_log, "Failed send result! {}", e.what()); + } catch (...) { + HKU_ERROR_IF(m_show_log, "Failed send result! Unknown error!"); + } + + if (!success) { + nng_msg_free(msg); + } + + return success; + } + + bool _recv(json& res) noexcept { + bool success = false; + nng_msg* msg{nullptr}; + int rv = nng_recvmsg(m_socket, &msg, 0); + if (rv != 0) { + HKU_ERROR_IF(m_show_log, "Failed nng_recvmsg! {}", nng_strerror(rv)); + return success; + } + + m_last_ack_time = Datetime::now(); + + try { + res = decodeMsg(msg); + success = true; + + } catch (const std::exception& e) { + HKU_ERROR_IF(m_show_log, "Failed recv response! {}", e.what()); + } catch (...) { + HKU_ERROR_IF(m_show_log, "Failed recv response! Unknown error!"); + } + + nng_msg_free(msg); + return success; + } + +private: + std::mutex m_mutex; + std::string m_server_addr; // 服务端地址 + nng_socket m_socket; + Datetime m_last_ack_time{Datetime::now()}; // 最后一次接收服务端响应的时间 + std::atomic_bool m_connected{false}; + std::atomic_bool m_show_log{true}; +}; + +} // namespace hku \ No newline at end of file diff --git a/hikyuu/utilities/node/NodeError.h b/hikyuu/utilities/node/NodeError.h new file mode 100644 index 0000000..1e32608 --- /dev/null +++ b/hikyuu/utilities/node/NodeError.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 hikyuu.org + * + * Created on: 2022-04-16 + * Author: fasiondog + */ + +#pragma once + +#include + +namespace hku { + +/** + * 节点返回码 + */ +enum NodeErrorCode { + SUCCESS = 0, + UNKNOWN_ERROR = 1, ///< 未知错误 + NNG_ERROR, ///< nng内部错误 + MISSING_CMD, ///< 缺失命令 + INVALID_CMD, ///< 无效命令,没有相应的处理服务 +}; + +class NodeError : public hku::exception { +public: + NodeError() : NodeError(NodeErrorCode::UNKNOWN_ERROR, "Unknow error!") {} + NodeError(NodeErrorCode errcode, const char* errmsg) + : hku::exception(fmt::format("{} errcode: {}", errmsg, int(errcode))), m_errcode(errcode) {} + NodeError(NodeErrorCode errcode, const std::string& errmsg) + : hku::exception(errmsg), m_errcode(errcode) {} + + NodeErrorCode errcode() const { + return m_errcode; + } + +private: + NodeErrorCode m_errcode = NodeErrorCode::UNKNOWN_ERROR; +}; + +class NodeNngError : public NodeError { +public: + NodeNngError() = delete; + NodeNngError(int rv, const char* msg) + : NodeError(NodeErrorCode::NNG_ERROR, + fmt::format("{} nng error: {} (errcode: {})", msg, nng_strerror(rv), rv)) {} + NodeNngError(int rv, const std::string& msg) + : NodeError(NodeErrorCode::NNG_ERROR, + fmt::format("{} nng error: {} (errcode: {})", msg, nng_strerror(rv), rv)) {} +}; + +#define NODE_CHECK(expr, errcode, ...) \ + { \ + if (!(expr)) { \ + throw NodeError(errcode, fmt::format(__VA_ARGS__)); \ + } \ + } + +#define NODE_NNG_CHECK(rv, ...) \ + { \ + if (rv != 0) { \ + throw NodeNngError(rv, fmt::format(__VA_ARGS__)); \ + } \ + } + +} // namespace hku \ No newline at end of file diff --git a/hikyuu/utilities/node/NodeMessage.h b/hikyuu/utilities/node/NodeMessage.h new file mode 100644 index 0000000..68e727f --- /dev/null +++ b/hikyuu/utilities/node/NodeMessage.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 hikyuu.org + * + * Created on: 2022-03-27 + * Author: fasiondog + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include "NodeError.h" + +using json = nlohmann::json; + +namespace hku { + +#define NODE_STATUS_TIMEOUT 150 ///< 节点状态超时时长(秒),超时认为连接中断 +#define NODE_STATUS_INTERVAL 60 ///< 发送状态的间隔时间(秒)(心跳) + +/* + * 消息格式 + * req -> + * {"cmd": int, ...} + * + * <- res + * {"ret": code, "msg": str} // msg 存在错误时返回错误信息 (可选) + * + * + */ + +/** + * 对消息进行解码,消息类型和消息体必须匹配 + * @tparam T 消息体类型 + * @param msg 消息 + * @param out 解码输出的消息 + * @exception yas::io_exception 消息解码失败 + * @exception NodeErrorCode 消息类型不匹配 + */ +inline json decodeMsg(nng_msg *msg) { + HKU_ASSERT(msg != nullptr); + size_t len = nng_msg_len(msg); + uint8_t *data = (uint8_t *)nng_msg_body(msg); + json result = json::from_msgpack(data, data + len); + return result; +} + +/** + * @brief 消息编码 + * @tparam T 消息体类型 + * @param msg 消息 + * @param in 编码输入 + * @exception NodeNngError nng 操作失败 + * @exception yas::io_exception yas 序列化异常 + */ +inline void encodeMsg(nng_msg *msg, const json &in) { + HKU_ASSERT(msg != nullptr); + nng_msg_clear(msg); + + std::vector v = json::to_msgpack(in); + int rv = nng_msg_append(msg, v.data(), v.size()); + NODE_NNG_CHECK(rv, "Failed nng_msg_append!"); +} + +/** + * 构造错误消息响应 + * @param msg[out] 消息 + * @param errcode 错误码 + * @param errmsg 错误消息 + */ +inline void errorMsg(nng_msg *msg, NodeErrorCode errcode, const std::string &errmsg) { + json res; + res["ret"] = errcode; + res["msg"] = errmsg; + encodeMsg(msg, res); +} + +} // namespace hku diff --git a/hikyuu/utilities/node/NodeServer.h b/hikyuu/utilities/node/NodeServer.h new file mode 100644 index 0000000..63d6645 --- /dev/null +++ b/hikyuu/utilities/node/NodeServer.h @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2022 hikyuu.org + * + * Created on: 2022-04-15 + * Author: fasiondog + */ + +#pragma once + +#include "hikyuu/utilities/config.h" +#if !HKU_ENABLE_NODE +#error "Don't enable node server, please config with --node=y" +#endif + +#include +#include +#include +#include + +#include +#if HKU_OS_WINDOWS +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +#include +#else +#include +#endif + +#include "hikyuu/utilities/Log.h" +#include "NodeMessage.h" + +namespace hku { + +class NodeServer { + CLASS_LOGGER_IMP(NodeServer) + +private: + static constexpr size_t PARALLEL = 128; + +public: + NodeServer() = default; + explicit NodeServer(const std::string& addr) : m_addr(addr) {} + virtual ~NodeServer() { + stop(); + } + + void setAddr(const std::string& addr) { + m_addr = addr; + } + + void regHandle(const std::string& cmd, const std::function& handle) { + m_handles[cmd] = handle; + } + + void regHandle(const std::string& cmd, std::function&& handle) { + m_handles[cmd] = std::move(handle); + } + + void start() { + CLS_CHECK(!m_addr.empty(), "You must set NodeServer's addr first!"); + + // 启动 node server + int rv = nng_rep0_open(&m_socket); + CLS_CHECK(0 == rv, "Failed open server socket! {}", nng_strerror(rv)); + rv = nng_listen(m_socket, m_addr.c_str(), &m_listener, 0); + CLS_CHECK(0 == rv, "Failed listen node server socket ({})! {}", m_addr, nng_strerror(rv)); + CLS_TRACE("channel lisenter server: {}", m_addr); + + m_works.resize(PARALLEL); + for (size_t i = 0, total = m_works.size(); i < total; i++) { + Work* w = &m_works[i]; + rv = nng_aio_alloc(&w->aio, _serverCallback, w); + CLS_CHECK(0 == rv, "Failed create work {}! {}", i, nng_strerror(rv)); + rv = nng_ctx_open(&w->ctx, m_socket); + CLS_CHECK(0 == rv, "Failed open ctx {}! {}", i, nng_strerror(rv)); + w->state = Work::INIT; + w->server = this; + } + + for (size_t i = 0, total = m_works.size(); i < total; i++) { + _serverCallback(&m_works[i]); + } + } + + void loop() { + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } + + void stop() { + HKU_IF_RETURN(m_works.empty(), void()); + for (size_t i = 0, total = m_works.size(); i < total; i++) { + Work* w = &m_works[i]; + w->server = nullptr; + w->state = Work::FINISH; + if (w->aio) { + nng_aio_stop(w->aio); + nng_aio_free(w->aio); + nng_ctx_close(w->ctx); + w->aio = nullptr; + } + } + + // 关闭 socket 服务节点 + nng_listener_close(m_listener); + nng_close(m_socket); + m_works.clear(); + CLS_INFO("stopped node server."); + } + +private: + struct Work { + enum { INIT, RECV, SEND, FINISH } state = INIT; + nng_aio* aio{nullptr}; + nng_ctx ctx; + NodeServer* server{nullptr}; + }; + + static void _serverCallback(void* arg) { + Work* work = static_cast(arg); + int rv = 0; + switch (work->state) { + case Work::INIT: + work->state = Work::RECV; + nng_ctx_recv(work->ctx, work->aio); + break; + + case Work::RECV: + _processRequest(work); + break; + + case Work::SEND: + if ((rv = nng_aio_result(work->aio)) != 0) { + CLS_FATAL("Failed nng_ctx_send! {}", nng_strerror(rv)); + work->state = Work::FINISH; + return; + } + work->state = Work::RECV; + nng_ctx_recv(work->ctx, work->aio); + break; + + case Work::FINISH: + break; + + default: + CLS_FATAL("nng bad state!"); + break; + } + } + + static void _processRequest(Work* work) { + NodeServer* server = work->server; + CLS_IF_RETURN(!server || !work->aio, void()); + nng_msg* msg = nullptr; + json res; + + try { + int rv = nng_aio_result(work->aio); + HKU_CHECK(rv == 0, "Failed nng_aio_result!"); + + msg = nng_aio_get_msg(work->aio); + json req = decodeMsg(msg); + NODE_CHECK(req.contains("cmd"), NodeErrorCode::MISSING_CMD, "Missing command!"); + + // 兼容老版本数字cmd + std::string cmd = req["cmd"].is_number() ? fmt::format("{}", req["cmd"].get()) + : req["cmd"].get(); + auto iter = server->m_handles.find(cmd); + NODE_CHECK(iter != server->m_handles.end(), NodeErrorCode::INVALID_CMD, + "The server does not know how to process the message: {}", cmd); + + // tcp 连接尝试获取客户端地址和端口加入 req 中 + req["remote_host"] = ""; + req["remote_port"] = 0; + nng_pipe p = nng_msg_get_pipe(msg); + if (nng_pipe_id(p) > 0) { + uint16_t port = 0; + nng_sockaddr ra; + rv = nng_pipe_get_addr(p, NNG_OPT_REMADDR, &ra); + if (rv == 0) { + if (ra.s_family == NNG_AF_INET) { + char ipAddr[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, (void*)&ra.s_in.sa_addr, ipAddr, INET_ADDRSTRLEN); + port = ntohs(ra.s_in.sa_port); + req["remote_host"] = ipAddr; + req["remote_port"] = port; + } else if (ra.s_family == NNG_AF_INET6) { + char ipAddr[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, (void*)&ra.s_in.sa_addr, ipAddr, INET6_ADDRSTRLEN); + port = ntohs(ra.s_in6.sa_port); + req["remote_host"] = ipAddr; + req["remote_port"] = port; + } + } + } + + res = iter->second(std::move(req)); + res["ret"] = NodeErrorCode::SUCCESS; + encodeMsg(msg, res); + + nng_aio_set_msg(work->aio, msg); + work->state = Work::SEND; + nng_ctx_send(work->ctx, work->aio); + + } catch (const NodeNngError& e) { + CLS_FATAL(e.what()); + work->state = Work::FINISH; + + } catch (const NodeError& e) { + CLS_ERROR(e.what()); + res["ret"] = e.errcode(); + res["msg"] = e.what(); + encodeMsg(msg, res); + nng_aio_set_msg(work->aio, msg); + work->state = Work::SEND; + nng_ctx_send(work->ctx, work->aio); + + } catch (const std::exception& e) { + CLS_ERROR(e.what()); + res["ret"] = NodeErrorCode::UNKNOWN_ERROR; + res["msg"] = e.what(); + encodeMsg(msg, res); + nng_aio_set_msg(work->aio, msg); + work->state = Work::SEND; + nng_ctx_send(work->ctx, work->aio); + + } catch (...) { + std::string errmsg = "Unknown error!"; + CLS_ERROR(errmsg); + res["ret"] = NodeErrorCode::UNKNOWN_ERROR; + res["msg"] = errmsg; + nng_aio_set_msg(work->aio, msg); + work->state = Work::SEND; + nng_ctx_send(work->ctx, work->aio); + } + } + +private: + std::string m_addr; + nng_socket m_socket; + nng_listener m_listener; + std::vector m_works; + std::unordered_map> m_handles; +}; + +} // namespace hku \ No newline at end of file diff --git a/release.md b/release.md index 018a067..832f522 100644 --- a/release.md +++ b/release.md @@ -1,10 +1,13 @@ # 版本发布说明 +## 1.0.2 - 2024年7月31日 + +增加基于 nng 的简单请求响应服务及客户端(NodeServer, NodeClient) + ## 1.0.1 - 2024年7月29日 add http_client - ## 1.0.0 - 2024年7月10日 初始版本 diff --git a/test/utilities/http_client/test_HttpClient.cpp b/test/utilities/http_client/test_HttpClient.cpp index 3d0e7e4..95f7be8 100644 --- a/test/utilities/http_client/test_HttpClient.cpp +++ b/test/utilities/http_client/test_HttpClient.cpp @@ -46,6 +46,20 @@ TEST_CASE("test_HttpClient") { auto ip = jres["origin"].get(); HKU_INFO("ip: {}", ip); + // for (size_t i = 0; i < 15; i++) { + // res = cli.get("/ip"); + // jres = res.json(); + // ip = jres["origin"].get(); + // HKU_INFO("ip: {}", ip); + // } + + // HKU_INFO("wait"); + // std::this_thread::sleep_for(std::chrono::seconds(60)); + // res = cli.get("/ip"); + // jres = res.json(); + // ip = jres["origin"].get(); + // HKU_INFO("ip: {}", ip); + #if HKU_ENABLE_HTTP_CLIENT_SSL // 访问 https, 无 CA file cli.setUrl("https://httpbin.org/"); @@ -56,4 +70,10 @@ TEST_CASE("test_HttpClient") { res = cli.get("/ip"); HKU_INFO("{} {}", cli.url(), res.status()); #endif + + cli.setUrl("http://api.vore.top"); + res = cli.get("/api/IPdata", {{"ip", "10.0.0.1"}}, HttpHeaders()); + auto data = res.json(); + HKU_INFO("{}", data.dump()); + CHECK_EQ(data["ipinfo"]["text"].get(), "10.0.0.1"); } \ No newline at end of file diff --git a/test/utilities/node/test_node.cpp b/test/utilities/node/test_node.cpp new file mode 100644 index 0000000..05e162e --- /dev/null +++ b/test/utilities/node/test_node.cpp @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 hikyuu.org + * + * Created on: 2024-07-30 + * Author: fasiondog + */ + +#include "test_config.h" +#include +#include + +using namespace hku; + +TEST_CASE("test_node") { + std::string server_addr = "inproc://tmp"; + NodeServer server; + server.setAddr(server_addr); + server.regHandle("hello", [](json&& req) { + json res; + HKU_INFO("Hello world!"); + return res; + }); + + server.start(); + + auto t = std::thread([server_addr]() { + NodeClient cli(server_addr); + cli.dial(); + + json req, res; + req["cmd"] = "hello"; + cli.post(req, res); + CHECK_EQ(res["ret"].get(), NodeErrorCode::SUCCESS); + }); + t.join(); +} \ No newline at end of file diff --git a/test/xmake.lua b/test/xmake.lua index b9eac73..58c66ea 100644 --- a/test/xmake.lua +++ b/test/xmake.lua @@ -17,8 +17,12 @@ target("unit-test") add_packages("sqlite3") end - if has_config("http_client") then - add_packages("nng", "nlohmann_json", "gzip-hpp") + if has_config("http_client") or has_config("node") then + add_packages("nng", "nlohmann_json") + end + + if has_config("http_client_zip") then + add_packages("gzip-hpp") end add_includedirs("..", ".") @@ -34,7 +38,7 @@ target("unit-test") add_files("*.cpp") add_files("utilities/*.cpp") - + if get_config("ini_parser") then add_files("utilities/ini_parser/*.cpp") end @@ -58,6 +62,10 @@ target("unit-test") add_files("utilities/http_client/**.cpp") end + if has_config("node") then + add_files("utilities/node/*.cpp") + end + before_build(function(target) -- 未指定 C++标准时,设置最低要求 c++11 local x = target:get("languages") diff --git a/xmake.lua b/xmake.lua index 91f8c46..407d46a 100644 --- a/xmake.lua +++ b/xmake.lua @@ -1,5 +1,5 @@ -- config version -set_version("1.0.1", {build="%Y%m%d%H%M"}) --使用 build 参数将导致每次重编译 +set_version("1.0.2", {build="%Y%m%d%H%M"}) --使用 build 参数将导致每次重编译 -- set warning all as error set_warnings("all", "error") @@ -57,6 +57,7 @@ option("mo", {description = "International language support", default = false}) option("http_client", {description = "use http client", default = true}) option("http_client_ssl", {description = "enable https support for http client", default = false}) option("http_client_zip", {description = "enable http support gzip", default = false}) +option("node", {description = "enable node reqrep server/client", default = true}) if has_config("leak_check") then -- 需要 export LD_PRELOAD=libasan.so @@ -146,7 +147,7 @@ if has_config("mysql") then end end -if has_config("http_client") then +if has_config("http_client") or has_config("node") then add_requires("nlohmann_json") if is_kind("shared") then add_requires("nng", {configs = {NNG_ENABLE_TLS = has_config("http_client_ssl"), cxflags = "-fPIC"}}) @@ -181,6 +182,7 @@ target("hku_utils") set_configvar("HKU_ENABLE_HTTP_CLIENT", has_config("http_client") and 1 or 0) set_configvar("HKU_ENABLE_HTTP_CLIENT_SSL", has_config("http_client_ssl") and 1 or 0) set_configvar("HKU_ENABLE_HTTP_CLIENT_ZIP", has_config("http_client_zip") and 1 or 0) + set_configvar("HKU_ENABLE_NODE", has_config("node") and 1 or 0) set_configvar("HKU_DEFAULT_LOG_NAME", get_config("log_name")) set_configvar("HKU_USE_SPDLOG_ASYNC_LOGGER", has_config("async_log") and 1 or 0) @@ -224,8 +226,12 @@ target("hku_utils") add_packages("mysql") end - if has_config("http_client") then - add_packages("nng", "nlohmann_json", "gzip-hpp") + if has_config("http_client") or has_config("node") then + add_packages("nng", "nlohmann_json") + end + + if has_config("http_client_zip") then + add_packages("gzip-hpp") end if is_plat("linux", "cross") then @@ -320,7 +326,7 @@ target("hku_utils") after_install(function(target) local destpath = target:installdir() import("core.project.task") - task.run("copy_dependents", {}, target, destpath, true) + task.run("copy_dependents", {}, target, destpath, false) end) target_end()