diff --git a/NEWS b/NEWS index c0553d7e3..7e8c8ec60 100755 --- a/NEWS +++ b/NEWS @@ -1,9 +1,11 @@ -CZMQ version 1.3.0 (stable), released on 2012/xx/xx +CZMQ version 1.3.1 (stable), released on 2012/10/27 =================================================== Changes ------- +* See git log. + CZMQ version 1.2.0 (stable), released on 2012/08/06 =================================================== diff --git a/doc/Makefile.am b/doc/Makefile.am index 709ffba62..3b82950b0 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -9,6 +9,7 @@ MAN7 = czmq.7 \ zlist.7 \ zloop.7 \ zmsg.7 \ + zmutex.7 \ zsocket.7 \ zsockopt.7 \ zstr.7 \ diff --git a/doc/zclock.txt b/doc/zclock.txt index 112ebb10b..faf2f6696 100644 --- a/doc/zclock.txt +++ b/doc/zclock.txt @@ -23,7 +23,7 @@ CZMQ_EXPORT void // Self test of this class int - zclock_test (Bool verbose); + zclock_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zctx.txt b/doc/zctx.txt index 28fa582dc..41b5a5728 100644 --- a/doc/zctx.txt +++ b/doc/zctx.txt @@ -39,7 +39,7 @@ CZMQ_EXPORT void * // Self test of this class int - zctx_test (Bool verbose); + zctx_test (bool verbose); // Global signal indicator, TRUE when user presses Ctrl-C or the process // gets a SIGTERM signal. diff --git a/doc/zfile.txt b/doc/zfile.txt index 2c8890109..223c98a7c 100644 --- a/doc/zfile.txt +++ b/doc/zfile.txt @@ -26,7 +26,7 @@ CZMQ_EXPORT ssize_t // Self test of this class int - zfile_test (Bool verbose); + zfile_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zframe.txt b/doc/zframe.txt index 08d9fd8a4..d8a3d85cd 100644 --- a/doc/zframe.txt +++ b/doc/zframe.txt @@ -65,7 +65,7 @@ CZMQ_EXPORT char * zframe_strdup (zframe_t *self); // Return TRUE if frame body is equal to string, excluding terminator -CZMQ_EXPORT Bool +CZMQ_EXPORT bool zframe_streq (zframe_t *self, const char *string); // Return frame zero copy indicator (1 or 0) @@ -78,7 +78,7 @@ CZMQ_EXPORT int // Return TRUE if two frames have identical size and data // If either frame is NULL, equality is always false. -CZMQ_EXPORT Bool +CZMQ_EXPORT bool zframe_eq (zframe_t *self, zframe_t *other); // Print contents of frame to stderr @@ -91,7 +91,7 @@ CZMQ_EXPORT void // Self test of this class int - zframe_test (Bool verbose); + zframe_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zhash.txt b/doc/zhash.txt index 70158e1aa..5b7b756a8 100644 --- a/doc/zhash.txt +++ b/doc/zhash.txt @@ -62,6 +62,10 @@ CZMQ_EXPORT size_t // Make copy of hash table CZMQ_EXPORT zhash_t * zhash_dup (zhash_t *self); + +// Return keys for items in table +CZMQ_EXPORT zlist_t * + zhash_keys (zhash_t *self); // Apply function to each item in the hash table. Items are iterated in no // defined order. Stops if callback function returns non-zero and returns @@ -149,7 +153,7 @@ EXAMPLE // Check that the queue is robust against random usage struct { char name [100]; - Bool exists; + bool exists; } testset [200]; memset (testset, 0, sizeof (testset)); int testmax = 200, testnbr, iteration; diff --git a/doc/zloop.txt b/doc/zloop.txt index 75dbf7167..e53b34d23 100644 --- a/doc/zloop.txt +++ b/doc/zloop.txt @@ -45,7 +45,7 @@ CZMQ_EXPORT int // Set verbose tracing of reactor on/off CZMQ_EXPORT void - zloop_set_verbose (zloop_t *self, Bool verbose); + zloop_set_verbose (zloop_t *self, bool verbose); // Start the reactor. Takes control of the thread and returns when the 0MQ // context is terminated or the process is interrupted, or any event handler @@ -56,7 +56,7 @@ CZMQ_EXPORT int // Self test of this class int - zloop_test (Bool verbose); + zloop_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zmsg.txt b/doc/zmsg.txt index 2f15ff526..d3b45a564 100644 --- a/doc/zmsg.txt +++ b/doc/zmsg.txt @@ -122,7 +122,7 @@ CZMQ_EXPORT void // Self test of this class int - zmsg_test (Bool verbose); + zmsg_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zmutex.txt b/doc/zmutex.txt new file mode 100644 index 000000000..94f5d73e2 --- /dev/null +++ b/doc/zmutex.txt @@ -0,0 +1,54 @@ +zmutex(7) +========= + +NAME +---- +zmutex - working with mutexes + +SYNOPSIS +-------- +---- +// Create a new mutex container +CZMQ_EXPORT zmutex_t * + zmutex_new (void); + +// Destroy a mutex container +CZMQ_EXPORT void + zmutex_destroy (zmutex_t **self_p); + +// Lock mutex +CZMQ_EXPORT void + zmutex_lock (zmutex_t *self); + +// Unlock mutex +CZMQ_EXPORT void + zmutex_unlock (zmutex_t *self); + +// Self test of this class +int + zmutex_test (bool verbose); +---- + +DESCRIPTION +----------- + +The zmutex class provides a portable wrapper for mutexes. Please do not +use this class to do multi-threading. It is for the rare case where you +absolutely need thread-safe global state. This should happen in system +code only. DO NOT USE THIS TO SHARE SOCKETS BETWEEN THREADS, OR DARK +THINGS WILL HAPPEN TO YOUR CODE. + + +EXAMPLE +------- +.From zmutex_test method +---- + zmutex_t *mutex = zmutex_new (); + zmutex_lock (mutex); + zmutex_unlock (mutex); + zmutex_destroy (&mutex); +---- + +SEE ALSO +-------- +linkczmq:czmq[7] diff --git a/doc/zsocket.txt b/doc/zsocket.txt index e842d72cd..114502f50 100644 --- a/doc/zsocket.txt +++ b/doc/zsocket.txt @@ -45,7 +45,7 @@ CZMQ_EXPORT int // Poll for input events on the socket. Returns TRUE if there is input // ready on the socket, else FALSE. -CZMQ_EXPORT Bool +CZMQ_EXPORT bool zsocket_poll (void *socket, int msecs); // Returns socket type as printable constant string @@ -54,7 +54,7 @@ CZMQ_EXPORT char * // Self test of this class int - zsocket_test (Bool verbose); + zsocket_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zsockopt.txt b/doc/zsockopt.txt index 39769592f..61e565742 100644 --- a/doc/zsockopt.txt +++ b/doc/zsockopt.txt @@ -117,7 +117,7 @@ void zsocket_set_hwm (void *zocket, int hwm); #endif // Self test of this class -int zsockopt_test (Bool verbose); +int zsockopt_test (bool verbose); ---- DESCRIPTION @@ -137,24 +137,31 @@ EXAMPLE assert (ctx); void *zocket; #if (ZMQ_VERSION_MAJOR == 2) +# if defined (ZMQ_HWM) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_hwm (zocket, 1); assert (zsocket_hwm (zocket) == 1); zsocket_hwm (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_SWAP) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_swap (zocket, 1); assert (zsocket_swap (zocket) == 1); zsocket_swap (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_AFFINITY) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_affinity (zocket, 1); assert (zsocket_affinity (zocket) == 1); zsocket_affinity (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_IDENTITY) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_identity (zocket, "test"); @@ -162,139 +169,189 @@ EXAMPLE assert (identity); free (identity); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RATE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rate (zocket, 1); assert (zsocket_rate (zocket) == 1); zsocket_rate (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECOVERY_IVL) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_recovery_ivl (zocket, 1); assert (zsocket_recovery_ivl (zocket) == 1); zsocket_recovery_ivl (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECOVERY_IVL_MSEC) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_recovery_ivl_msec (zocket, 1); assert (zsocket_recovery_ivl_msec (zocket) == 1); zsocket_recovery_ivl_msec (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_MCAST_LOOP) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_mcast_loop (zocket, 1); assert (zsocket_mcast_loop (zocket) == 1); zsocket_mcast_loop (zocket); zsocket_destroy (ctx, zocket); +# endif # if (ZMQ_VERSION_MINOR == 2) +# if defined (ZMQ_RCVTIMEO) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rcvtimeo (zocket, 1); assert (zsocket_rcvtimeo (zocket) == 1); zsocket_rcvtimeo (zocket); zsocket_destroy (ctx, zocket); +# endif # endif # if (ZMQ_VERSION_MINOR == 2) +# if defined (ZMQ_SNDTIMEO) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_sndtimeo (zocket, 1); assert (zsocket_sndtimeo (zocket) == 1); zsocket_sndtimeo (zocket); zsocket_destroy (ctx, zocket); +# endif # endif +# if defined (ZMQ_SNDBUF) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_sndbuf (zocket, 1); assert (zsocket_sndbuf (zocket) == 1); zsocket_sndbuf (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RCVBUF) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rcvbuf (zocket, 1); assert (zsocket_rcvbuf (zocket) == 1); zsocket_rcvbuf (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_LINGER) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_linger (zocket, 1); assert (zsocket_linger (zocket) == 1); zsocket_linger (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECONNECT_IVL) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_reconnect_ivl (zocket, 1); assert (zsocket_reconnect_ivl (zocket) == 1); zsocket_reconnect_ivl (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECONNECT_IVL_MAX) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_reconnect_ivl_max (zocket, 1); assert (zsocket_reconnect_ivl_max (zocket) == 1); zsocket_reconnect_ivl_max (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_BACKLOG) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_backlog (zocket, 1); assert (zsocket_backlog (zocket) == 1); zsocket_backlog (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_SUBSCRIBE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_subscribe (zocket, "test"); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_UNSUBSCRIBE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_unsubscribe (zocket, "test"); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_TYPE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_type (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RCVMORE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_rcvmore (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_FD) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_fd (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_EVENTS) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_events (zocket); zsocket_destroy (ctx, zocket); +# endif #endif #if (ZMQ_VERSION_MAJOR == 3) +# if defined (ZMQ_TYPE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_type (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_SNDHWM) zocket = zsocket_new (ctx, ZMQ_PUB); assert (zocket); zsocket_set_sndhwm (zocket, 1); assert (zsocket_sndhwm (zocket) == 1); zsocket_sndhwm (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RCVHWM) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rcvhwm (zocket, 1); assert (zsocket_rcvhwm (zocket) == 1); zsocket_rcvhwm (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_AFFINITY) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_affinity (zocket, 1); assert (zsocket_affinity (zocket) == 1); zsocket_affinity (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_SUBSCRIBE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_subscribe (zocket, "test"); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_UNSUBSCRIBE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_unsubscribe (zocket, "test"); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_IDENTITY) zocket = zsocket_new (ctx, ZMQ_DEALER); assert (zocket); zsocket_set_identity (zocket, "test"); @@ -302,114 +359,155 @@ EXAMPLE assert (identity); free (identity); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RATE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rate (zocket, 1); assert (zsocket_rate (zocket) == 1); zsocket_rate (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECOVERY_IVL) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_recovery_ivl (zocket, 1); assert (zsocket_recovery_ivl (zocket) == 1); zsocket_recovery_ivl (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_SNDBUF) zocket = zsocket_new (ctx, ZMQ_PUB); assert (zocket); zsocket_set_sndbuf (zocket, 1); assert (zsocket_sndbuf (zocket) == 1); zsocket_sndbuf (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RCVBUF) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rcvbuf (zocket, 1); assert (zsocket_rcvbuf (zocket) == 1); zsocket_rcvbuf (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_LINGER) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_linger (zocket, 1); assert (zsocket_linger (zocket) == 1); zsocket_linger (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECONNECT_IVL) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_reconnect_ivl (zocket, 1); assert (zsocket_reconnect_ivl (zocket) == 1); zsocket_reconnect_ivl (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RECONNECT_IVL_MAX) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_reconnect_ivl_max (zocket, 1); assert (zsocket_reconnect_ivl_max (zocket) == 1); zsocket_reconnect_ivl_max (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_BACKLOG) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_backlog (zocket, 1); assert (zsocket_backlog (zocket) == 1); zsocket_backlog (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_MAXMSGSIZE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_maxmsgsize (zocket, 1); assert (zsocket_maxmsgsize (zocket) == 1); zsocket_maxmsgsize (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_MULTICAST_HOPS) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_multicast_hops (zocket, 1); assert (zsocket_multicast_hops (zocket) == 1); zsocket_multicast_hops (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RCVTIMEO) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_rcvtimeo (zocket, 1); assert (zsocket_rcvtimeo (zocket) == 1); zsocket_rcvtimeo (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_SNDTIMEO) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_sndtimeo (zocket, 1); assert (zsocket_sndtimeo (zocket) == 1); zsocket_sndtimeo (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_IPV4ONLY) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_set_ipv4only (zocket, 1); assert (zsocket_ipv4only (zocket) == 1); zsocket_ipv4only (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_DELAY_ATTACH_ON_CONNECT) zocket = zsocket_new (ctx, ZMQ_PUB); assert (zocket); zsocket_set_delay_attach_on_connect (zocket, 1); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_ROUTER_MANDATORY) zocket = zsocket_new (ctx, ZMQ_ROUTER); assert (zocket); zsocket_set_router_mandatory (zocket, 1); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_XPUB_VERBOSE) zocket = zsocket_new (ctx, ZMQ_XPUB); assert (zocket); zsocket_set_xpub_verbose (zocket, 1); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_RCVMORE) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_rcvmore (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_FD) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_fd (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_EVENTS) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); zsocket_events (zocket); zsocket_destroy (ctx, zocket); +# endif +# if defined (ZMQ_LAST_ENDPOINT) zocket = zsocket_new (ctx, ZMQ_SUB); assert (zocket); char *last_endpoint = zsocket_last_endpoint (zocket); assert (last_endpoint); free (last_endpoint); zsocket_destroy (ctx, zocket); +# endif zocket = zsocket_new (ctx, ZMQ_SUB); zsocket_set_hwm (zocket, 1); diff --git a/doc/zstr.txt b/doc/zstr.txt index 977189369..4b57e066c 100644 --- a/doc/zstr.txt +++ b/doc/zstr.txt @@ -34,7 +34,7 @@ CZMQ_EXPORT int // Self test of this class int - zstr_test (Bool verbose); + zstr_test (bool verbose); ---- DESCRIPTION diff --git a/doc/zthread.txt b/doc/zthread.txt index 19a8f9920..7f96c1ccd 100644 --- a/doc/zthread.txt +++ b/doc/zthread.txt @@ -29,7 +29,7 @@ CZMQ_EXPORT void * // Self test of this class int - zthread_test (Bool verbose); + zthread_test (bool verbose); ---- DESCRIPTION diff --git a/src/zsocket.c b/src/zsocket.c index 79a40c7b1..bef9fc2bb 100644 --- a/src/zsocket.c +++ b/src/zsocket.c @@ -61,6 +61,13 @@ zsocket_destroy (zctx_t *ctx, void *self) } +// Static mutex used only in the following code +static zmutex_t *s_mutex = NULL; +static void s_mutex_free (void) +{ + zmutex_destroy (&s_mutex); +} + // -------------------------------------------------------------------------- // Bind a socket to a formatted endpoint. If the port is specified as // '*', binds to any free port from ZSOCKET_DYNFROM to ZSOCKET_DYNTO @@ -70,6 +77,19 @@ zsocket_destroy (zctx_t *ctx, void *self) int zsocket_bind (void *self, const char *format, ...) { + // We avoid reusing ephemeral ports so that peers which have + // received an ephemeral port to connect to won't connect to + // old peers and in fact connect to new ones which have bound + // to that old port by misfortune. For the rare case that + // you do lots of binds/unbinds in one process, we use a mutex + // to protect against collisions. + // + static int dynport = ZSOCKET_DYNFROM; + if (!s_mutex) { + s_mutex = zmutex_new (); + atexit (s_mutex_free); + } + // Ephemeral port needs 4 additional characters char endpoint [256 + 4]; va_list argptr; @@ -82,13 +102,21 @@ zsocket_bind (void *self, const char *format, ...) if (endpoint [endpoint_size - 2] == ':' && endpoint [endpoint_size - 1] == '*') { rc = -1; // Unless successful - int port; - for (port = ZSOCKET_DYNFROM; port < ZSOCKET_DYNTO; port++) { + int port = dynport; + while (true) { + // Try to bind on the next plausible port sprintf (endpoint + endpoint_size - 1, "%d", port); if (zmq_bind (self, endpoint) == 0) { rc = port; break; } + // Failed, so increment port and try again + zmutex_lock (s_mutex); + dynport++; + if (dynport >= ZSOCKET_DYNTO) + dynport = ZSOCKET_DYNFROM; + port = dynport; + zmutex_unlock (s_mutex); } } else {