From 6ac98c3e0d3f9ebec3faa8d199162a1031c6f190 Mon Sep 17 00:00:00 2001 From: Toyo Li Date: Thu, 8 Aug 2024 17:14:07 +0800 Subject: [PATCH] update libuv source (#121) --- packages/emnapi/CMakeLists.txt | 16 +- packages/emnapi/emnapi.gyp | 3 + packages/emnapi/include/node/uv.h | 55 ++++++- packages/emnapi/include/node/uv/threadpool.h | 2 +- packages/emnapi/include/node/uv/unix.h | 4 + packages/emnapi/src/node_api.c | 4 +- packages/emnapi/src/threadsafe_function.c | 37 +++-- packages/emnapi/src/uv/queue.h | 154 ++++++++----------- packages/emnapi/src/uv/threadpool.c | 96 +++++++----- packages/emnapi/src/uv/unix/async.c | 131 ++++++++-------- packages/emnapi/src/uv/unix/core.c | 37 ++++- packages/emnapi/src/uv/unix/internal.h | 54 +++++++ packages/emnapi/src/uv/unix/loop.c | 44 +++++- packages/emnapi/src/uv/unix/posix-hrtime.c | 40 +++++ packages/emnapi/src/uv/uv-common.c | 130 +++++++++++++++- packages/emnapi/src/uv/uv-common.h | 146 +++++++++++++++++- 16 files changed, 720 insertions(+), 233 deletions(-) create mode 100644 packages/emnapi/src/uv/unix/internal.h create mode 100644 packages/emnapi/src/uv/unix/posix-hrtime.c diff --git a/packages/emnapi/CMakeLists.txt b/packages/emnapi/CMakeLists.txt index debfa133..4a680675 100644 --- a/packages/emnapi/CMakeLists.txt +++ b/packages/emnapi/CMakeLists.txt @@ -33,6 +33,7 @@ set(UV_SRC "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/uv-common.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/threadpool.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/unix/loop.c" + "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/unix/posix-hrtime.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/unix/thread.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/unix/async.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/uv/unix/core.c" @@ -139,7 +140,10 @@ endif() add_library(${EMNAPI_BASIC_TARGET_NAME} STATIC ${ENAPI_BASIC_SRC}) target_include_directories(${EMNAPI_BASIC_TARGET_NAME} PUBLIC ${EMNAPI_INCLUDE}) -target_compile_definitions(${EMNAPI_BASIC_TARGET_NAME} PUBLIC ${EMNAPI_DEFINES}) +target_compile_definitions(${EMNAPI_BASIC_TARGET_NAME} + PUBLIC ${EMNAPI_DEFINES} + PRIVATE "EMNAPI_DISABLE_UV" +) if(IS_EMSCRIPTEN) set_target_properties(${EMNAPI_BASIC_TARGET_NAME} PROPERTIES INTERFACE_LINK_DEPENDS ${EMNAPI_JS_LIB}) target_link_options(${EMNAPI_BASIC_TARGET_NAME} INTERFACE "--js-library=${EMNAPI_JS_LIB}") @@ -159,7 +163,10 @@ if(EMNAPI_BUILD_BASIC_MT) ) target_compile_options(${EMNAPI_BASIC_MT_TARGET_NAME} PUBLIC "-matomics" "-mbulk-memory") target_include_directories(${EMNAPI_BASIC_MT_TARGET_NAME} PUBLIC ${EMNAPI_INCLUDE}) - target_compile_definitions(${EMNAPI_BASIC_MT_TARGET_NAME} PUBLIC ${EMNAPI_DEFINES}) + target_compile_definitions(${EMNAPI_BASIC_MT_TARGET_NAME} + PUBLIC ${EMNAPI_DEFINES} + PRIVATE "EMNAPI_DISABLE_UV" + ) if(IS_EMSCRIPTEN) set_target_properties(${EMNAPI_BASIC_MT_TARGET_NAME} PROPERTIES INTERFACE_LINK_DEPENDS ${EMNAPI_JS_LIB}) @@ -175,7 +182,10 @@ endif() if(EMNAPI_BUILD_MT) add_library(${EMNAPI_MT_TARGET_NAME} STATIC ${EMNAPI_SRC} ${UV_SRC}) - target_compile_options(${EMNAPI_MT_TARGET_NAME} PRIVATE ${EMNAPI_MT_CFLAGS}) + target_compile_options(${EMNAPI_MT_TARGET_NAME} + PUBLIC "-matomics" "-mbulk-memory" + PRIVATE ${EMNAPI_MT_CFLAGS} + ) target_include_directories(${EMNAPI_MT_TARGET_NAME} PUBLIC ${EMNAPI_INCLUDE}) target_compile_definitions(${EMNAPI_MT_TARGET_NAME} PUBLIC ${EMNAPI_DEFINES}) if(IS_EMSCRIPTEN) diff --git a/packages/emnapi/emnapi.gyp b/packages/emnapi/emnapi.gyp index 53089fed..d9db62f8 100644 --- a/packages/emnapi/emnapi.gyp +++ b/packages/emnapi/emnapi.gyp @@ -47,6 +47,9 @@ { 'target_name': 'emnapi_basic', 'type': 'static_library', + 'defines': [ + 'EMNAPI_DISABLE_UV' + ], 'sources': [ 'src/js_native_api.c', 'src/node_api.c', diff --git a/packages/emnapi/include/node/uv.h b/packages/emnapi/include/node/uv.h index 1d0bb637..3be4c4ff 100644 --- a/packages/emnapi/include/node/uv.h +++ b/packages/emnapi/include/node/uv.h @@ -3,13 +3,21 @@ #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) -#include -#include "uv/unix.h" - #ifdef __cplusplus extern "C" { #endif +#include +#include + +/* Internal type, do not use. */ +struct uv__queue { + struct uv__queue* next; + struct uv__queue* prev; +}; + +#include "uv/unix.h" + #define UV_EXTERN /* nothing */ typedef enum { @@ -29,6 +37,8 @@ typedef struct uv_work_s uv_work_t; typedef struct uv_handle_s uv_handle_t; typedef struct uv_async_s uv_async_t; +typedef struct uv_metrics_s uv_metrics_t; + typedef void (*uv_work_cb)(uv_work_t* req); typedef void (*uv_after_work_cb)(uv_work_t* req, int status); @@ -40,10 +50,22 @@ struct uv_req_s { UV_REQ_FIELDS }; +typedef void* (*uv_malloc_func)(size_t size); +typedef void* (*uv_realloc_func)(void* ptr, size_t size); +typedef void* (*uv_calloc_func)(size_t count, size_t size); +typedef void (*uv_free_func)(void* ptr); + UV_EXTERN void uv_library_shutdown(void); + +UV_EXTERN int uv_replace_allocator(uv_malloc_func malloc_func, + uv_realloc_func realloc_func, + uv_calloc_func calloc_func, + uv_free_func free_func); + UV_EXTERN uv_loop_t* uv_default_loop(void); UV_EXTERN int uv_loop_init(uv_loop_t* loop); UV_EXTERN int uv_loop_close(uv_loop_t* loop); +UV_EXTERN uint64_t uv_hrtime(void); UV_EXTERN void uv_sleep(unsigned int msec); UV_EXTERN int uv_sem_init(uv_sem_t* sem, unsigned int value); @@ -111,6 +133,12 @@ typedef void (*uv_async_cb)(uv_async_t* handle); uv_loop_t* loop; \ uv_handle_type type; \ uv_close_cb close_cb; \ + struct uv__queue handle_queue; \ + union { \ + int fd; \ + void* reserved[4]; \ + } u; \ + UV_HANDLE_PRIVATE_FIELDS \ struct uv_handle_s { UV_HANDLE_FIELDS @@ -119,7 +147,7 @@ struct uv_handle_s { struct uv_async_s { UV_HANDLE_FIELDS uv_async_cb async_cb; - void* queue[2]; + struct uv__queue queue; int pending; }; @@ -129,18 +157,33 @@ UV_EXTERN int uv_async_init(uv_loop_t*, UV_EXTERN int uv_async_send(uv_async_t* async); UV_EXTERN void uv_close(uv_handle_t* handle, uv_close_cb close_cb); +UV_EXTERN int uv_is_closing(const uv_handle_t* handle); + +struct uv_metrics_s { + uint64_t loop_count; + uint64_t events; + uint64_t events_waiting; + /* private */ + uint64_t* reserved[13]; +}; + +UV_EXTERN int uv_metrics_info(uv_loop_t* loop, uv_metrics_t* metrics); +UV_EXTERN uint64_t uv_metrics_idle_time(uv_loop_t* loop); struct uv_loop_s { void* data; + unsigned int active_handles; + struct uv__queue handle_queue; union { void* unused; unsigned int count; } active_reqs; - void* wq[2]; + void* internal_fields; + struct uv__queue wq; uv_mutex_t wq_mutex; uv_async_t wq_async; - void* async_handles[2]; + struct uv__queue async_handles; void* em_queue; }; diff --git a/packages/emnapi/include/node/uv/threadpool.h b/packages/emnapi/include/node/uv/threadpool.h index 2c738478..6a65356f 100644 --- a/packages/emnapi/include/node/uv/threadpool.h +++ b/packages/emnapi/include/node/uv/threadpool.h @@ -33,7 +33,7 @@ struct uv__work { void (*work)(struct uv__work *w); void (*done)(struct uv__work *w, int status); struct uv_loop_s* loop; - void* wq[2]; + struct uv__queue wq; }; #endif diff --git a/packages/emnapi/include/node/uv/unix.h b/packages/emnapi/include/node/uv/unix.h index 357463a0..8e3a3dcc 100644 --- a/packages/emnapi/include/node/uv/unix.h +++ b/packages/emnapi/include/node/uv/unix.h @@ -18,4 +18,8 @@ typedef pthread_cond_t uv_cond_t; #endif +#define UV_HANDLE_PRIVATE_FIELDS \ + uv_handle_t* next_closing; \ + unsigned int flags; \ + #endif /* UV_UNIX_H */ diff --git a/packages/emnapi/src/node_api.c b/packages/emnapi/src/node_api.c index 69216ab8..56a064e1 100644 --- a/packages/emnapi/src/node_api.c +++ b/packages/emnapi/src/node_api.c @@ -1,7 +1,7 @@ #include "node_api.h" #include "emnapi_internal.h" -#if EMNAPI_HAVE_THREADS +#if EMNAPI_HAVE_THREADS && !defined(EMNAPI_DISABLE_UV) #include "uv.h" #endif @@ -31,7 +31,7 @@ napi_get_node_version(node_api_basic_env env, napi_status napi_get_uv_event_loop(node_api_basic_env env, struct uv_loop_s** loop) { -#if EMNAPI_HAVE_THREADS +#if EMNAPI_HAVE_THREADS && !defined(EMNAPI_DISABLE_UV) CHECK_ENV(env); CHECK_ARG(env, loop); // Though this is fake libuv loop diff --git a/packages/emnapi/src/threadsafe_function.c b/packages/emnapi/src/threadsafe_function.c index 3637d0d3..f2364d4b 100644 --- a/packages/emnapi/src/threadsafe_function.c +++ b/packages/emnapi/src/threadsafe_function.c @@ -5,9 +5,8 @@ #include #include #include -#include "uv/queue.h" - #include "uv.h" +#include "uv/queue.h" EXTERN_C_START @@ -21,7 +20,7 @@ static const unsigned int kMaxIterationCount = 1000; struct data_queue_node { void* data; - void* q[2]; + struct uv__queue q; }; struct napi_threadsafe_function__ { @@ -30,7 +29,7 @@ struct napi_threadsafe_function__ { pthread_mutex_t mutex; pthread_cond_t* cond; size_t queue_size; - void* queue[2]; + struct uv__queue queue; uv_async_t async; size_t thread_count; bool is_closing; @@ -92,7 +91,7 @@ _emnapi_tsfn_create(napi_env env, pthread_mutex_init(&ts_fn->mutex, NULL); ts_fn->cond = NULL; ts_fn->queue_size = 0; - QUEUE_INIT(&ts_fn->queue); + uv__queue_init(&ts_fn->queue); ts_fn->thread_count = initial_thread_count; ts_fn->is_closing = false; ts_fn->dispatch_state = kDispatchIdle; @@ -125,13 +124,13 @@ static void _emnapi_tsfn_destroy(napi_threadsafe_function func) { func->cond = NULL; } - QUEUE* tmp; + struct uv__queue* tmp; struct data_queue_node* node; - QUEUE_FOREACH(tmp, &func->queue) { - node = QUEUE_DATA(tmp, struct data_queue_node, q); + uv__queue_foreach(tmp, &func->queue) { + node = uv__queue_data(tmp, struct data_queue_node, q); free(node); } - QUEUE_INIT(&func->queue); + uv__queue_init(&func->queue); if (func->ref != NULL) { EMNAPI_ASSERT_CALL(napi_delete_reference(func->env, func->ref)); @@ -187,14 +186,14 @@ static napi_status _emnapi_tsfn_init(napi_threadsafe_function func) { } static void _emnapi_tsfn_empty_queue_and_delete(napi_threadsafe_function func) { - while (!QUEUE_EMPTY(&func->queue)) { - QUEUE* q = QUEUE_HEAD(&func->queue); - struct data_queue_node* node = QUEUE_DATA(q, struct data_queue_node, q); + while (!uv__queue_empty(&func->queue)) { + struct uv__queue* q = uv__queue_head(&func->queue); + struct data_queue_node* node = uv__queue_data(q, struct data_queue_node, q); func->call_js_cb(NULL, NULL, func->context, node->data); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + uv__queue_remove(q); + uv__queue_init(q); func->queue_size--; free(node); } @@ -295,10 +294,10 @@ static bool _emnapi_tsfn_dispatch_one(napi_threadsafe_function func) { } else { size_t size = func->queue_size; if (size > 0) { - QUEUE* q = QUEUE_HEAD(&func->queue); - struct data_queue_node* node = QUEUE_DATA(q, struct data_queue_node, q); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + struct uv__queue* q = uv__queue_head(&func->queue); + struct data_queue_node* node = uv__queue_data(q, struct data_queue_node, q); + uv__queue_remove(q); + uv__queue_init(q); func->queue_size--; data = node->data; free(node); @@ -519,7 +518,7 @@ napi_call_threadsafe_function(napi_threadsafe_function func, return napi_generic_failure; } queue_node->data = data; - QUEUE_INSERT_TAIL(&func->queue, &queue_node->q); + uv__queue_insert_tail(&func->queue, &queue_node->q); func->queue_size++; _emnapi_tsfn_send(func); pthread_mutex_unlock(&func->mutex); diff --git a/packages/emnapi/src/uv/queue.h b/packages/emnapi/src/uv/queue.h index ff3540a0..5f8489e9 100644 --- a/packages/emnapi/src/uv/queue.h +++ b/packages/emnapi/src/uv/queue.h @@ -18,91 +18,73 @@ #include -typedef void *QUEUE[2]; - -/* Private macros. */ -#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) -#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) - -/* Public macros. */ -#define QUEUE_DATA(ptr, type, field) \ - ((type *) ((char *) (ptr) - offsetof(type, field))) - -/* Important note: mutating the list while QUEUE_FOREACH is - * iterating over its elements results in undefined behavior. - */ -#define QUEUE_FOREACH(q, h) \ - for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) - -#define QUEUE_EMPTY(q) \ - ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) - -#define QUEUE_HEAD(q) \ - (QUEUE_NEXT(q)) - -#define QUEUE_INIT(q) \ - do { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } \ - while (0) - -#define QUEUE_ADD(h, n) \ - do { \ - QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ - QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV(h) = QUEUE_PREV(n); \ - QUEUE_PREV_NEXT(h) = (h); \ - } \ - while (0) - -#define QUEUE_SPLIT(h, q, n) \ - do { \ - QUEUE_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(n) = (n); \ - QUEUE_NEXT(n) = (q); \ - QUEUE_PREV(h) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(h) = (h); \ - QUEUE_PREV(q) = (n); \ - } \ - while (0) - -#define QUEUE_MOVE(h, n) \ - do { \ - if (QUEUE_EMPTY(h)) \ - QUEUE_INIT(n); \ - else { \ - QUEUE* q = QUEUE_HEAD(h); \ - QUEUE_SPLIT(h, q, n); \ - } \ - } \ - while (0) - -#define QUEUE_INSERT_HEAD(h, q) \ - do { \ - QUEUE_NEXT(q) = QUEUE_NEXT(h); \ - QUEUE_PREV(q) = (h); \ - QUEUE_NEXT_PREV(q) = (q); \ - QUEUE_NEXT(h) = (q); \ - } \ - while (0) - -#define QUEUE_INSERT_TAIL(h, q) \ - do { \ - QUEUE_NEXT(q) = (h); \ - QUEUE_PREV(q) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(q) = (q); \ - QUEUE_PREV(h) = (q); \ - } \ - while (0) - -#define QUEUE_REMOVE(q) \ - do { \ - QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ - QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ - } \ - while (0) +#define uv__queue_data(pointer, type, field) \ + ((type*) ((char*) (pointer) - offsetof(type, field))) + +#define uv__queue_foreach(q, h) \ + for ((q) = (h)->next; (q) != (h); (q) = (q)->next) + +static inline void uv__queue_init(struct uv__queue* q) { + q->next = q; + q->prev = q; +} + +static inline int uv__queue_empty(const struct uv__queue* q) { + return q == q->next; +} + +static inline struct uv__queue* uv__queue_head(const struct uv__queue* q) { + return q->next; +} + +static inline struct uv__queue* uv__queue_next(const struct uv__queue* q) { + return q->next; +} + +static inline void uv__queue_add(struct uv__queue* h, struct uv__queue* n) { + h->prev->next = n->next; + n->next->prev = h->prev; + h->prev = n->prev; + h->prev->next = h; +} + +static inline void uv__queue_split(struct uv__queue* h, + struct uv__queue* q, + struct uv__queue* n) { + n->prev = h->prev; + n->prev->next = n; + n->next = q; + h->prev = q->prev; + h->prev->next = h; + q->prev = n; +} + +static inline void uv__queue_move(struct uv__queue* h, struct uv__queue* n) { + if (uv__queue_empty(h)) + uv__queue_init(n); + else + uv__queue_split(h, h->next, n); +} + +static inline void uv__queue_insert_head(struct uv__queue* h, + struct uv__queue* q) { + q->next = h->next; + q->prev = h; + q->next->prev = q; + h->next = q; +} + +static inline void uv__queue_insert_tail(struct uv__queue* h, + struct uv__queue* q) { + q->next = h; + q->prev = h->prev; + q->prev->next = q; + h->prev = q; +} + +static inline void uv__queue_remove(struct uv__queue* q) { + q->prev->next = q->next; + q->next->prev = q->prev; +} #endif /* QUEUE_H_ */ diff --git a/packages/emnapi/src/uv/threadpool.c b/packages/emnapi/src/uv/threadpool.c index 6d7d3b2f..37548b18 100644 --- a/packages/emnapi/src/uv/threadpool.c +++ b/packages/emnapi/src/uv/threadpool.c @@ -19,7 +19,7 @@ * IN THE SOFTWARE. */ -// from libuv 1.43.0 +// from libuv 1.48.0 #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) @@ -43,10 +43,10 @@ static unsigned int slow_io_work_running; static unsigned int nthreads; static uv_thread_t* threads; static uv_thread_t default_threads[4]; -static QUEUE exit_message; -static QUEUE wq; -static QUEUE run_slow_work_message; -static QUEUE slow_io_pending_wq; +static struct uv__queue exit_message; +static struct uv__queue wq; +static struct uv__queue run_slow_work_message; +static struct uv__queue slow_io_pending_wq; static unsigned int slow_work_thread_threshold(void) { return (nthreads + 1) / 2; @@ -60,8 +60,8 @@ EMNAPI_INTERNAL_EXTERN void _emnapi_worker_unref(uv_thread_t pid); #ifdef __EMNAPI_WASI_THREADS__ EMNAPI_INTERNAL_EXTERN -void _emnapi_after_uvthreadpool_ready(void (*callback)(QUEUE* w, enum uv__work_kind kind), - QUEUE* w, +void _emnapi_after_uvthreadpool_ready(void (*callback)(struct uv__queue* w, enum uv__work_kind kind), + struct uv__queue* w, enum uv__work_kind kind); EMNAPI_INTERNAL_EXTERN void _emnapi_tell_js_uvthreadpool(uv_thread_t* threads, unsigned int n); EMNAPI_INTERNAL_EXTERN void _emnapi_emit_async_thread_ready(); @@ -72,7 +72,7 @@ EMNAPI_INTERNAL_EXTERN void _emnapi_emit_async_thread_ready(); */ static void* worker(void* arg) { struct uv__work* w; - QUEUE* q; + struct uv__queue* q; int is_slow_work; #ifndef __EMNAPI_WASI_THREADS__ uv_sem_post((uv_sem_t*) arg); @@ -87,49 +87,49 @@ static void* worker(void* arg) { /* Keep waiting while either no work is present or only slow I/O and we're at the threshold for that. */ - while (QUEUE_EMPTY(&wq) || - (QUEUE_HEAD(&wq) == &run_slow_work_message && - QUEUE_NEXT(&run_slow_work_message) == &wq && + while (uv__queue_empty(&wq) || + (uv__queue_head(&wq) == &run_slow_work_message && + uv__queue_next(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } - q = QUEUE_HEAD(&wq); + q = uv__queue_head(&wq); if (q == &exit_message) { uv_cond_signal(&cond); uv_mutex_unlock(&mutex); break; } - QUEUE_REMOVE(q); - QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ + uv__queue_remove(q); + uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */ is_slow_work = 0; if (q == &run_slow_work_message) { /* If we're at the slow I/O threshold, re-schedule until after all other work in the queue is done. */ if (slow_io_work_running >= slow_work_thread_threshold()) { - QUEUE_INSERT_TAIL(&wq, q); + uv__queue_insert_tail(&wq, q); continue; } /* If we encountered a request to run slow I/O work but there is none to run, that means it's cancelled => Start over. */ - if (QUEUE_EMPTY(&slow_io_pending_wq)) + if (uv__queue_empty(&slow_io_pending_wq)) continue; is_slow_work = 1; slow_io_work_running++; - q = QUEUE_HEAD(&slow_io_pending_wq); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + q = uv__queue_head(&slow_io_pending_wq); + uv__queue_remove(q); + uv__queue_init(q); /* If there is more slow I/O work, schedule it to be run as well. */ - if (!QUEUE_EMPTY(&slow_io_pending_wq)) { - QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); + if (!uv__queue_empty(&slow_io_pending_wq)) { + uv__queue_insert_tail(&wq, &run_slow_work_message); if (idle_threads > 0) uv_cond_signal(&cond); } @@ -137,13 +137,13 @@ static void* worker(void* arg) { uv_mutex_unlock(&mutex); - w = QUEUE_DATA(q, struct uv__work, wq); + w = uv__queue_data(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ - QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); + uv__queue_insert_tail(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); @@ -159,12 +159,12 @@ static void* worker(void* arg) { } -static void post(QUEUE* q, enum uv__work_kind kind) { +static void post(struct uv__queue* q, enum uv__work_kind kind) { uv_mutex_lock(&mutex); // if (kind == UV__WORK_SLOW_IO) { // /* Insert into a separate queue. */ - // QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); - // if (!QUEUE_EMPTY(&run_slow_work_message)) { + // uv__queue_insert_tail(&slow_io_pending_wq, q); + // if (!uv__queue_empty(&run_slow_work_message)) { // /* Running slow I/O tasks is already scheduled => Nothing to do here. // The worker that runs said other task will schedule this one as well. */ // uv_mutex_unlock(&mutex); @@ -173,7 +173,7 @@ static void post(QUEUE* q, enum uv__work_kind kind) { // q = &run_slow_work_message; // } - QUEUE_INSERT_TAIL(&wq, q); + uv__queue_insert_tail(&wq, q); if (idle_threads > 0) uv_cond_signal(&cond); uv_mutex_unlock(&mutex); @@ -254,9 +254,9 @@ static void init_threads(void) { if (uv_mutex_init(&mutex)) abort(); - QUEUE_INIT(&wq); - QUEUE_INIT(&slow_io_pending_wq); - QUEUE_INIT(&run_slow_work_message); + uv__queue_init(&wq); + uv__queue_init(&slow_io_pending_wq); + uv__queue_init(&run_slow_work_message); #ifndef __EMNAPI_WASI_THREADS__ if (uv_sem_init(&sem, 0)) @@ -331,12 +331,13 @@ void uv__work_submit(uv_loop_t* loop, static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { int cancelled; + uv_once(&once, init_once); /* Ensure |mutex| is initialized. */ uv_mutex_lock(&mutex); uv_mutex_lock(&w->loop->wq_mutex); - cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL; + cancelled = !uv__queue_empty(&w->wq) && w->work != NULL; if (cancelled) - QUEUE_REMOVE(&w->wq); + uv__queue_remove(&w->wq); uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_unlock(&mutex); @@ -346,7 +347,7 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { w->work = uv__cancelled; uv_mutex_lock(&loop->wq_mutex); - QUEUE_INSERT_TAIL(&loop->wq, &w->wq); + uv__queue_insert_tail(&loop->wq, &w->wq); uv_async_send(&loop->wq_async); uv_mutex_unlock(&loop->wq_mutex); @@ -357,22 +358,39 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { void uv__work_done(uv_async_t* handle) { struct uv__work* w; uv_loop_t* loop; - QUEUE* q; - QUEUE wq; + struct uv__queue* q; + struct uv__queue wq; int err; + int nevents; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); - QUEUE_MOVE(&loop->wq, &wq); + uv__queue_move(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); - while (!QUEUE_EMPTY(&wq)) { - q = QUEUE_HEAD(&wq); - QUEUE_REMOVE(q); + nevents = 0; + + while (!uv__queue_empty(&wq)) { + q = uv__queue_head(&wq); + uv__queue_remove(q); w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? ECANCELED : 0; w->done(w, err); + nevents++; + } + + /* This check accomplishes 2 things: + * 1. Even if the queue was empty, the call to uv__work_done() should count + * as an event. Which will have been added by the event loop when + * calling this callback. + * 2. Prevents accidental wrap around in case nevents == 0 events == 0. + */ + if (nevents > 1) { + /* Subtract 1 to counter the call to uv__work_done(). */ + uv__metrics_inc_events(loop, nevents - 1); + if (uv__get_internal_fields(loop)->current_timeout == 0) + uv__metrics_inc_events_waiting(loop, nevents - 1); } } diff --git a/packages/emnapi/src/uv/unix/async.c b/packages/emnapi/src/uv/unix/async.c index 238404d3..e3b59003 100644 --- a/packages/emnapi/src/uv/unix/async.c +++ b/packages/emnapi/src/uv/unix/async.c @@ -24,38 +24,14 @@ #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) +#include "uv.h" +#include "internal.h" + +#include #include #include -#include "../uv-common.h" #include "emnapi_common.h" -#if defined(__clang__) || \ - defined(__GNUC__) || \ - defined(__INTEL_COMPILER) -# define UV_UNUSED(declaration) __attribute__((unused)) declaration -#else -# define UV_UNUSED(declaration) declaration -#endif - -UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)); - -UV_UNUSED(static int cmpxchgi(int* ptr, int oldval, int newval)) { - return __sync_val_compare_and_swap(ptr, oldval, newval); -} - -#ifndef EMNAPI_NEXTTICK_TYPE -#define EMNAPI_NEXTTICK_TYPE 0 -#endif -#if EMNAPI_NEXTTICK_TYPE == 0 -EMNAPI_INTERNAL_EXTERN void _emnapi_set_immediate(void (*callback)(void*), void* data); -#define NEXT_TICK(callback, data) _emnapi_set_immediate((callback), (data)) -#elif EMNAPI_NEXTTICK_TYPE == 1 -EMNAPI_INTERNAL_EXTERN void _emnapi_next_tick(void (*callback)(void*), void* data); -#define NEXT_TICK(callback, data) _emnapi_next_tick((callback), (data)) -#else -#error "Invalid EMNAPI_NEXTTICK_TYPE" -#endif - #if EMNAPI_USE_PROXYING #include #include @@ -88,36 +64,43 @@ void _emnapi_destroy_proxying_queue(uv_loop_t* loop) {} #endif +static void uv__async_send(uv_loop_t* loop); +static void uv__cpu_relax(void); + int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { - handle->loop = loop; - handle->type = UV_ASYNC; + uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); handle->async_cb = async_cb; handle->pending = 0; - QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); + handle->u.fd = 0; /* This will be used as a busy flag. */ + + uv__queue_insert_tail(&loop->async_handles, &handle->queue); + uv__handle_start(handle); return 0; } /* Only call this from the event loop thread. */ -static int uv__async_spin(uv_async_t* handle) { +static void uv__async_spin(uv_async_t* handle) { + _Atomic int* pending; + _Atomic int* busy; int i; - int rc; + + pending = (_Atomic int*) &handle->pending; + busy = (_Atomic int*) &handle->u.fd; + + /* Set the pending flag first, so no new events will be added by other + * threads after this function returns. */ + atomic_store(pending, 1); for (;;) { - /* 997 is not completely chosen at random. It's a prime number, acyclical - * by nature, and should therefore hopefully dampen sympathetic resonance. + /* 997 is not completely chosen at random. It's a prime number, acyclic by + * nature, and should therefore hopefully dampen sympathetic resonance. */ for (i = 0; i < 997; i++) { - /* rc=0 -- handle is not pending. - * rc=1 -- handle is pending, other thread is still working with it. - * rc=2 -- handle is pending, other thread is done. - */ - rc = cmpxchgi(&handle->pending, 2, 0); - - if (rc != 1) - return rc; + if (atomic_load(busy) == 0) + return; /* Other thread is busy with this handle, spin until it's done. */ - // cpu_relax(); + uv__cpu_relax(); } /* Yield the CPU. We may have preempted the other thread while it's @@ -129,20 +112,23 @@ static int uv__async_spin(uv_async_t* handle) { } static void uv__async_io(uv_loop_t* loop) { - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; uv_async_t* h; + _Atomic int *pending; - QUEUE_MOVE(&loop->async_handles, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - h = QUEUE_DATA(q, uv_async_t, queue); + uv__queue_move(&loop->async_handles, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + h = uv__queue_data(q, uv_async_t, queue); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&loop->async_handles, q); + uv__queue_remove(q); + uv__queue_insert_tail(&loop->async_handles, q); - if (0 == uv__async_spin(h)) - continue; /* Not pending. */ + /* Atomically fetch and clear pending flag */ + pending = (_Atomic int*) &h->pending; + if (atomic_exchange(pending, 0) == 0) + continue; if (h->async_cb == NULL) continue; @@ -195,28 +181,45 @@ static void uv__async_send(uv_loop_t* loop) { #define ACCESS_ONCE(type, var) (*(volatile type*) &(var)) int uv_async_send(uv_async_t* handle) { + _Atomic int* pending; + _Atomic int* busy; + + pending = (_Atomic int*) &handle->pending; + busy = (_Atomic int*) &handle->u.fd; + /* Do a cheap read first. */ - if (ACCESS_ONCE(int, handle->pending) != 0) + if (atomic_load_explicit(pending, memory_order_relaxed) != 0) return 0; - /* Tell the other thread we're busy with the handle. */ - if (cmpxchgi(&handle->pending, 0, 1) != 0) - return 0; + /* Set the loop to busy. */ + atomic_fetch_add(busy, 1); /* Wake up the other thread's event loop. */ - uv__async_send(handle->loop); + if (atomic_exchange(pending, 1) == 0) + uv__async_send(handle->loop); - /* Tell the other thread we're done. */ - if (cmpxchgi(&handle->pending, 1, 2) != 1) - abort(); + /* Set the loop to not-busy. */ + atomic_fetch_add(busy, -1); return 0; } void uv__async_close(uv_async_t* handle) { uv__async_spin(handle); - QUEUE_REMOVE(&handle->queue); - NEXT_TICK(((void (*)(void *))handle->close_cb), handle); + uv__queue_remove(&handle->queue); + uv__handle_stop(handle); +} + +static void uv__cpu_relax(void) { +#if defined(__i386__) || defined(__x86_64__) + __asm__ __volatile__ ("rep; nop" ::: "memory"); /* a.k.a. PAUSE */ +#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__) + __asm__ __volatile__ ("yield" ::: "memory"); +#elif (defined(__ppc__) || defined(__ppc64__)) && defined(__APPLE__) + __asm volatile ("" : : : "memory"); +#elif !defined(__APPLE__) && (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__)) + __asm__ __volatile__ ("or 1,1,1; or 2,2,2" ::: "memory"); +#endif } #endif diff --git a/packages/emnapi/src/uv/unix/core.c b/packages/emnapi/src/uv/unix/core.c index b9436d13..9c9bf4ee 100644 --- a/packages/emnapi/src/uv/unix/core.c +++ b/packages/emnapi/src/uv/unix/core.c @@ -1,10 +1,18 @@ #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) +#include "uv.h" +#include "internal.h" #include #include -#include "../uv-common.h" + +uint64_t uv_hrtime(void) { + return uv__hrtime(UV_CLOCK_PRECISE); +} void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { + assert(!uv__is_closing(handle)); + + handle->flags |= UV_HANDLE_CLOSING; handle->close_cb = close_cb; switch (handle->type) { @@ -14,6 +22,33 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { default: assert(0); } + + uv__make_close_pending(handle); +} + +static void uv__finish_close(uv_handle_t* handle) { + assert(handle->flags & UV_HANDLE_CLOSING); + assert(!(handle->flags & UV_HANDLE_CLOSED)); + handle->flags |= UV_HANDLE_CLOSED; + + uv__handle_unref(handle); + uv__queue_remove(&handle->handle_queue); + + if (handle->close_cb) { + handle->close_cb(handle); + } +} + +void uv__make_close_pending(uv_handle_t* handle) { + assert(handle->flags & UV_HANDLE_CLOSING); + assert(!(handle->flags & UV_HANDLE_CLOSED)); + // handle->next_closing = handle->loop->closing_handles; + // handle->loop->closing_handles = handle; + NEXT_TICK(((void (*)(void *))uv__finish_close), handle); +} + +int uv_is_closing(const uv_handle_t* handle) { + return uv__is_closing(handle); } int nanosleep(const struct timespec *, struct timespec *); diff --git a/packages/emnapi/src/uv/unix/internal.h b/packages/emnapi/src/uv/unix/internal.h new file mode 100644 index 00000000..fc20c11c --- /dev/null +++ b/packages/emnapi/src/uv/unix/internal.h @@ -0,0 +1,54 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#ifndef UV_UNIX_INTERNAL_H_ +#define UV_UNIX_INTERNAL_H_ + +#include "../uv-common.h" +#include "emnapi_common.h" +#include + +#ifndef EMNAPI_NEXTTICK_TYPE +#define EMNAPI_NEXTTICK_TYPE 0 +#endif +#if EMNAPI_NEXTTICK_TYPE == 0 +EMNAPI_INTERNAL_EXTERN void _emnapi_set_immediate(void (*callback)(void*), void* data); +#define NEXT_TICK(callback, data) _emnapi_set_immediate((callback), (data)) +#elif EMNAPI_NEXTTICK_TYPE == 1 +EMNAPI_INTERNAL_EXTERN void _emnapi_next_tick(void (*callback)(void*), void* data); +#define NEXT_TICK(callback, data) _emnapi_next_tick((callback), (data)) +#else +#error "Invalid EMNAPI_NEXTTICK_TYPE" +#endif + +typedef enum { + UV_CLOCK_PRECISE = 0, /* Use the highest resolution clock available. */ + UV_CLOCK_FAST = 1 /* Use the fastest clock with <= 1ms granularity. */ +} uv_clocktype_t; + +uint64_t uv__hrtime(uv_clocktype_t type); + +#if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) +void uv__async_close(uv_async_t* handle); +void uv__make_close_pending(uv_handle_t* handle); +#endif + +#endif /* UV_UNIX_INTERNAL_H_ */ diff --git a/packages/emnapi/src/uv/unix/loop.c b/packages/emnapi/src/uv/unix/loop.c index 89389de3..eec60e80 100644 --- a/packages/emnapi/src/uv/unix/loop.c +++ b/packages/emnapi/src/uv/unix/loop.c @@ -1,16 +1,38 @@ #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) #include "../uv-common.h" +#include +#include int _emnapi_create_proxying_queue(uv_loop_t* loop); void _emnapi_destroy_proxying_queue(uv_loop_t* loop); int uv_loop_init(uv_loop_t* loop) { + uv__loop_internal_fields_t* lfields; + void* saved_data; int err; - QUEUE_INIT(&loop->wq); - QUEUE_INIT(&loop->async_handles); + + saved_data = loop->data; + memset(loop, 0, sizeof(*loop)); + loop->data = saved_data; + + lfields = (uv__loop_internal_fields_t*) uv__calloc(1, sizeof(*lfields)); + if (lfields == NULL) + return ENOMEM; + loop->internal_fields = lfields; + + err = uv_mutex_init(&lfields->loop_metrics.lock); + if (err) + goto fail_metrics_mutex_init; + memset(&lfields->loop_metrics.metrics, + 0, + sizeof(lfields->loop_metrics.metrics)); + + uv__queue_init(&loop->wq); + uv__queue_init(&loop->async_handles); + uv__queue_init(&loop->handle_queue); err = _emnapi_create_proxying_queue(loop); - if (err) return err; + if (err) goto fail_proxying_queue_init; err = uv_mutex_init(&loop->wq_mutex); if (err) goto fail_mutex_init; err = uv_async_init(loop, &loop->wq_async, uv__work_done); @@ -21,16 +43,30 @@ int uv_loop_init(uv_loop_t* loop) { uv_mutex_destroy(&loop->wq_mutex); fail_mutex_init: + +fail_proxying_queue_init: + +fail_metrics_mutex_init: + uv__free(lfields); + loop->internal_fields = NULL; + return err; } void uv__loop_close(uv_loop_t* loop) { + uv__loop_internal_fields_t* lfields; + uv_mutex_lock(&loop->wq_mutex); - assert(QUEUE_EMPTY(&loop->wq) && "thread pool work queue not empty!"); + assert(uv__queue_empty(&loop->wq) && "thread pool work queue not empty!"); assert(!uv__has_active_reqs(loop)); _emnapi_destroy_proxying_queue(loop); uv_mutex_unlock(&loop->wq_mutex); uv_mutex_destroy(&loop->wq_mutex); + + lfields = uv__get_internal_fields(loop); + uv_mutex_destroy(&lfields->loop_metrics.lock); + uv__free(lfields); + loop->internal_fields = NULL; } #endif diff --git a/packages/emnapi/src/uv/unix/posix-hrtime.c b/packages/emnapi/src/uv/unix/posix-hrtime.c new file mode 100644 index 00000000..aec918fe --- /dev/null +++ b/packages/emnapi/src/uv/unix/posix-hrtime.c @@ -0,0 +1,40 @@ +/* Copyright libuv project contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#if !defined(__wasm__) || (defined(__EMSCRIPTEN__) || defined(__wasi__)) + +#include "uv.h" +#include "internal.h" + +#include +#include +#include + +uint64_t uv__hrtime(uv_clocktype_t type) { + struct timespec t; + + if (clock_gettime(CLOCK_MONOTONIC, &t)) + abort(); + + return t.tv_sec * (uint64_t) 1e9 + t.tv_nsec; +} + +#endif diff --git a/packages/emnapi/src/uv/uv-common.c b/packages/emnapi/src/uv/uv-common.c index 5ec3d012..2cc4eb2a 100644 --- a/packages/emnapi/src/uv/uv-common.c +++ b/packages/emnapi/src/uv/uv-common.c @@ -1,9 +1,101 @@ #if defined(__EMSCRIPTEN_PTHREADS__) || defined(_REENTRANT) #include +#include +#include #include #include "uv-common.h" +typedef struct { + uv_malloc_func local_malloc; + uv_realloc_func local_realloc; + uv_calloc_func local_calloc; + uv_free_func local_free; +} uv__allocator_t; + +static uv__allocator_t uv__allocator = { + malloc, + realloc, + calloc, + free, +}; + +char* uv__strdup(const char* s) { + size_t len = strlen(s) + 1; + char* m = uv__malloc(len); + if (m == NULL) + return NULL; + return memcpy(m, s, len); +} + +char* uv__strndup(const char* s, size_t n) { + char* m; + size_t len = strlen(s); + if (n < len) + len = n; + m = uv__malloc(len + 1); + if (m == NULL) + return NULL; + m[len] = '\0'; + return memcpy(m, s, len); +} + +void* uv__malloc(size_t size) { + if (size > 0) + return uv__allocator.local_malloc(size); + return NULL; +} + +void uv__free(void* ptr) { + int saved_errno; + + /* Libuv expects that free() does not clobber errno. The system allocator + * honors that assumption but custom allocators may not be so careful. + */ + saved_errno = errno; + uv__allocator.local_free(ptr); + errno = saved_errno; +} + +void* uv__calloc(size_t count, size_t size) { + return uv__allocator.local_calloc(count, size); +} + +void* uv__realloc(void* ptr, size_t size) { + if (size > 0) + return uv__allocator.local_realloc(ptr, size); + uv__free(ptr); + return NULL; +} + +void* uv__reallocf(void* ptr, size_t size) { + void* newptr; + + newptr = uv__realloc(ptr, size); + if (newptr == NULL) + if (size > 0) + uv__free(ptr); + + return newptr; +} + +int uv_replace_allocator(uv_malloc_func malloc_func, + uv_realloc_func realloc_func, + uv_calloc_func calloc_func, + uv_free_func free_func) { + if (malloc_func == NULL || realloc_func == NULL || + calloc_func == NULL || free_func == NULL) { + return EINVAL; + } + + uv__allocator.local_malloc = malloc_func; + uv__allocator.local_realloc = realloc_func; + uv__allocator.local_calloc = calloc_func; + uv__allocator.local_free = free_func; + + return 0; +} + static uv_loop_t default_loop_struct; static uv_loop_t* default_loop_ptr; @@ -20,8 +112,8 @@ uv_loop_t* uv_default_loop(void) { } int uv_loop_close(uv_loop_t* loop) { - // QUEUE* q; - // uv_handle_t* h; + struct uv__queue* q; + uv_handle_t* h; #ifndef NDEBUG void* saved_data; #endif @@ -29,11 +121,11 @@ int uv_loop_close(uv_loop_t* loop) { if (uv__has_active_reqs(loop)) return EBUSY; - // QUEUE_FOREACH(q, &loop->handle_queue) { - // h = QUEUE_DATA(q, uv_handle_t, handle_queue); - // if (!(h->flags & UV_HANDLE_INTERNAL)) - // return UV_EBUSY; - // } + uv__queue_foreach(q, &loop->handle_queue) { + h = uv__queue_data(q, uv_handle_t, handle_queue); + if (!(h->flags & UV_HANDLE_INTERNAL)) + return EBUSY; + } uv__loop_close(loop); @@ -67,4 +159,28 @@ void uv_library_shutdown(void) { // #endif } +int uv_metrics_info(uv_loop_t* loop, uv_metrics_t* metrics) { + memcpy(metrics, + &uv__get_loop_metrics(loop)->metrics, + sizeof(*metrics)); + + return 0; +} + +uint64_t uv_metrics_idle_time(uv_loop_t* loop) { + uv__loop_metrics_t* loop_metrics; + uint64_t entry_time; + uint64_t idle_time; + + loop_metrics = uv__get_loop_metrics(loop); + uv_mutex_lock(&loop_metrics->lock); + idle_time = loop_metrics->provider_idle_time; + entry_time = loop_metrics->provider_entry_time; + uv_mutex_unlock(&loop_metrics->lock); + + if (entry_time > 0) + idle_time += uv_hrtime() - entry_time; + return idle_time; +} + #endif diff --git a/packages/emnapi/src/uv/uv-common.h b/packages/emnapi/src/uv/uv-common.h index 2b822f59..1c4c3098 100644 --- a/packages/emnapi/src/uv/uv-common.h +++ b/packages/emnapi/src/uv/uv-common.h @@ -27,6 +27,106 @@ #define container_of(ptr, type, member) \ ((type *) ((char *) (ptr) - offsetof(type, member))) +/* Handle flags. Some flags are specific to Windows or UNIX. */ +enum { + /* Used by all handles. */ + UV_HANDLE_CLOSING = 0x00000001, + UV_HANDLE_CLOSED = 0x00000002, + UV_HANDLE_ACTIVE = 0x00000004, + UV_HANDLE_REF = 0x00000008, + UV_HANDLE_INTERNAL = 0x00000010, + UV_HANDLE_ENDGAME_QUEUED = 0x00000020 +}; + +#define uv__has_active_reqs(loop) \ + ((loop)->active_reqs.count > 0) + +#define uv__req_register(loop, req) \ + do { \ + (loop)->active_reqs.count++; \ + } \ + while (0) + +#define uv__req_unregister(loop, req) \ + do { \ + assert(uv__has_active_reqs(loop)); \ + (loop)->active_reqs.count--; \ + } \ + while (0) + +#define uv__has_active_handles(loop) \ + ((loop)->active_handles > 0) + +#define uv__active_handle_add(h) \ + do { \ + (h)->loop->active_handles++; \ + } \ + while (0) + +#define uv__active_handle_rm(h) \ + do { \ + (h)->loop->active_handles--; \ + } \ + while (0) + +#define uv__is_active(h) \ + (((h)->flags & UV_HANDLE_ACTIVE) != 0) + +#define uv__is_closing(h) \ + (((h)->flags & (UV_HANDLE_CLOSING | UV_HANDLE_CLOSED)) != 0) +#define uv__handle_start(h) \ + do { \ + if (((h)->flags & UV_HANDLE_ACTIVE) != 0) break; \ + (h)->flags |= UV_HANDLE_ACTIVE; \ + if (((h)->flags & UV_HANDLE_REF) != 0) uv__active_handle_add(h); \ + } \ + while (0) + +#define uv__handle_stop(h) \ + do { \ + if (((h)->flags & UV_HANDLE_ACTIVE) == 0) break; \ + (h)->flags &= ~UV_HANDLE_ACTIVE; \ + if (((h)->flags & UV_HANDLE_REF) != 0) uv__active_handle_rm(h); \ + } \ + while (0) + +#define uv__handle_ref(h) \ + do { \ + if (((h)->flags & UV_HANDLE_REF) != 0) break; \ + (h)->flags |= UV_HANDLE_REF; \ + if (((h)->flags & UV_HANDLE_CLOSING) != 0) break; \ + if (((h)->flags & UV_HANDLE_ACTIVE) != 0) uv__active_handle_add(h); \ + } \ + while (0) + +#define uv__handle_unref(h) \ + do { \ + if (((h)->flags & UV_HANDLE_REF) == 0) break; \ + (h)->flags &= ~UV_HANDLE_REF; \ + if (((h)->flags & UV_HANDLE_CLOSING) != 0) break; \ + if (((h)->flags & UV_HANDLE_ACTIVE) != 0) uv__active_handle_rm(h); \ + } \ + while (0) + +#define uv__has_ref(h) \ + (((h)->flags & UV_HANDLE_REF) != 0) + +#if defined(_WIN32) +# define uv__handle_platform_init(h) ((h)->u.fd = -1) +#else +# define uv__handle_platform_init(h) ((h)->next_closing = NULL) +#endif + +#define uv__handle_init(loop_, h, type_) \ + do { \ + (h)->loop = (loop_); \ + (h)->type = (type_); \ + (h)->flags = UV_HANDLE_REF; /* Ref the loop when active. */ \ + uv__queue_insert_tail(&(loop_)->handle_queue, &(h)->handle_queue); \ + uv__handle_platform_init(h); \ + } \ + while (0) + #define UV_REQ_INIT(req, typ) \ do { \ (req)->type = (typ); \ @@ -56,6 +156,27 @@ } \ while (0) +#define uv__get_internal_fields(loop) \ + ((uv__loop_internal_fields_t*) loop->internal_fields) + +#define uv__get_loop_metrics(loop) \ + (&uv__get_internal_fields(loop)->loop_metrics) + +#define uv__metrics_inc_loop_count(loop) \ + do { \ + uv__get_loop_metrics(loop)->metrics.loop_count++; \ + } while (0) + +#define uv__metrics_inc_events(loop, e) \ + do { \ + uv__get_loop_metrics(loop)->metrics.events += (e); \ + } while (0) + +#define uv__metrics_inc_events_waiting(loop, e) \ + do { \ + uv__get_loop_metrics(loop)->metrics.events_waiting += (e); \ + } while (0) + #define uv__exchange_int_relaxed(p, v) \ atomic_exchange_explicit((_Atomic int*)(p), v, memory_order_relaxed) @@ -68,7 +189,30 @@ enum uv__work_kind { void uv__threadpool_cleanup(void); void uv__work_done(uv_async_t* handle); void uv__loop_close(uv_loop_t* loop); -void uv__async_close(uv_async_t* handle); + +void *uv__calloc(size_t count, size_t size); +char *uv__strdup(const char* s); +char *uv__strndup(const char* s, size_t n); +void* uv__malloc(size_t size); +void uv__free(void* ptr); +void* uv__realloc(void* ptr, size_t size); +void* uv__reallocf(void* ptr, size_t size); + +typedef struct uv__loop_metrics_s uv__loop_metrics_t; +typedef struct uv__loop_internal_fields_s uv__loop_internal_fields_t; + +struct uv__loop_metrics_s { + uv_metrics_t metrics; + uint64_t provider_entry_time; + uint64_t provider_idle_time; + uv_mutex_t lock; +}; + +struct uv__loop_internal_fields_s { + unsigned int flags; + uv__loop_metrics_t loop_metrics; + int current_timeout; +}; #endif