Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic Linux and Windows Thread Prioritisation for worker threads #842

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ check_function_exists(pthread_attr_setcpupercent_np HAVE_PTHREAD_ATTR_SETCPUPERC
check_function_exists(pthread_yield_np HAVE_PTHREAD_YIELD_NP)
check_function_exists(pthread_main_np HAVE_PTHREAD_MAIN_NP)
check_function_exists(pthread_workqueue_setdispatch_np HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP)
check_function_exists(pthread_setname_np HAVE_PTHREAD_SETNAME_NP)
check_function_exists(strlcpy HAVE_STRLCPY)
check_function_exists(sysconf HAVE_SYSCONF)
check_function_exists(arc4random HAVE_ARC4RANDOM)
Expand Down
3 changes: 3 additions & 0 deletions cmake/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@
/* Define to 1 if you have the `_pthread_workqueue_init' function. */
#cmakedefine HAVE__PTHREAD_WORKQUEUE_INIT

/* Define to 1 if you have the `pthread_setname_np' function. */
#cmakedefine01 HAVE_PTHREAD_SETNAME_NP

/* Define to use non-portable pthread TSD optimizations for Mac OS X) */
#cmakedefine USE_APPLE_TSD_OPTIMIZATIONS

Expand Down
4 changes: 2 additions & 2 deletions dispatch/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ DISPATCH_DECL_SUBCLASS(dispatch_queue_main, dispatch_queue_serial);
*
* @discussion
* Dispatch concurrent queues are lightweight objects to which regular and
* barrier workitems may be submited. Barrier workitems are invoked in
* barrier workitems may be submitted. Barrier workitems are invoked in
* exclusion of any other kind of workitem in FIFO order.
*
* Regular workitems can be invoked concurrently for the same concurrent queue,
* in any order. However, regular workitems will not be invoked before any
* barrier workitem submited ahead of them has been invoked.
* barrier workitem submitted ahead of them has been invoked.
*
* In other words, if a serial queue is equivalent to a mutex in the Dispatch
* world, a concurrent queue is equivalent to a reader-writer lock, where
Expand Down
4 changes: 2 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,9 @@ dispatch_get_global_queue(intptr_t priority, uintptr_t flags)
}
dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == QOS_CLASS_MAINTENANCE) {
if (qos == DISPATCH_QOS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
} else if (qos == QOS_CLASS_USER_INTERACTIVE) {
} else if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
#endif
Expand Down
94 changes: 92 additions & 2 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@
#include "protocol.h" // _dispatch_send_wakeup_runloop_thread
#endif

#if defined(__linux__)
#include <errno.h>
#include <sys/resource.h>
#endif

#if defined(_WIN32)
// Wrapper around SetThreadDescription for UTF-8 strings.
//
// Converts `description` from UTF-8 to a freshly allocated wide-character
// buffer and passes that buffer to SetThreadDescription for `hThread`.
//
// hThread:     handle of the thread whose description is set.
// description: UTF-8 string; it is converted with a length argument of -1.
//
// This is best effort: the function returns without setting anything when
// either MultiByteToWideChar call yields 0 or when the allocation fails.
// The return value of SetThreadDescription is not inspected.
void _dispatch_win32_set_thread_description(HANDLE hThread, const char *description) {
// First pass: no output buffer, only query the number of wide characters
// needed. A result of 0 is treated as failure.
int wcsize = MultiByteToWideChar(CP_UTF8, 0, description, -1, NULL, 0);
if (wcsize == 0) {
return;
}

// Temporary buffer for the converted string; released below on every path
// that gets past this allocation.
wchar_t* wcstr = (wchar_t*)malloc(wcsize * sizeof(wchar_t));
if (wcstr == NULL) {
return;
}

// Second pass: perform the actual conversion into the buffer.
int result = MultiByteToWideChar(CP_UTF8, 0, description, -1, wcstr, wcsize);
if (result == 0) {
free(wcstr);
return;
}

// NOTE(review): wcstr cannot be NULL here, because the allocation-failure
// case already returned above. This check is therefore always true.
if (likely(wcstr != NULL)) {
SetThreadDescription(hThread, wcstr);
free(wcstr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should just do this unconditionally. free(NULL) is well defined. We could even simplify that to then:

if (result == 0) {
  assert(wcstr);
  SetThreadDescription(hThread, wcstr);
}

free(wcstr);

}
}
#endif
hmelder marked this conversation as resolved.
Show resolved Hide resolved

static inline void _dispatch_root_queues_init(void);
static void _dispatch_lane_barrier_complete(dispatch_lane_class_t dqu,
dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
Expand Down Expand Up @@ -6216,10 +6247,61 @@ _dispatch_worker_thread(void *context)
_dispatch_sigmask();
#endif
_dispatch_introspection_thread_add();
dispatch_priority_t pri = dq->dq_priority;
pthread_priority_t pp = _dispatch_get_priority();

// The Linux kernel does not have a direct analogue to the QoS-based
// thread policy engine found in XNU.
//
// We cannot use 'pthread_setschedprio', because all threads with default
// scheduling policy (SCHED_OTHER) have the same pthread 'priority'.
// For both CFS, which was introduced in Linux 2.6.23, and its successor
// EEVDF (since 6.6) 'sched_get_priority_max' and 'sched_get_priority_min'
// will just return 0.
//
// However, as outlined in "man 2 setpriority", the nice value is a
// per‐thread attribute: different threads in the same process can have
// different nice values. We can thus set up the thread's initial priority
// by converting the QoS class and relative priority to a 'nice' value.
#if defined(__linux__)
pp = _dispatch_priority_to_pp_strip_flags(pri);
int nice = _dispatch_pp_to_nice(pp);

#if HAVE_PTHREAD_SETNAME_NP
// pthread thread names are restricted to just 16 characters
// including NUL. It does not make sense to pass the queue's
// label as a name.
pthread_setname_np(pthread_self(), "DispatchWorker");
#endif

errno = 0;
int rc = setpriority(PRIO_PROCESS, 0, nice);
if (rc != -1 || errno == 0) {
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
} else {
_dispatch_log("Failed to set thread priority for worker thread: pqc=%p errno=%d\n", pqc, errno);
}
#elif defined(_WIN32)
pp = _dispatch_priority_to_pp_strip_flags(pri);
int win_priority = _dispatch_pp_to_win32_priority(pp);

HANDLE current = GetCurrentThread();

// Set thread description to the label of the root queue
if (dq->dq_label) {
_dispatch_win32_set_thread_description(current, dq->dq_label);
}

int rc = SetThreadPriority(current, win_priority);
if (rc) {
_dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
} else {
DWORD dwError = GetLastError();
_dispatch_log("Failed to set thread priority for worker thread: pqc=%p win_priority=%d dwError=%lu\n", pqc, win_priority, dwError);
}
#endif

const int64_t timeout = 5ull * NSEC_PER_SEC;
pthread_priority_t pp = _dispatch_get_priority();
dispatch_priority_t pri = dq->dq_priority;

// If the queue is neither
// - the manager
Expand Down Expand Up @@ -6258,6 +6340,14 @@ _dispatch_worker_thread(void *context)
(void)os_atomic_inc2o(dq, dgq_thread_pool_size, release);
_dispatch_root_queue_poke(dq, 1, 0);
_dispatch_release(dq); // retained in _dispatch_root_queue_poke_slow

#if defined(_WIN32)
// Make sure to properly end the background processing mode
if (win_priority == THREAD_MODE_BACKGROUND_BEGIN) {
SetThreadPriority(current, THREAD_MODE_BACKGROUND_END);
}
#endif

return NULL;
}
#if defined(_WIN32)
Expand Down
65 changes: 65 additions & 0 deletions src/shims/priority.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,71 @@ _dispatch_qos_to_pp(dispatch_qos_t qos)
return pp | _PTHREAD_PRIORITY_PRIORITY_MASK;
}


#if defined(__linux__)
// These presets roughly match the `android.os.Process' constants
// used for `setThreadPriority()'.
//
// Be aware that with the Completely Fair Scheduler (CFS) the weight is computed
// as 1024 / (1.25) ^ (nice) where nice is in the range -20 to 19.
// This means that nice is not a linear scale.
#define DISPATCH_NICE_BACKGROUND 10
#define DISPATCH_NICE_UTILITY 2
#define DISPATCH_NICE_DEFAULT 0
// Note that you might not have permission to increase the priority
// of a thread beyond the default priority.
#define DISPATCH_NICE_USER_INITIATED -2
#define DISPATCH_NICE_USER_INTERACTIVE -4

DISPATCH_ALWAYS_INLINE
static inline int _dispatch_pp_to_nice(pthread_priority_t pp)
{
	// Translate the QoS class encoded in `pp` into one of the
	// DISPATCH_NICE_* presets. Any class without a dedicated preset
	// maps to DISPATCH_NICE_DEFAULT.
	//
	// FIXME: What about relative priorities? Only the QoS class is
	// consulted here.
	const uint32_t qos_class = _dispatch_qos_from_pp(pp);
	int nice_value;

	switch (qos_class) {
	case DISPATCH_QOS_BACKGROUND:
		nice_value = DISPATCH_NICE_BACKGROUND;
		break;
	case DISPATCH_QOS_UTILITY:
		nice_value = DISPATCH_NICE_UTILITY;
		break;
	case DISPATCH_QOS_USER_INITIATED:
		nice_value = DISPATCH_NICE_USER_INITIATED;
		break;
	case DISPATCH_QOS_USER_INTERACTIVE:
		nice_value = DISPATCH_NICE_USER_INTERACTIVE;
		break;
	case DISPATCH_QOS_DEFAULT:
	default:
		nice_value = DISPATCH_NICE_DEFAULT;
		break;
	}

	return nice_value;
}
#endif // defined(__linux__)

#if defined(_WIN32)
DISPATCH_ALWAYS_INLINE
static inline int _dispatch_pp_to_win32_priority(pthread_priority_t pp) {
	// Translate the QoS class encoded in `pp` into the value handed to
	// SetThreadPriority for a worker thread.
	const uint32_t qos_class = _dispatch_qos_from_pp(pp);

	if (qos_class == DISPATCH_QOS_BACKGROUND) {
		// Make sure to end background mode before exiting the thread!
		return THREAD_MODE_BACKGROUND_BEGIN;
	}

	if (qos_class == DISPATCH_QOS_UTILITY) {
		return THREAD_PRIORITY_BELOW_NORMAL;
	}

	// DISPATCH_QOS_DEFAULT, DISPATCH_QOS_USER_INITIATED,
	// DISPATCH_QOS_USER_INTERACTIVE and every other class run at normal
	// priority. User input threads are kept at THREAD_PRIORITY_NORMAL to
	// avoid unintentionally starving the system.
	return THREAD_PRIORITY_NORMAL;
}
#endif // defined(_WIN32)


// including maintenance
DISPATCH_ALWAYS_INLINE
static inline bool
Expand Down