thread pool
This (a-sync thread-pool) module supplements the (a-sync event-loop) module. (a-sync event-loop) provides asynchronous procedures which can wait on the completion of a task running in a worker thread or on an event loop running in another thread. However, sometimes it is better to run tasks in a thread pool rather than launching worker threads, particularly where the workload suits having the number of threads in the thread pool approximating to the number of local processors available to the program. (In guile, the number of processors so available can be obtained using the current-processor-count procedure.)
This (a-sync thread-pool) module provides such a thread pool, together with two asynchronous procedures (await-task-in-thread-pool! and await-generator-in-thread-pool!) which can wait in an event loop for a task on the thread pool to complete and provide its result.
The thread pool objects provided by this module do not make provision for rate limiting similar to that provided by the event loops in the (a-sync event-loop) module. This is because there is no one-size-fits-all way of doing so. One common approach is, as in the case of the event loops provided by this library, to throttle threads which add tasks by enforcing a wait in their thread of execution when the number of queued tasks becomes too high, so hindering their ability to add new ones. However, this is counterproductive where it is a task running on the thread pool which is adding the new tasks, particularly with a thread pool having only a few threads running in its pool. Another approach is to throw an exception when adding tasks which exceed a user-selectable level.
The best approach is for user code to provide its own rate limiting in cases where the way that that code is organised means that it could produce an excessive number of accumulating unexecuted tasks in the thread pool, possibly by applying delays when unexecuted tasks rise in number excessively, using timeouts with an event loop. This may be checked for by having code call the thread-pool-get-num-tasks procedure before adding a significant batch of new tasks in order to test queue size, and if necessary postpone adding the new tasks until the size of the already accumulated tasks has reduced.
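As a rough illustration of the check described above, the following sketch adds a batch of tasks only when the backlog reported by thread-pool-get-num-tasks is below a chosen limit. The helper name add-batch-if-room! and its 'limit' parameter are hypothetical and are not part of this module; only thread-pool-get-num-tasks, thread-pool-add!, make-thread-pool and thread-pool-stop! come from it.

```scheme
(use-modules (a-sync thread-pool))

;; Hypothetical helper (not provided by the library): add every thunk
;; in 'thunks' to 'pool' only if fewer than 'limit' tasks are at
;; present running or queued.  Returns #t if the batch was added and
;; #f if the caller should postpone it and try again later.
(define (add-batch-if-room! pool thunks limit)
  (if (< (thread-pool-get-num-tasks pool) limit)
      (begin
        (for-each (lambda (thunk) (thread-pool-add! pool thunk))
                  thunks)
        #t)
      #f))

(define rate-limited-pool (make-thread-pool #:max-threads 2))

;; The limit of 100 is an arbitrary value chosen for illustration
(define batch-added
  (add-batch-if-room! rate-limited-pool
                      (list (lambda () (+ 1 1))
                            (lambda () (* 2 2)))
                      100))

(thread-pool-stop! rate-limited-pool)
```

Because the task count is read outside the pool mutex, the test is only approximate, which is normally acceptable for this purpose. When the helper returns #f, the caller could retry after a delay, for example from a timeout in an event loop.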
This module provides the following procedures:
(make-thread-pool #:key max-threads min-threads idle non-blocking)
This procedure constructs a thread pool object of native OS threads. It takes four optional arguments. The #:max-threads keyname specifies the maximum number of threads which will run in the pool, and the default value is 8. The #:min-threads keyname specifies the minimum number of persistent threads which will run in the pool and will not be subject to an #:idle timeout, and the default value is 0. It is an error if #:min-threads is greater than #:max-threads.
Thread pool objects with a #:min-threads setting of greater than 0 are usually best kept as top level objects, because the minimum number of threads will be kept alive in the pool until thread-pool-stop! is called. If such a thread pool is constructed within a local lexical scope, then either thread-pool-stop! must be applied to the pool before that scope is exited, or the last task added to the pool should itself apply thread-pool-stop! to the pool (which it can do if 'non-blocking' is #t). Otherwise, the minimum number of threads will remain alive uselessly in a blocked condition in the pool until the program terminates, even though the pool may be inaccessible.
The #:idle keyname specifies the length of time in milliseconds that threads greater in number than #:min-threads and not executing any tasks will remain in existence. The default is 5000 (5 seconds).
The #:non-blocking keyname affects the operation of the thread-pool-stop! procedure. When set to #f, which is the default, that procedure will not return until all tasks previously added to the pool have completed. If set to #t, the thread-pool-stop! procedure will return immediately, before all tasks have finished.
The #:max-threads, #:non-blocking and #:idle settings may subsequently be altered by applying the thread-pool-change-max-threads!, thread-pool-set-non-blocking! or thread-pool-set-idle-time! procedure to the pool.
This procedure will throw an exception if the system is unable to start the number of threads given as the #:min-threads argument. In such a case, any threads which have in fact started in the pool will be killed.
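By way of illustration, a pool might be constructed with all four keyword arguments as in the sketch below. The particular values are arbitrary choices for the example rather than recommendations, and the variable name 'example-pool' is an assumption. current-processor-count is the guile procedure mentioned in the introduction.

```scheme
(use-modules (a-sync thread-pool))

;; One persistent thread, at most one thread per available processor,
;; surplus idle threads released after 2 seconds, and a blocking
;; thread-pool-stop! (the default).
(define example-pool
  (make-thread-pool #:max-threads (current-processor-count)
                    #:min-threads 1
                    #:idle 2000
                    #:non-blocking #f))

(define example-pool-idle-time
  (thread-pool-get-idle-time example-pool))

;; Because #:min-threads is greater than 0, stop the pool explicitly
;; once it is no longer needed so that its persistent thread is
;; released.
(thread-pool-stop! example-pool)
```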
(thread-pool? obj)
This procedure indicates whether 'obj' is a thread pool object constructed by make-thread-pool.
(thread-pool-get-num-tasks pool)
This procedure returns the number of tasks which the thread pool object is at present either running in the pool or has queued for execution. This procedure will not throw. It is also thread safe, although it accesses the task number field outside the pool mutex and therefore with relaxed memory ordering. That enables this procedure to be applied more efficiently for rate limiting purposes but the result might at any one time be marginally out of date.
(thread-pool-get-num-threads pool)
This procedure returns the number of threads currently running in the thread pool.
This procedure is thread safe (any thread may call it).
(thread-pool-get-max-threads pool)
This procedure returns the current maximum number of threads set for the thread pool.
This procedure is thread safe (any thread may call it).
(thread-pool-change-max-threads! pool delta)
This procedure will increase, or if 'delta' is negative reduce, the maximum number of threads which the thread pool object will currently run by the value of 'delta'. The main purpose of this is to enable a task to increment the maximum thread number where it is about to enter a call which may block (non-asynchronously) for some time, with a view to decrementing it later when it has finished making blocking calls, so as to enable another thread to keep a core active. A with-thread-pool-increment macro is available which will do this for you automatically in an exception-safe way (see the documentation on that macro below).
If 'delta' is negative and results in a max-threads value of less than the current number of running threads, the number of threads actually running will only be reduced as tasks complete, or as idle timeouts expire. The maximum number of threads will not be reduced by this procedure below the min-threads value, or if that value is 0, below 1.
This procedure does nothing if thread-pool-stop! has previously been called. This procedure is thread safe - any thread may call it.
If 'delta' is positive and tasks are currently queued for execution, a new thread or threads will be started for the queued tasks. This procedure may therefore throw an exception if the system is unable to start the required new thread(s). Because starting new threads can be time consuming, to minimize contention new threads are started outside the pool's mutex, although internal book-keeping is done within the mutex. One consequence is that if such an exception is thrown while another thread has concurrently tried to reduce the size of the pool, the number of threads running in the pool may be smaller than it was when this procedure was called. Where min-threads is 0, in certain circumstances after an exception where no new threads can be started, the pool could have no running threads in it (so that thread-pool-get-num-threads returns 0) even though some tasks previously added to the pool remain pending. If the system can start no new threads even though none are running in the pool, it will be significantly broken, so it is not usually worth troubling about this: the program is doomed in that event in any case. However, if that is wrong and the cause of the failure to start any threads can be addressed, then the thread pool can be brought back into use by calling this procedure again with a positive value (which can be preceded by a call with a negative value to prevent too many threads trying to start).
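A minimal sketch of adjusting the maximum by hand follows, assuming a pool with no queued tasks so that no new threads need to be started. The variable names are illustrative only.

```scheme
(use-modules (a-sync thread-pool))

(define resizable-pool (make-thread-pool #:max-threads 4))

;; Raise the maximum by 2; going by the description above it should
;; now be 6
(thread-pool-change-max-threads! resizable-pool 2)
(define raised-max (thread-pool-get-max-threads resizable-pool))

;; Lower it again by the same amount
(thread-pool-change-max-threads! resizable-pool -2)
(define restored-max (thread-pool-get-max-threads resizable-pool))

(thread-pool-stop! resizable-pool)
```

Inside a task which is about to block, the with-thread-pool-increment macro is usually the more convenient way of doing this, as it restores the previous value even if the blocking code throws.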
(thread-pool-get-non-blocking pool)
This procedure returns the current non-blocking status of the thread pool. (See the documentation on the thread-pool-stop! procedure for more information about what that means.)
This procedure is thread safe (any thread may call it).
(thread-pool-set-non-blocking! pool val)
This procedure sets the non-blocking status of the thread pool. If 'val' is #f, the thread-pool-stop! procedure will block; if #t, it will not. (See the documentation on the thread-pool-stop! procedure for more information about this.)
This procedure is thread safe (any thread may call it).
This procedure will throw a compound &thread-pool-error exception (also incorporating &origin and &message objects) if it is invoked after the thread pool object concerned has been closed by a call to thread-pool-stop!.
(thread-pool-get-idle-time pool)
This procedure returns the current idle time setting for the thread pool, in milliseconds.
This procedure is thread safe (any thread may call it).
(thread-pool-set-idle-time! pool millisecs)
This procedure sets the current idle time for the thread pool, namely the length of time in milliseconds that threads greater in number than the minimum and not executing any tasks will remain in existence waiting for new tasks. This will only have effect for threads in the pool which begin waiting for new tasks after this procedure is called.
This procedure is thread safe (any thread may call it).
(thread-pool-stop! pool)
This procedure will cause the thread-pool object to stop running tasks. However, all tasks already running or queued for execution will be permitted to execute and complete normally. If the thread-pool's non-blocking setting is set to #f, this procedure will wait until all the tasks still to execute have finished before returning, and if #t it will return straight away.
After this procedure has been called, any attempt to add further tasks with the thread-pool-add! procedure will fail, and that procedure will throw a compound &thread-pool-error exception (also incorporating &origin and &message objects). The same exception will be thrown if this procedure is applied to a thread pool to which this procedure has previously been applied.
This procedure is thread safe (any thread may call it) unless the non-blocking setting is #f, in which case no task running on the thread-pool object may call this procedure.
(thread-pool-add! pool task [fail-handler])
This procedure adds a new task to the thread pool. 'task' must be a thunk. If one or more threads in the pool are currently blocking and waiting for a task, then the task will begin executing immediately in one of the threads. If not, and the number of threads running in the pool is less than the value returned by thread-pool-get-max-threads, a new thread will start and the task will execute immediately in the new thread. Otherwise, the task will be queued for execution as soon as a thread becomes available. Tasks will begin execution in the order in which they are added to the thread pool object. This procedure is thread safe (any thread may call it, including any task running on the thread pool object).
An optional handler procedure may be passed to 'fail-handler' which will be invoked if the task throws an exception. The 'fail-handler' procedure will be passed the exception object which was thrown. If a task throws an exception and no handler procedure is provided, the program will terminate.
If this procedure starts a new thread (see above), it may throw an exception if the system is unable to start the thread correctly, and if it does so the task will not be added. This procedure will throw a compound &thread-pool-error exception (also incorporating &origin and &message objects) if it is invoked after the thread pool object concerned has been closed by a call to thread-pool-stop!.
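The following sketch shows one task which completes normally and one which throws and so causes its 'fail-handler' to run. The 'results' list, its mutex and the record! helper are assumptions made for the example and are not part of this module. The sketch relies on the default blocking behaviour of thread-pool-stop! to wait for both tasks before the results are read.

```scheme
(use-modules (a-sync thread-pool)
             (ice-9 threads))

;; Illustrative shared state: tasks run in pool threads, so access to
;; the list is protected by a mutex.
(define results '())
(define results-mutex (make-mutex))

(define (record! item)
  (with-mutex results-mutex
    (set! results (cons item results))))

(define worker-pool (make-thread-pool #:max-threads 2))

;; A task which completes normally
(thread-pool-add! worker-pool
                  (lambda () (record! 'task-ran)))

;; A task which throws: the fail-handler receives the exception
;; object, and runs in place of terminating the program
(thread-pool-add! worker-pool
                  (lambda () (error "deliberate failure"))
                  (lambda (exc) (record! 'handler-ran)))

;; With the default non-blocking setting of #f, this waits for both
;; tasks to finish
(thread-pool-stop! worker-pool)

(display results)
(newline)
```

The two tasks may run concurrently, so the order of the items in 'results' is not determined.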
(with-thread-pool-increment pool body0 body1 ...)
This macro is intended to be called by a task running on a thread pool which is about to make a blocking (non-asynchronous) call. It will increment the max-threads value of 'pool' by 1 (by calling thread-pool-change-max-threads!) so as to enable a queued task to keep a core active, and decrement it again when execution of the body clauses has completed.
The (i) increment, (ii) execution of body clauses, and (iii) decrement, form the three branches of a dynamic-wind, so the decrement automatically occurs if control leaves body execution because of an exception or other jump.
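As a sketch of the intended usage, the task below wraps a blocking call in with-thread-pool-increment. Here guile's usleep stands in for a real blocking call such as synchronous I/O, and the 'blocking-task-finished' flag is an assumption made for the example.

```scheme
(use-modules (a-sync thread-pool))

(define blocking-pool (make-thread-pool #:max-threads 2))
(define blocking-task-finished #f)

(thread-pool-add! blocking-pool
                  (lambda ()
                    ;; While the body runs, the pool may run one more
                    ;; thread than usual, so that a queued task can
                    ;; use the core which this task leaves idle.
                    (with-thread-pool-increment blocking-pool
                      (usleep 100000)) ;; stand-in for a blocking call
                    (set! blocking-task-finished #t)))

;; Blocks until the task has completed (default non-blocking of #f)
(thread-pool-stop! blocking-pool)
```

Note that thread-pool-change-max-threads! does nothing once thread-pool-stop! has been called, so in a short example like this one, where the pool is stopped while the task is still inside the macro, the closing decrement may have no effect.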
(await-task-in-thread-pool! await resume [loop] pool thunk [handler])
The 'loop' argument is optional. The procedure will run 'thunk' in the thread pool specified by the 'pool' argument. The result of executing 'thunk' will then be posted to the event loop specified by the 'loop' argument, or to the default event loop if no 'loop' argument is provided or if #f is provided as the 'loop' argument (pattern matching is used to detect the type of the third argument), and will comprise this procedure's return value. This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments). It will normally be necessary to call event-loop-block! on 'loop' (or on the default event loop) before invoking this procedure.
If the optional 'handler' argument is provided, then that handler will run if 'thunk' throws, and the return value of the handler will become the return value of this procedure; otherwise the program will terminate if an unhandled exception propagates out of 'thunk'. 'handler' should take a single argument, which will be the thrown exception object. Note that unlike a handler passed to the thread-pool-add! procedure, 'handler' will run in the event loop thread and not in a thread pool thread. Exceptions thrown by the handler procedure will propagate out of event-loop-run! for the 'loop' event loop.
This procedure calls 'await' and must (like the a-sync procedure) be called in the same thread as that in which the 'loop' or default event loop runs (as the case may be).
This procedure calls event-post! in the 'loop' or default event loop, which could be subject to throttling (see the documentation for the make-event-loop procedure for further information).
Exceptions may propagate out of this procedure if they arise while setting up, which shouldn't happen unless the thread pool given by the 'pool' argument has been closed (in which case a compound &thread-pool-error exception, also incorporating &origin and &message objects, will arise), the thread pool tries to start an additional native thread which the operating system fails to supply (which would cause a system exception to arise) or memory is exhausted.
Here is an example of the use of await-task-in-thread-pool!:
    (use-modules (a-sync coroutines)
                 (a-sync event-loop)
                 (a-sync thread-pool))

    (set-default-event-loop!) ;; if none has yet been set
    (let ((pool (make-thread-pool #:max-threads 4)))
      (event-loop-block! #t) ;; because the task runs in another thread
      (a-sync (lambda (await resume)
                (simple-format #t "1 + 1 is ~A\n"
                               (await-task-in-thread-pool! await resume
                                                           pool
                                                           (lambda ()
                                                             (+ 1 1))))
                (event-loop-block! #f))))
    (event-loop-run!)
(await-generator-in-thread-pool! await resume [loop] pool generator proc [handler])
The loop argument is optional. The 'generator' argument is a procedure taking one argument, namely a yield argument (see the documentation on the make-iterator procedure for further details). This await-generator-in-thread-pool! procedure will cause 'generator' to run as a task in the 'pool' thread pool, and whenever 'generator' yields a value this will cause 'proc' to execute in the event loop specified by the 'loop' argument, or in the default event loop if no 'loop' argument is provided or if #f is provided as the 'loop' argument. 'proc' should be a procedure taking a single argument, namely the value yielded by the generator.
This procedure is intended to be called within a waitable procedure invoked by a-sync (which supplies the 'await' and 'resume' arguments). It will normally be necessary to call event-loop-block! on 'loop' (or on the default event loop) before invoking this procedure.
If the optional 'handler' argument is provided, then that handler will run if 'generator' throws an exception; otherwise the program will terminate if an unhandled exception propagates out of 'generator'. 'handler' should take a single argument, which will be the thrown exception object. Note that unlike a handler passed to the thread-pool-add! procedure, 'handler' will run in the event loop thread and not in a thread pool thread. This procedure will return #f if the generator completes normally, or 'guile-a-sync-thread-error if the generator throws an exception and 'handler' is run (the 'guile-a-sync-thread-error symbol is reserved to the implementation and should not be yielded by the generator). Exceptions thrown by the handler procedure will propagate out of event-loop-run! for the 'loop' event loop.
This procedure calls 'await' and will return when the generator has finished or, if 'handler' is provided, upon the generator raising an exception. This procedure must (like the a-sync procedure) be called in the same thread as that in which the 'loop' or default event loop runs (as the case may be).
This procedure calls event-post! in the 'loop' or default event loop, which could be subject to throttling (see the documentation for the make-event-loop procedure for further information).
Exceptions may propagate out of this procedure if they arise while setting up, which shouldn't happen unless the thread pool given by the 'pool' argument has been closed (in which case a compound &thread-pool-error exception, also incorporating &origin and &message objects, will arise), the thread pool tries to start an additional native thread which the operating system fails to supply (which would cause a system exception to arise) or memory is exhausted. Exceptions arising during the execution of 'proc', if not caught locally, will propagate out of event-loop-run! for 'loop' or the default event loop (as the case may be).
Here is an example of the use of await-generator-in-thread-pool!:
    (use-modules (a-sync coroutines)
                 (a-sync event-loop)
                 (a-sync thread-pool))

    (set-default-event-loop!) ;; if none has yet been set
    (let ((pool (make-thread-pool #:max-threads 4)))
      (event-loop-block! #t) ;; because the generator runs in another thread
      (a-sync (lambda (await resume)
                (await-generator-in-thread-pool! await resume
                                                 pool
                                                 (lambda (yield)
                                                   (let loop ((count 0))
                                                     (when (< count 5)
                                                       (yield (* 2 count))
                                                       (loop (1+ count)))))
                                                 (lambda (val)
                                                   (display val)
                                                   (newline)))
                (event-loop-block! #f))))
    (event-loop-run!)