CHANGES
-------
Version 0.10 : In Progress
--------------------------
**** SPECIAL NOTE ****: For the past few years, Curio has been an
experimental project. However, as it moves towards having a more
"stable" release, I feel that it is better served by being small as
opposed to providing every possible feature. Thus, a wide range of
features are being removed or refactored a bit. If a feature you are
using gets removed and you'd like to request its return, please submit
a bug report! - Dave
10/15/2019 Refactored task.py into separate files for tasks and time
management.
09/29/2019 Removed Promise. Not documented, but also want to rethink the
whole design/implementation of it. The "standard" way that
Python implements "promises" is through the Future class
as found in the concurrent.futures module. However, Futures
are tricky. They often have callback functions attached to
them and they can be cancelled. Some further thought needs
to be given as to how such features might integrate with the
rest of Curio. Code for the old Promise class can be
found in the examples directory.
09/26/2019 Removed support for the Asyncio bridge. It wasn't documented
and there are many possible ways in which Curio might
potentially interact with an asyncio event loop. For example,
using queues. Asyncio interaction may be revisited in the
future.
09/13/2019 Support for context-manager use of spawn_thread() has been
withdrawn. It's a neat feature, but the implementation
is pretty hairy. It also creates a situation where async
functions partially run in coroutines and partially in threads.
All things equal, it's probably more sane to make this
kind of distinction at the function level, not at the level
of code blocks.
09/11/2019 Removed AsyncObject and AsyncInstanceType. Very specialized.
Not used elsewhere in Curio. Involves metaclasses. One
less thing to maintain.
09/11/2019 I want to use f-strings. Now used for string formatting
everywhere except in logging messages. Sorry Python 3.5.
09/11/2019 Removed the allow_cancel optional argument to spawn().
If a task wants to disable cancellation, it should
explicitly manage that on its own.
09/11/2019 Removed the report_crash option to spawn(). Having
it as an optional argument is really the wrong place for
this. On-by-default crash logging is extremely useful for
debugging. However, if you want to disable it, it's
annoying to have to go change your source code on a task-by-task
basis. A better option is to suppress it via configuration
of the logging module. Better yet: write your code so that
it doesn't crash.
09/10/2019 Some refactoring of some internal scheduling operations.
The SchedFIFO and SchedBarrier classes are now available
for more general use by any code that wants to implement
different sorts of synchronization primitives.
09/10/2019 Removed the abide() function. This feature was from the
earliest days of Curio when there was initial thinking
about the interaction of async tasks and existing threads.
The same functionality can still be implemented using run_in_thread()
or block_in_thread() instead. In the big picture, the problem
being solved might not be that common. So, in the interest
of making Curio smaller, abide() has ridden off into the sunset.
09/08/2019 Removed BoundedSemaphore.
09/03/2019 Removed the experimental aside() functionality. Too much
magic. Better left to others.
09/03/2019 Removed the gather() function. Use TaskGroup instead.
09/03/2019 Removed get_all_tasks() function.
09/03/2019 Removed the Task.pdb() method.
09/03/2019 Removed the Task.interrupt() method.
09/03/2019 The pointless (and completely unused) name argument to TaskGroup()
has been removed.
08/09/2019 Exceptions raised inside the Curio kernel itself are no longer
reported to tasks. Instead, they cause Curio to die. The
kernel is never supposed to raise exceptions on its own--any
exception that might be raised is an internal programming error.
This change should not impact user-level code, but might affect
uncontrolled Ctrl-C handling. If a KeyboardInterrupt occurs
in the middle of kernel-level code, it will cause an uncontrolled
death. If this actually matters to you, then modify your code to
properly handle Ctrl-C via signal handling.
04/14/2019 The Channel.connect() method no longer implements auto-retry.
In practice, this kind of feature can cause interference. Better
to let the caller do the retry if they want.
04/14/2019 Simplified the implementation and semantics of cancellation control.
The enable_cancellation() function has been removed. It is now
only possible to disable cancellation. Nesting is still allowed.
Pending cancellation exceptions are raised on the first blocking
call executed when reenabled. The check_cancellation() function
can be used to explicitly check for cancellation as before.
03/09/2019 Fixed a bug in network.open_connection() that was passing arguments
incorrectly. Issue #291.
11/18/2018 Removed code that attempted to detect unsafe async generator
functions. Such code is now executed without checks or
warnings. It's still possible that code in finally blocks
might not execute unless you use curio.meta.finalize() or
a function such as async_generator.aclosing() (third party).
The @safe_generator decorator is now also unnecessary.
11/11/2018 Removed the wait argument to TaskGroup.join(). The waiting policy
for task groups is specified in the TaskGroup constructor. For
example:
    async with TaskGroup(wait=any) as group:
        ...
09/05/2018 Tasks submitted to Kernel.run() no longer log exceptions should they
crash. Such tasks already return immediately to the caller with the
exception raised.
09/05/2018 Refinement to Kernel.__exit__() to make sure the kernel shuts down
regardless of any exceptions that have occurred. See Issue #275.
04/29/2018 New task-related function. get_all_tasks() returns a list of all active
Tasks. For example:
tasks = await get_all_tasks()
Tasks also have a where() method that returns a (filename, lineno) tuple
indicating where they are executing.
04/29/2018 Curio now properly allows async context managers to be defined using
contextlib.asynccontextmanager. Issue #247.
04/29/2018 Removed the cancel_remaining keyword argument from TaskGroup.next_done()
04/28/2018 Added new "object" wait mode to TaskGroup. It waits for the
first non-None result to be returned by a task. Then all
remaining tasks are cancelled. For example:
    async def coro1():
        await sleep(1)
        return None

    async def coro2():
        await sleep(2)
        return 37

    async def coro3():
        await sleep(3)
        return 42

    async with TaskGroup(wait=object) as tg:
        await tg.spawn(coro1)    # Ignored (returns None)
        await tg.spawn(coro2)    # Returns result
        await tg.spawn(coro3)    # Cancelled

    print(tg.completed.result)   # -> 37
04/27/2018 Removed the ignore_result keyword argument to TaskGroup.spawn().
It's not really needed and the extra complexity isn't worth it.
04/27/2018 Added TaskGroup.next_result() function. It's mostly a convenience
function for returning the result of the next task that completes.
If the task failed with an exception, that exception is raised.
04/14/2018 Changed the method of spawning processes for run_in_process to
use the "spawn" method of the multiprocessing module. This
prevents issues of having open file-descriptors cloned by
accident via fork(). For example, as in Issue #256.
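The underlying stdlib mechanism can be seen directly: a "spawn" context starts a fresh interpreter, so a child process never inherits descriptors cloned by fork(). A minimal illustration:

```python
import multiprocessing

# A "spawn" context launches workers in a clean interpreter rather
# than forking the current process.
ctx = multiprocessing.get_context('spawn')
print(ctx.get_start_method())
```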
Version 0.9 : March 10, 2018
----------------------------
02/24/2018 Refinements to Task crash reporting. By default all Tasks
that terminate with an uncaught exception log that exception
as soon as it is detected. Although there is a risk that this
creates extra output, silencing the output makes it almost
impossible to debug programs. This is because errors get
deferred to a later join() method---and you often don't know
when that's going to take place. You might just be staring
at a program wondering why it's not working. If you really
want errors to be silent, use spawn(coro, report_crash=False).
02/23/2018 Some refinements to the AWAIT() function. You can now call it as
follows:
result = AWAIT(callable, *args, **kwargs)
If the passed callable is a coroutine function or a function that
produces an awaitable object, it will be passed to Curio and executed
in an asynchronous context. If callable is just a normal function,
then callable(*args, **kwargs) is returned.
This change is made to make it slightly easier to use objects
as UniversalQueue within the context of an async thread. For example,
if you do this::
    q = UniversalQueue()
    ...

    @async_thread
    def consumer():
        while True:
            item = AWAIT(q.get)
            print("Got:", item)
The AWAIT operation will run the asynchronous version of q.get()
instead of the normal synchronous version. You want this--the
async version supports cancellation and other nice features.
02/22/2018 The async_thread() function is now meant to be used as a decorator only.
    @async_thread
    def func(args):
        ...
When the decorated function is called from asynchronous code using
await func(args), it will seamlessly run in a separate execution thread.
However, the function can also be called from synchronous code using
func(args). In that case, the function runs as it normally does.
It's subtle, but the @async_thread decorator allows a function to adapt
to either a synchronous or asynchronous environment depending on how
it has been called.
02/22/2018 New function spawn_thread() can be used to launch asynchronous threads.
It mirrors the use of the spawn() function. For example:
    def func(args):
        ...

    t = await spawn_thread(func, args)
    result = await t.join()
Previously, async threads were created using the AsyncThread class.
That still works, but the spawn_thread() function is probably easier.
The launched function must be a normal synchronous function.
spawn_thread() can also be used as a context manager (see below).
02/19/2018 Added ability of async threads to be created via context manager.
For example:
    async with spawn_thread():
        # Various blocking/synchronous operations
        # Executes in a separate thread
        ...
When used, the body of the context manager runs in a
separate thread and may involve blocking operations.
However, be aware that any use of async/await is NOT
allowed in such a block. Any attempt to await on an async
function inside the block will result in a RuntimeError
exception.
02/06/2018 Refinements to the scheduler activation API and debugging
features.
02/04/2018 The ZMQ module has been removed from Curio core and put into
the examples directory. This should be spun into a separate
package maintained independently of Curio.
02/03/2018 Local() objects have been removed. The idea of having a
thread-local style object for storing attributes is fraught
with problems--especially given the potential mix of tasks,
threads, and processes that is possible in Curio. The
implementation of this has been moved into the examples
directory where it can be adapted/copied into your code if
it's still needed. Better yet, maybe take a look at PEP 567.
02/02/2018 Some refactoring and simplification of the kernel run()
method. First, run() only executes as long as the
submitted coroutine is active. Upon termination, run()
immediately returns regardless of whether or not other
tasks are still running. Curio was already generating
warning messages for orphaned tasks--tasks for which
Task.join() is never invoked. As such, this change should
not affect most properly written applications.
Second, the timeout argument is no longer supported. If
you want a timeout on what's happening, put it into the
supplied async function itself.
Finally, if Kernel.run() is called with no arguments, it
causes the kernel to process any activity that might be
pending at that moment in time before returning. This is
something that might be useful should it be necessary
to integrate Curio with a foreign event-loop.
01/31/2018 If the main task crashes with an exception, the kernel
now returns immediately--even if child tasks are still
in progress. This prevents a problem where the main task
crashes, but it's not reported for an extended period due
to child tasks continuing to run. In addition, if the
main task crashes, the kernel does not perform a shutdown--
leaving tasks as they were at the time of the crash. This
might facilitate debugging.
01/31/2018 Printing a Task object will now show the file and line number
of where it's currently waiting.
01/23/2018 A refinement in task crash reporting. Previously, all
task crashes were logged. However, Curio was also logging
crashes in all unjoined tasks. Sometimes this would result
in duplicate tracebacks. It's been modified to now only report
crashes in unjoined tasks. If joining, it's assumed that the
exception would be noticed there.
01/22/2018 Semaphore and BoundedSemaphore now exposes a read-only
property ".value" that gives the current Semaphore value.
BoundedSemaphore objects expose a ".bound" property that
gives the upper bound.
01/03/2018 The Local() object has been officially deprecated in the
documentation and will be removed at some point.
Implementing task-level locals is much more complicated
than it seems at first glance and there is discussion of a
more general solution in PEP 567. If you need this kind
of functionality, you should copy the task local code into
your own application.
12/22/2017 writeable() method of Socket removed. Use of this was highly
specialized and potentially confusing since it's not normally
needed. Use await _write_wait(sock) if you need to wait
for a socket to be writable before calling send().
12/19/2017 Slight change to Task Groups that allow tasks to spawn
other tasks into the same group. See Issue #239.
09/15/2017 Added the .readinto() method onto async files.
09/15/2017 Made the @async_thread decorator return the actual
exception that gets raised in the event of a failure.
This makes it more like a normal function call. Previously,
it was returning a TaskError exception for any crash.
That's a bit weird since the use of threads or spawning
of an external task is meant to be more implicit.
Version 0.8 : August 27, 2017
-----------------------------
07/01/2017 New time queue implementation. For timeout handling, it's
faster and far more space efficient.
05/11/2017 Fixed Issue #212, a "never joined" message when adding terminated
tasks to a task group and using task.result to obtain the
result.
05/11/2017 Added a new keyword argument to Task.cancel() to allow a different
exception to be raised. For example:
t.cancel(exc=SomeException)
The default exception is still TaskCancelled.
04/23/2017 Change to ssl.wrap_socket() function and method to make it an
async method. If applied to an already connected socket, it
needs to run the handshake--which needs to be async. See
Issue #206.
04/14/2017 Refinement to SignalEvent to make it work better on Windows.
It's now triggered via the same file descriptor used for SignalQueue
objects.
03/26/2017 Added a Task.interrupt() method. Cancels the task's current
blocking operation with a 'TaskInterrupted' exception.
This is really just a more nuanced form of cancellation,
similar to a timeout. However, it's understood that an
interrupted operation doesn't necessarily mean that the
task should quit. Instead, the task might coordinate with
other tasks in some way and retry later.
Version 0.7 : March 17, 2017
----------------------------
03/15/2017 The undocumented wait() function for waiting on multiple
tasks has evolved into a more general TaskGroup object.
To replicate the old wait(), do this:
    t1 = await spawn(coro1, args)
    t2 = await spawn(coro2, args)
    t3 = await spawn(coro3, args)
    async with TaskGroup([t1, t2, t3]) as g:
        first_done = await g.next_done()
        await g.cancel_remaining()
TaskGroups have more functionality such as the ability
to spawn tasks, report multiple errors and more.
For example, the above code could also be written as follows:
    async with TaskGroup() as g:
        await g.spawn(coro1, args)
        await g.spawn(coro2, args)
        await g.spawn(coro3, args)
        first_done = await g.next_done()
        await g.cancel_remaining()
03/12/2017 Added a .cancelled attribute to the context manager used
by the ignore_after() and ignore_at() functions. It
can be used to determine if a timeout fired. For example:
    async with ignore_after(10) as context:
        await sleep(100)
    if context.cancelled:
        print('Cancelled!')
03/10/2017 SignalSet is gone. Use a SignalQueue instead. Usage
is almost identical:
    async with SignalQueue(signal.SIGUSR1) as sq:
        while True:
            signo = await sq.get()
            ...
03/08/2017 More work on signal handling. New objects: SignalQueue
and SignalEvent. SignalEvents are neat:
    import signal
    from curio import SignalEvent

    ControlC = SignalEvent(signal.SIGINT)

    async def coro():
        await ControlC.wait()
        print("Goodbye")
03/08/2017 UniversalEvent object added. An event that's safe for use in
Curio and threads.
03/08/2017 Further improvement to signal handling. Now handled by a backing
thread. Supports signal delivery to multiple threads,
multiple instances of Curio running, asyncio, and all sorts
of stuff.
03/08/2017 Moved signal handling from the kernel up into Curio
"user-space". No existing code that uses signals should break.
03/07/2017 Refined error reporting and warnings related to Task
termination. If any non-daemonic task is garbage collected
and it hasn't been explicitly joined or cancelled, a
warning message is logged. This warning means that a task
was launched, but that nobody ever looked at its result.
If any unjoined task is garbage collected and it has
crashed with an uncaught exception, that exception is
logged as an error.
This change has a few impacts. First, if a task crashes,
but is joined, you won't get a spurious output message
showing the crash. The exception was delivered to someone
else. On the other hand, you might get more warning
messages if you've been launching tasks without paying
attention to their result.
03/06/2017 A lot of cleanup of the kernel. Moved some functionality
elsewhere. Removed unneeded traps. Removed excess
abstraction in the interest of readability. The different
trap functions in Curio are almost all quite small.
However, details concerning their execution behavior were
split across a few different places in the code and wrapped
with decorators--making it somewhat hard to piece together
how they worked looking at them in isolation. Each trap
is now basically self-contained. You can look at the code and
see exactly what it does. Each trap is also afforded more
flexibility about how it could work in the future (e.g.,
scheduling behavior, cancellation, etc.).
Debug logging features have been removed from the kernel and
placed into a new subsystem. See the file curio/debug.py.
This is still in progress.
03/05/2017 Change to how the debugging monitor is invoked. It's still
an option on the run() function. However, it is not a
option on the Kernel class itself. If you need to do that,
use this:
from curio import Kernel
from curio.monitor import Monitor
k = Kernel()
m = Monitor(k)
03/04/2017 Support for using Event.set() from a synchronous context has
been withdrawn. This was undocumented and experimental.
There are other mechanisms for achieving this. For example,
communicating through a UniversalQueue.
03/03/2017 timeout_after() and related functions now accept
coroutine functions and arguments such as this:
    async def coro(x, y):
        pass

    async def main():
        try:
            await timeout_after(5, coro, 2, 3)
        except TaskTimeout:
            pass
03/03/2017 spawn() and run() have been made consistent in their
calling conventions compared to worker related functions.
For example:
    async def coro(x, y):
        pass

    async def main():
        t = await spawn(coro, 2, 3)   # Instead of spawn(coro(2,3))
The old approach still works, but the new one will be preferred
going forward.
03/03/2017 Support for keyword arguments on many task-related worker
functions (run_in_thread, run_in_process, block_in_thread, etc.)
has been rescinded. If you need keyword arguments, use
functools.partial. For example:
await run_in_thread(partial(foo, kw=some_value))
03/03/2017 Functionality for using Queue.put() in a synchronous context
has been withdrawn. This was always experimental and undocumented.
There are better alternatives for doing this. For example, use a
UniversalQueue.
03/01/2017 Addition of an asyncio bridge. You can instantiate a separate
asyncio loop and submit tasks to it. For example:
    async def coro():
        # Some coroutine that runs on asyncio
        ...

    async with AsyncioLoop() as loop:
        await loop.run_asyncio(coro)
The same loop can be used by any number of Curio tasks and
requests can run concurrently. The asyncio loop runs in
a separate thread from Curio.
The original idea was contributed by Laura Dickinson and adapted
a bit into the AsyncioLoop class.
02/26/2017 Modified the gather() function so that it also cancels all tasks
if it is cancelled by timeout or other means. See issue #186.
The resulting exception has a .results attribute set with
the results of all tasks at the time of cancellation.
02/19/2017 Added new curio.zmq module for supporting ZeroMQ.
Version 0.6 : February 15, 2017
-------------------------------
02/13/2017 Added a withfd=True option to UniversalQueue. For example:
q = UniversalQueue(withfd=True)
If added, the queue internally sets up an I/O loopback
where putting items on the queue write bytes to an I/O
channel. The queue then sprouts a fileno() method and
becomes pollable in other event loops. This is a potentially
useful strategy for integrating Curio with GUIs and other
kinds of foreign event loops.
02/11/2017 Added a guard for proper use of asynchronous generators
involving asynchronous finalization. Must be wrapped by finalize().
For example:
    async def some_generator():
        ...
        try:
            yield val
        finally:
            await action()

    async def coro():
        ...
        async with finalize(some_generator()) as agen:
            async for item in agen:
                ...
Failure to do this results in a RuntimeError if an
asynchronous generator is iterated. This is not needed for
generators that don't perform finalization steps involving
async code.
02/08/2017 New Kernel.run() method implementation. It should be backwards
compatible, but there are two new ways of using it:
    kernel = Kernel()
    ...
    # Run a coroutine with a timeout/deadline applied to it
    try:
        result = kernel.run(coro, timeout=secs)
    except TaskTimeout:
        print('Timed out')

    # Run all daemonic tasks through a single scheduling cycle
    # with no blocking
    kernel.run()

    # Run all daemonic tasks through a cycle, but specify a
    # timeout on internal blocking
    kernel.run(timeout=secs)
02/06/2017 New aside() function for launching a Curio task in an
independent process. For example:
    async def child(name, n):
        print('Hello from', name)
        for i in range(n):
            print('name says', i)
            await sleep(1)

    async def main():
        t = await aside(child, 'Spam', 10)   # Runs in subprocess
        await t.join()

    run(main())
In a nutshell, aside(coro, *args, **kwargs) creates a clean
Python interpreter and invokes curio.run(coro(*args,
**kwargs)) on the supplied coroutine. The return value of
aside() is a Task object. Joining with it returns the
child exit code (normally 0). Cancelling it causes a
TaskCancelled exception to be raised in the child.
aside() does not involve a process fork or pipe. There
is no underlying communication between the child and parent
process. If you want communication, use a Channel object
or set up some other kind of networking.
02/06/2017 Some improvements to message passing and launching tasks in
subprocesses. A new Channel object makes it easy
to establish message passing between two different interpreters.
For example, here is a producer program:
    # producer.py
    from curio import Channel, run

    async def producer(ch):
        while True:
            c = await ch.accept(authkey=b'peekaboo')
            for i in range(10):
                await c.send(i)
            await c.send(None)   # Sentinel

    if __name__ == '__main__':
        ch = Channel(('localhost', 30000))
        run(producer(ch))
Here is a consumer program::
    # consumer.py
    from curio import Channel, run

    async def consumer(ch):
        c = await ch.connect(authkey=b'peekaboo')
        while True:
            msg = await c.recv()
            if msg is None:
                break
            print('Got:', msg)

    if __name__ == '__main__':
        ch = Channel(('localhost', 30000))
        run(consumer(ch))
A Channel is a lot like a socket except that it sends discrete
messages. Any picklable Python object can be
passed.
02/03/2017 Fixed a few regressions in SSL sockets and the Kernel.run() method.
Version 0.5 : February 2, 2017
------------------------------
01/08/2017 Some refinements to the abide() function. You can now have it
reserve a dedicated thread. This allows it to work with things
like Condition variables. For example::
    cond = threading.Condition()   # Foreign condition variable
    ...
    async with abide(cond, reserve_thread=True) as c:
        # c is an async wrapper around cond.
        # The following operation uses the same thread that was
        # used to acquire the lock.
        await c.wait()
    ...
abide() also prefers to use the block_in_thread() function that
makes it much more efficient when synchronizing with basic locks
and events.
01/08/2017 Some reworking of internals related to thread/process workers and
task cancellation. One issue with launching work into a thread
worker is that threads have no mechanism for cancellation. They
run fully to completion no matter what. Thus, if you perform some
work like this:
await run_in_thread(callable, args)
and the calling task gets cancelled, it's impossible to find out
what happened with the thread. Basically, it's lost to the sands
of time. However, you can now supply an optional call_on_cancel
argument to the function and use it like this:
    def cancelled_result(future):
        result = future.result()
        ...

    await run_in_thread(callable, args, call_on_cancel=cancelled_result)
The call_on_cancel function is a normal synchronous
function. It receives the Future instance that was being used
to receive the result of the threaded operation. This
Future is guaranteed to have the result/exception set.
Be aware that there is no way to know when the call_on_cancel
function might be triggered. It might be far in the future.
The Curio kernel might not even be running. Thus, it's
generally not safe to make too many assumptions about it.
The only guarantee is that the call_on_cancel function is
called after a result is computed and it's called in the
same thread.
The main purpose of this feature is to have better support
for cleanup of failed synchronization operations involving
threads.
01/06/2017 New function. block_in_thread(). This works like run_in_thread()
except that it's used with the expectation that whatever operation
is being performed is likely going to block for an undetermined
time period. The underlying operation is handled more efficiently.
For each unique callable, there is at most 1 background thread
being used regardless of how many tasks might be trying to
perform the same operation. For example, suppose you were
trying to synchronize with a foreign queue:
    import queue
    work_q = queue.Queue()   # Standard thread queue

    async def worker():
        while True:
            item = await block_in_thread(work_q.get)
            ...

    # Spin up a huge number of workers
    for n in range(1000):
        await spawn(worker())
In this code, there is one queue and 1000 worker tasks trying to
read items. The block_in_thread() function only uses 1 background
thread to handle it. If you used run_in_thread() instead, it
would consume all available worker threads and you'd probably deadlock.
01/05/2017 Experimental new feature--asynchronous threads! An async thread
is an actual real-life thread where it is safe to call Curio
coroutines and use its various synchronization features.
As an example, suppose you had some code like this:
    async def handler(client, addr):
        async with client:
            async for data in client.as_stream():
                n = int(data)
                time.sleep(n)
                await client.sendall(b'Awake!\n')
        print('Connection closed')

    run(tcp_server('', 25000, handler))
Imagine that the time.sleep() function represents some kind of
synchronous, blocking operation. In the above code, it would
block the Curio kernel, preventing all other tasks from running.
Not a problem--change the handler() function to an async thread
and use the await() function like this:
    from curio.thread import await, async_thread

    @async_thread
    def handler(client, addr):
        with client:
            for data in client.as_stream():
                n = int(data)
                time.sleep(n)
                await(client.sendall(b'Awake!\n'))
        print('Connection closed')

    run(tcp_server('', 25000, handler))
You'll find that the above code works fine and doesn't block
the kernel.
Asynchronous threads only work in the context of Curio, but they
may use all of Curio's features. Everywhere you would normally
write await, you use the await() function. The with and for statements
will work with objects supporting asynchronous operation.
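As a rough standard-library analogy of what await() does for an
async thread, a plain thread can hand an awaitable back to an event
loop and block until the result is ready. This uses asyncio, not
curio's actual machinery, and thread_main is a hypothetical name:

```python
import asyncio

def thread_main(loop):
    # Plain synchronous thread code. To run an awaitable, submit it
    # to the event loop and block until the result is ready -- roughly
    # the role await() plays inside an async thread.
    async def reply():
        await asyncio.sleep(0)
        return b'Awake!\n'
    fut = asyncio.run_coroutine_threadsafe(reply(), loop)
    return fut.result()

async def main():
    loop = asyncio.get_running_loop()
    # to_thread() stands in for launching the async thread
    return await asyncio.to_thread(thread_main, loop)

result = asyncio.run(main())
```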
01/04/2017 Modified enable_cancellation() and disable_cancellation() so that
they can also be used as functions. This makes it easier to
shield a single operation. For example:
    await disable_cancellation(coro())
Functionally, it is the same as this:
    async with disable_cancellation():
        await coro()
This is mostly a convenience feature.
01/04/2017 Two tasks attempting to wait on the same file descriptor
now result in an exception. Closes issue #104.
01/04/2017 Modified the monitor so that killing the Curio process also
kills the monitor thread and disconnects any connected clients.
Addresses issue #108.
01/04/2017 Modified task.cancel() so that it also cancels any pending
timeout. This prevents the delivery of a timeout exception
(if any) in code that might be executing cleanup after
task cancellation.
01/03/2017 Added a TaskCancelled exception. This is now what gets
raised when tasks are cancelled using the task.cancel()
method. It is a subclass of CancelledError. This change
makes CancelledError more of an abstract exception class
for cancellation. The TaskCancelled, TaskTimeout, and
TimeoutCancellationError exceptions are more specific
subclasses that indicate exactly what has happened.
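The hierarchy can be pictured with local stand-in classes; these are
illustrative only, not curio's actual definitions:

```python
# Stand-ins mirroring the hierarchy described above: CancelledError
# acts as the abstract base; the subclasses say what happened.
class CancelledError(Exception):
    pass

class TaskCancelled(CancelledError):
    pass

class TaskTimeout(CancelledError):
    pass

class TimeoutCancellationError(CancelledError):
    pass

def classify(exc):
    # Code that only cares that *some* cancellation occurred can catch
    # the base class and still inspect the specific subclass.
    try:
        raise exc
    except CancelledError as e:
        return type(e).__name__
```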
01/02/2017 Major reorganization of how task cancellation works. There
are two major parts to it.
Kernel:
Every task has a boolean flag "task.allow_cancel" that
determines whether or not cancellation exceptions (which
includes cancellation and timeouts) can be raised in the
task or not. The flag acts as a simple mask. If set True,
a cancellation results in an exception being raised in the
task. If set False, the cancellation-related exception is
placed into "task.cancel_pending" instead. That attribute
holds onto the exception indefinitely, waiting for the task
to re-enable cancellations. Once re-enabled, the exception
is raised immediately the next time the task performs a
blocking operation.
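The masking logic just described can be sketched as follows. This is
a simplified sketch with hypothetical names (Task, deliver_cancel,
next_blocking_op), not curio's internals:

```python
class Task:
    # Minimal stand-in holding the two attributes described above.
    def __init__(self):
        self.allow_cancel = True      # cancellation mask
        self.cancel_pending = None    # parked exception while masked

def deliver_cancel(task, exc):
    # Kernel side (simplified): raise now if allowed, otherwise park
    # the exception until cancellation is re-enabled.
    if task.allow_cancel:
        raise exc
    task.cancel_pending = exc

def next_blocking_op(task):
    # On the task's next blocking operation, a parked exception fires
    # as soon as the mask is lifted.
    if task.allow_cancel and task.cancel_pending is not None:
        exc, task.cancel_pending = task.cancel_pending, None
        raise exc
```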
Coroutines:
From coroutines, control of the cancellation flag is
performed by two functions which are used as context
managers:
To disable cancellation, use the following construct:
    async def coro():
        async with disable_cancellation():
            ...
            await something1()
            await something2()
            ...
        await blocking_op()     # Cancellation raised here (if any)
Within a disable_cancellation() block, it is illegal for
a CancelledError exception to be raised--even manually. Doing
so causes a RuntimeError.
To re-enable cancellation in specific regions of code, use
enable_cancellation() like this:
    async def coro():
        async with disable_cancellation():
            while True:
                await something1()
                await something2()
                async with enable_cancellation() as c:
                    await blocking_op()
                if c.cancel_pending:
                    # Cancellation is pending right now. Must bail out.
                    break
        await blocking_op()     # Cancellation raised here (if any)
Use of enable_cancellation() is never allowed outside of an
enclosing disable_cancellation() block. Doing so will
cause a RuntimeError exception. Within an
enable_cancellation() block, all of the normal cancellation
rules apply. This includes raising of exceptions,
timeouts, and so forth. However, CancelledError exceptions
will never escape the block. Instead, they turn back into
a pending exception which can be checked as shown above.
Normally cancellations are only delivered on blocking operations.
If you want to force a check, you can use check_cancellation()
like this:
    if await check_cancellation():
        # Cancellation is pending, but not allowed right now
        ...
Depending on the setting of the allow_cancel flag,
check_cancellation() will either raise the cancellation
exception immediately or report that it is pending.
12/27/2016 Modified timeout_after(None) so that it leaves any prior timeout
setting in place (if any). However, if a timeout occurs, it
will appear as a TimeoutCancellationError instead of the usual
TaskTimeout exception. This is subtle, but it means that the
timeout occurred due to an outer timeout setting. This
change makes it easier to write functions that accept optional
timeout settings. For example:
    async def func(args, timeout=None):
        try:
            async with timeout_after(timeout):
                statements
                ...
        except TaskTimeout as e:
            # Timeout specifically due to timeout setting supplied
            ...
        except CancelledError as e:
            # Function cancelled for some other reason
            # (possibly an outer timeout)
            ...
12/23/2016 Added further information to cancellation/timeout exceptions
where partial I/O may have been performed. For readall() and
read_exactly() methods, the bytes_read attribute contains
all data read so far. The readlines() method attaches a
lines_read attribute. For write() and writelines(), a bytes_written
attribute is added to the exception. For example:
    try:
        data = await timeout_after(5, s.readall())
    except TimeoutError as e:
        data = e.bytes_read      # Data received prior to timeout
Here is a sending example:
    try:
        await timeout_after(5, s.write(data))
    except TimeoutError as e:
        nwritten = e.bytes_written
The primary purpose of these attributes is to allow more
robust recovery in the event of cancellation.
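The general pattern of carrying partial results on an exception looks
roughly like this (hypothetical names; not curio's I/O code):

```python
class PartialTimeout(Exception):
    # Hypothetical exception carrying partial data on the exception
    # object, as the curio methods described above do.
    def __init__(self, bytes_read):
        super().__init__('timed out after partial read')
        self.bytes_read = bytes_read

def read_exactly(chunks, n):
    # Illustrative reader: gather chunks until n bytes are available;
    # if the source runs dry first, attach everything read so far.
    buf = b''
    for chunk in chunks:
        buf += chunk
        if len(buf) >= n:
            return buf[:n]
    raise PartialTimeout(buf)

try:
    data = read_exactly([b'abc', b'de'], 10)
except PartialTimeout as e:
    data = e.bytes_read      # everything received before the failure
```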
12/23/2016 The timeout arguments to subprocess related functions have been
removed. Use the curio timeout_after() function instead to deal
with this case. For example:
    try:
        out = await timeout_after(5, subprocess.check_output(args))
    except TaskTimeout as e:
        # Get the partially read output
        partial_stdout = e.stdout
        partial_stderr = e.stderr
        ... other recovery ...
If there is an exception, the stdout and stderr
attributes contain any partially read data on standard output
and standard error. These attributes mirror those present
on the CalledProcessError exception raised if there is an error.
12/03/2016 Added a parentid attribute to Task instances so you can find parent
tasks. Nothing else is done with this internally.
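For example, user code could reconstruct a task's ancestry itself.
This is a sketch; the tasks mapping and the records in it are
hypothetical:

```python
from types import SimpleNamespace

def ancestry(tasks, task_id):
    # Walk parentid links from a task up to the root. `tasks` maps
    # id -> task-like object with a .parentid attribute (None at root).
    chain = []
    while task_id is not None:
        chain.append(task_id)
        task_id = tasks[task_id].parentid
    return chain

# Three hypothetical task records: 1 spawned 2, which spawned 3
tasks = {1: SimpleNamespace(parentid=None),
         2: SimpleNamespace(parentid=1),
         3: SimpleNamespace(parentid=2)}
```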
12/03/2016 Withdrew the pdb and crash_handler arguments to Kernel() and the
run() function. Added a pdb() method to tasks that can be used
to enter the debugger on a crashed task. High-level handling
of crashed/terminated tasks is being rethought. The old
crash_handler() callback was next to useless since no useful
actions could be performed (i.e., there was no ability to respawn
tasks or execute any kind of coroutine in response to a crash).
11/05/2016 Pulled time related functionality into the kernel as a new call.
Use the following to get the current value of the kernel clock:
    await curio.clock()
Timeout related functions such as timeout_after() and ignore_after()
now rely on the kernel clock instead of using time.monotonic().
This change consolidates all use of the clock into one place
and makes it easier (later) to reconfigure timing should it be
desired. For example, perhaps changing the scale of the clock