From dca9b6626fe7b4369f87ccc10c407f575af96aad Mon Sep 17 00:00:00 2001 From: jean-airoldie <25088801+jean-airoldie@users.noreply.github.com> Date: Tue, 28 May 2019 18:01:53 -0400 Subject: [PATCH] Add WIP Book (#38) * Added book initial book draft --- .travis.yml | 13 ++ libzmq-book/.gitignore | 1 + libzmq-book/book.toml | 5 + libzmq-book/src/SUMMARY.md | 16 +++ libzmq-book/src/about.md | 22 ++++ libzmq-book/src/advanced/README.md | 3 + libzmq-book/src/advanced/protocols.md | 40 ++++++ libzmq-book/src/basics/README.md | 3 + libzmq-book/src/basics/methods.md | 122 ++++++++++++++++++ libzmq-book/src/basics/patterns.md | 72 +++++++++++ libzmq-book/src/basics/socket.md | 18 +++ libzmq-book/src/examples/README.md | 3 + libzmq-book/src/examples/basic_req_rep.md | 11 ++ libzmq-book/src/examples/reliable_req_rep.md | 20 +++ libzmq-book/src/examples/secure_req_rep.md | 24 ++++ libzmq-book/src/glossary.md | 45 +++++++ libzmq/Cargo.toml | 10 +- libzmq/examples/auth.rs | 77 ----------- libzmq/examples/basic_req_rep.rs | 50 +++++++ libzmq/examples/curve_no_auth.yml | 42 ------ libzmq/examples/reliable_req_rep.rs | 76 +++++++++++ libzmq/examples/secure_req_rep.rs | 91 +++++++++++++ .../{curve.yml => secure_req_rep.yml} | 14 +- libzmq/src/auth/curve.rs | 14 +- libzmq/src/auth/mechanism.rs | 2 +- 25 files changed, 664 insertions(+), 130 deletions(-) create mode 100644 libzmq-book/.gitignore create mode 100644 libzmq-book/book.toml create mode 100644 libzmq-book/src/SUMMARY.md create mode 100644 libzmq-book/src/about.md create mode 100644 libzmq-book/src/advanced/README.md create mode 100644 libzmq-book/src/advanced/protocols.md create mode 100644 libzmq-book/src/basics/README.md create mode 100644 libzmq-book/src/basics/methods.md create mode 100644 libzmq-book/src/basics/patterns.md create mode 100644 libzmq-book/src/basics/socket.md create mode 100644 libzmq-book/src/examples/README.md create mode 100644 libzmq-book/src/examples/basic_req_rep.md create mode 100644 libzmq-book/src/examples/reliable_req_rep.md create mode 100644 libzmq-book/src/examples/secure_req_rep.md create mode 100644 libzmq-book/src/glossary.md delete mode 100644 libzmq/examples/auth.rs create mode 100644 libzmq/examples/basic_req_rep.rs delete mode 100644 libzmq/examples/curve_no_auth.yml create mode 100644 libzmq/examples/reliable_req_rep.rs create mode 100644 libzmq/examples/secure_req_rep.rs rename libzmq/examples/{curve.yml => secure_req_rep.yml} (88%) diff --git a/.travis.yml b/.travis.yml index 4bad71b..fc413ec 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,11 +5,24 @@ compiler: - clang env: - RUST_BACKTRACE=1 RUST_LOG=error +cache: + - cargo before_script: - rustup component add rustfmt clippy + - (test -x $HOME/.cargo/bin/cargo-install-update || cargo install cargo-update) + - (test -x $HOME/.cargo/bin/mdbook || cargo install --vers "^0.2" mdbook) + - cargo install-update -a script: - cargo test --all-targets --no-run - cargo test --all - cargo test --examples - cargo fmt --all -- --check - cargo clippy --all-targets -- -D warnings +deploy: + provider: pages + skip-cleanup: true + github-token: $GITHUB_TOKEN + local-dir: libzmq-book + keep-history: false + on: + branch: master diff --git a/libzmq-book/.gitignore b/libzmq-book/.gitignore new file mode 100644 index 0000000..7585238 --- /dev/null +++ b/libzmq-book/.gitignore @@ -0,0 +1 @@ +book diff --git a/libzmq-book/book.toml b/libzmq-book/book.toml new file mode 100644 index 0000000..eee4c66 --- /dev/null +++ b/libzmq-book/book.toml @@ -0,0 +1,5 @@ +[book] +authors = ["jean-airoldie"] +multilingual = false +src = "src" +title = "Guide" diff --git a/libzmq-book/src/SUMMARY.md b/libzmq-book/src/SUMMARY.md new file mode 100644 index 0000000..88db0dd --- /dev/null +++ b/libzmq-book/src/SUMMARY.md @@ -0,0 +1,16 @@ +# Summary + +[About](./about.md) + +* [Basics](./basics/README.md) + * [Socket](./basics/socket.md) + * [Methods](./basics/methods.md) + * [Patterns](./basics/patterns.md) +* [Advanced](./advanced/README.md) + * [Custom Protocols](./advanced/protocols.md) +* [Examples](./examples/README.md) + * [Basic Request Reply](./examples/basic_req_rep.md) + * [Reliable Request Reply](./examples/reliable_req_rep.md) + * [Secure Request Reply](./examples/secure_req_rep.md) + +[Glossary](./glossary.md) diff --git a/libzmq-book/src/about.md b/libzmq-book/src/about.md new file mode 100644 index 0000000..026b341 --- /dev/null +++ b/libzmq-book/src/about.md @@ -0,0 +1,22 @@ +# About + +## This is a WIP guide + +I believe that `ZeroMQ` is a diamond in the rough that is still a in the early +adoption stage despite being a mature project. In my opinon, the lack of traction +for the library is due to the community rather than the technology itself. + +`ZeroMQ` suffers from a significant learning curve due to its foreign concepts. +It requires the programmer to think differently about messaging. When this is +combined with a lacking documentation, it results in very significant time +requirement to properly understand how the library works and how it can be used. + +I want this guide to reduce this time requirement by explaining the key concepts of +`ZeroMQ`, give real world usage examples as well as a general vision of how it +could be use. To do so, we will use [libzmq-rs] which is a library aimed at +making `ZeroMQ` dead simple to use. + +## Reading Tips +* There is a search-bar friendly glossary at the end of the guide. + +[libzmq-rs]: https://github.com/jean-airoldie/libzmq-rs diff --git a/libzmq-book/src/advanced/README.md b/libzmq-book/src/advanced/README.md new file mode 100644 index 0000000..a004a78 --- /dev/null +++ b/libzmq-book/src/advanced/README.md @@ -0,0 +1,3 @@ +# Advanced + +Now that the basic stuff is taken care off, lets dig deeper. diff --git a/libzmq-book/src/advanced/protocols.md b/libzmq-book/src/advanced/protocols.md new file mode 100644 index 0000000..65724f7 --- /dev/null +++ b/libzmq-book/src/advanced/protocols.md @@ -0,0 +1,40 @@ +# Custom Protocols + +For two peers to be able to communicate, they must share a contract. In the +world of communication, these are called protocols. `ZeroMQ` enables +programmer to create protocols that suit their needs by removing most of the +boilerplate. + +You might have realized by now that there is no strict concept of request-reply +as a socket operation. Indeed the library does not enforce a client socket +to follow a `send` call by a `recv` call. This does't mean however that this +strict type of request-reply could not be achieved. To do so, a programmer could +easily write the following code: + +```rust +// Client side +fn request_reply(&mut self, msg: Msg) -> Result { + self.client.send(msg)?; + self.client.recv_msg()? +} + +// Server side +fn run(&mut self) -> Result<(), Error> { + loop { + let request = self.server.recv_msg()?; + let reply = self.on_request(request)?; + self.server.send(reply) + } +} +``` + +This creates an implicit contract between the client and the server. +We will disregard the error handling and timeouts for simplicity. +* The client must send one request at a time and wait for one reply. +* The server must wait for a request and send one reply. + +Since contract must be ensured at the application level, it must be properly +documentated for developpers to be able to respect it. + +`ZeroMQ` does not enforce a particular messaging protocol, instead +it offers all the tools to build one. diff --git a/libzmq-book/src/basics/README.md b/libzmq-book/src/basics/README.md new file mode 100644 index 0000000..138f234 --- /dev/null +++ b/libzmq-book/src/basics/README.md @@ -0,0 +1,3 @@ +# Basics + +This is the minimal set of concepts required to get a decent grasp of `libzmq`. diff --git a/libzmq-book/src/basics/methods.md b/libzmq-book/src/basics/methods.md new file mode 100644 index 0000000..fcd59cb --- /dev/null +++ b/libzmq-book/src/basics/methods.md @@ -0,0 +1,122 @@ +# Basics + +These are the basic methods required to use a socket. + +## Connect + +The socket [connect] method is use to connect to a peer socket bound +at an endpoint to communicate with a peer. Usually a client socket will connect +to a server socket, but it could be the other way around. + +```rust +let addr: TcpAddr = "8.8.8.8:420".try_into()?; +client.connect(addr)?; +``` + +Calling [connect] on a socket is not guaranteed to connect to the peer right +away. Usually, the actual connect call will be delayed until it is needed +(e.g. when sending a message). + +Connections in `ZeroMQ` are different from traditional connections is the +sense that they automatically handle failure. For instance, if a connection +fails because of a network error, it will be automatically reconnected if +possible. + +Furthermore, to successfully connect to a peer, the handshake corresponding to +the mechanism used must succeed. This handshake is also done in the background +and might fail for various reasons. + +## Bind + +The socket [bind] method is used to bind a local endpoint to accept connections +from peers. Usually a server socket will bind a known endpoint so that other socket +can connect to it. + +```rust +let addr: TcpAddr = "127.0.0.1:*".try_into()?; +server.bind(addr)?; +``` + +Contrairy to [connect], [bind] will attempt to bind to the endpoint straight +away. If the bind call succeeds, the socket will start to accept connections +attempts to this endpoint. + +## Send + +This a fundamental operation of a socket used to transfert messages to another +socket. To be able to send messages, a socket must implement the [SendMsg] trait. + +```rust +client.send(msg)?; +``` + +When [send] is called on a socket, it will attempt to queue the message +to its outgoing buffer. If the buffer is full, meaning it has reached the +high water mark, the operation will block. If the [send_timeout] is set +to `None`, the operation will block until the buffer can accomodate for +the message. Otherwise if a duration is specified, it attempt to queue +the message for that duration and if it fails, return [WouldBlock]. +the timeout. + +There is also the [try_send] method which will return with [WouldBlock] immediately +if it cannot queue the message. + +Queued messages are send by a background I/O thread to the peer socket. +For the messages to be actually sent two conditions must be met: +* The connection with the peer socket is up. +* The peer socket can receive messages (its incoming buffer is not full). + +Conceptually, a full outgoing buffer can mean many things: +* The connection has crashed temporarily (network error etc.) +* The peer socket has crashed and is restarting. +* The peer socket receives message slower than we can send + them (thus this is a back throttling mechanism) +* Etc. + +Many of these scenarios are conceptually indistinguishable. Therefore +the user has to decide what to do depending on the context. + +## Recv + +You guessed it, [recv] is a socket operation used to receive messages from +another socket. To be able to receive messages, a socket must implement +the [RecvMsg] trait. + +```rust +let msg = client.recv_msg()?; +``` + +Calling [recv] on a socket will attempt to extract a message from its +incoming buffer. If the incoming buffer is empty, the operation will +block until a mesage is received in the buffer. If the [recv_timeout] +is specified, it will try to extract a message from the buffer for the +given duration and return [WouldBlock] if it failed. + +There is also the [try_recv] method which, similarly to [try_send], will return +with [WouldBlock] immediately if it cannot queue the message. + +The incoming buffer receives message from the background I/O thread from the +peer socket. For the messages to be actually received two conditions must be met: +* The connection with the peer socket is up. +* The incoming buffer is not full. + +Conceptually, an empty incoming buffer can mean many things: +* The socket can receive messages faster than what the peer can send. +* The peer has no messages to send. +* The connection has a network error. +* The peer has crashed +* Etc. + +Like before, many of these scenarios are conceptually indistinguishable. +We have to decide what to do depending on the context. + +[send]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.SendMsg.html#method.send +[try_send]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.SendMsg.html#method.try_send +[SendMsg]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.SendMsg.html +[recv]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.RecvMsg.html#method.recv +[try_recv]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.RecvMsg.html#method.try_recv +[RecvMsg]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.RecvMsg.html +[connect]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.Socket.html#method.connect +[bind]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.Socket.html#method.bind +[send_timeout]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.SendMsg.html#method.send_timeout +[recv_timeout]: https://docs.rs/libzmq/0.1/libzmq/prelude/trait.RecvMsg.html#method.recv_timeout diff --git a/libzmq-book/src/basics/patterns.md b/libzmq-book/src/basics/patterns.md new file mode 100644 index 0000000..c2e4a52 --- /dev/null +++ b/libzmq-book/src/basics/patterns.md @@ -0,0 +1,72 @@ +# Patterns + +These are the most basic socket patterns in `libzmq`. + +## Client-Server + +The `Client-Server` pattern is a advanced asynchronous request-reply pattern. + +The [Server] receives messages with a unique [RoutingId] associated with a +[Client]. This [RoutingId] can be used by the [Server] to route replies to the +[Client]. +``` + <───> client +server <───> client + <───> client + +``` + +The [Client] socket receives upstream messages in a fair-queued fashion +``` +server ─┐ +server ────> client +server ─┘ +``` + +## Radio-Dish + +The `Radio-Dish` pattern is an asynchronous publish-subscribe pattern. + +The [Radio] socket send messages in a fan-out fashion to all [Dish] +that [joined] the message's [Group]. +``` + ────> dish +radio ────> dish + ────> dish +``` + +The [Dish] socket receive messages from [Group] it has [joined] in a +fair-queued fashion. +``` +radio ─┐ +radio ────> dish +radio ─┘ +``` + +## Scatter-Gather + +The `Scatter-Gather` pattern is an asynchronous pipeline pattern. + +The [Scatter] socket send messages downstream in a round-robin fashion +``` + ┌──> gather +scatter ────> gather + └──> gather +``` + +The [Gather] socket receives upstream messages in a fair-queued fashion +``` +scatter ─┐ +scatter ───> gather +scatter ─┘ +``` + +[Server]: https://docs.rs/libzmq/0.1/libzmq/struct.Server.html +[RoutingId]: https://docs.rs/libzmq/0.1/libzmq/struct.RoutingId.html +[Client]: https://docs.rs/libzmq/0.1/libzmq/struct.Client.html +[Radio]: https://docs.rs/libzmq/0.1/libzmq/struct.Radio.html +[Dish]: https://docs.rs/libzmq/0.1/libzmq/struct.Dish.html +[Group]: https://docs.rs/libzmq/0.1/libzmq/struct.Group.html +[Scatter]: https://docs.rs/libzmq/0.1/libzmq/struct.Scatter.html +[Gather]: https://docs.rs/libzmq/0.1/libzmq/struct.Gather.html +[joined]: https://docs.rs/libzmq/0.1/libzmq/struct.Dish.html#method.join diff --git a/libzmq-book/src/basics/socket.md b/libzmq-book/src/basics/socket.md new file mode 100644 index 0000000..3cda550 --- /dev/null +++ b/libzmq-book/src/basics/socket.md @@ -0,0 +1,18 @@ +# Socket + +The concept of a `socket` in `ZeroMQ` is completely novel. A `ZeroMQ` socket +differs from a traditional `TCP` socket in the following ways (but not limited to): + +* A socket sends and receives atomic messages; messages are guaranteed to + either be transmitted in their entirety, or not transmitted at all. +* A socket send and receive messages asynchronously. +* A socket socket can transmit messages over many supported transports, including `TCP`. +* Incoming and outgoing messages can be queued and transmitted asynchronously + by a background I/O thread. +* A socket can be connected to zero or more peers at any time. +* A socket can be bound to zero or more endpoints at any time. Each bound + endpoint can listen to zero or more peers. +* Peer reconnection and disconnection is handled in the background. +* Support for many authentication and encryption strategies via [Mechanism]. + +[Mechanism]: https://docs.rs/libzmq/0.1/libzmq/auth/enum.Mechanism.html diff --git a/libzmq-book/src/examples/README.md b/libzmq-book/src/examples/README.md new file mode 100644 index 0000000..68fad5d --- /dev/null +++ b/libzmq-book/src/examples/README.md @@ -0,0 +1,3 @@ +# Examples + +Here are a few examples usage of varying complexity. diff --git a/libzmq-book/src/examples/basic_req_rep.md b/libzmq-book/src/examples/basic_req_rep.md new file mode 100644 index 0000000..e7141ad --- /dev/null +++ b/libzmq-book/src/examples/basic_req_rep.md @@ -0,0 +1,11 @@ +# Basic Request Reply + +This is as simple as it gets. We have a [Server] that does some request-reply +work in a dedicated thread. We have a [Client] that sends a "ping" and gets +a "pong" back. There is no attempt at security and no attempt at error handling. +For a `INPROC` server, that might be enough. + +{{#playpen ../../../libzmq/examples/basic_req_rep.rs}} + +[Server]: https://docs.rs/libzmq/0.1/libzmq/struct.Server.html +[Client]: https://docs.rs/libzmq/0.1/libzmq/struct.Client.html diff --git a/libzmq-book/src/examples/reliable_req_rep.md b/libzmq-book/src/examples/reliable_req_rep.md new file mode 100644 index 0000000..37cf97f --- /dev/null +++ b/libzmq-book/src/examples/reliable_req_rep.md @@ -0,0 +1,20 @@ +# Reliable Request Reply + +This is a basic example when using the `TCP` transport adapting the code +from the previous `Basic Request Reply` example. + +Note that this example does not make any attempt at security. + +Since `TCP` is connection oriented transport, we have to take in account that +the connection might fail at any time. We use heartbeating to detect failure +but also `send` and `recv` timeouts to prevent blocking forever. + +In this example, the server is protected against failures since it will drop +messages if it is unable to route them before `send_timeout` expires (`WouldBlock`), +or it detects that the peer disconnected via the heartbeats (`HostUnreachable`). + +The client in this case will simply fail if it unable to send a request before the +`send_timeout` or unable to receive a reply before the `recv_timeout` (`WouldBlock`). +The client might choose to retry later or connect to another server etc. + +{{#playpen ../../../libzmq/examples/reliable_req_rep.rs}} diff --git a/libzmq-book/src/examples/secure_req_rep.md b/libzmq-book/src/examples/secure_req_rep.md new file mode 100644 index 0000000..210f497 --- /dev/null +++ b/libzmq-book/src/examples/secure_req_rep.md @@ -0,0 +1,24 @@ +# Secure Request Reply + +The previous example did not offer neither authentication nor encryption. +For a public `TCP` connection, its a must. Let's fix that by adapting the +previous example. + +However, this time we will use an external configuration file to get +rid of all the boilerplate. This will also allows our application +to run indepently of the socket configuration. + +## Config File + +In this case we used `yaml` configuration file, but any file format +supported by `Serde` will work (as long as it supports typed enums). +```yml +{{#include ../../../libzmq/examples/secure_req_rep.yml}} +``` + +## The code + +Aside from the additionnal logic for reading the config file, +the code is now simpler than before. + +{{#playpen ../../../libzmq/examples/secure_req_rep.rs}} diff --git a/libzmq-book/src/glossary.md b/libzmq-book/src/glossary.md new file mode 100644 index 0000000..fd407e0 --- /dev/null +++ b/libzmq-book/src/glossary.md @@ -0,0 +1,45 @@ +# Glossary +Some high level definitions. + +## Endpoint +A endpoint is a rendez-vous address for a specified transport. The syntax +of the address depends on the nature of the transport. For instance +a `TcpAddr` is an endpoint over the `TCP` transport. + +## Transport +A protocol to transfert data. Could be a network protocol, such as `TCP`, +could be a inter-thread protocol, such as `INPROC`, etc. + +## Connection +Connections in `ZeroMQ` are different from traditional connections is the +sense that they automatically handle failure. For instance, if a connection +fails because of a network error, it will be automatically reconnected if +possible. Thus sockets should not worry about the state of a given connection. + +## Message +An atomic arbitrary set of bytes owned by the `ZeroMQ` engine. `ZeroMQ` does +not know how to interpret these bytes, only the user does. Messages are +the units that are transferred between sockets. + +## Socket +A `ZeroMQ` construct used to send and receive messages using connections +accros endpoints. The specific behavior of the socket depends on its type. + +## High Water Mark +The message limit in the incoming or outgoing buffer. If the incoming +buffer has reached this limit, the socket will stop receiving messages +in the background. If the outgoing buffer has reached this limit, attempting +to queue a message will block the calling thread. Conceptually, this is a +socket's back throttling mechanism. + +## Context +A `ZeroMQ` context is a session that keeps track of all the sockets, +the messages, the async threads and the internal queries. + +## Mechanism +A specific protocol used by sockets to authenticate and encrypt traffic. + +## Mute State +A socket that is in mute state is unable to queue and receive messages. +This is likely because it has no peers. The condition for the mute state to +occur depends on the socket type. diff --git a/libzmq/Cargo.toml b/libzmq/Cargo.toml index c5ee405..deaf95e 100644 --- a/libzmq/Cargo.toml +++ b/libzmq/Cargo.toml @@ -49,7 +49,13 @@ name = "bench_main" harness = false [[example]] -name = "auth" +name = "gen_curve_cert" [[example]] -name = "gen_curve_cert" +name = "basic_req_rep" + +[[example]] +name = "reliable_req_rep" + +[[example]] +name = "secure_req_rep" diff --git a/libzmq/examples/auth.rs b/libzmq/examples/auth.rs deleted file mode 100644 index a9d3155..0000000 --- a/libzmq/examples/auth.rs +++ /dev/null @@ -1,77 +0,0 @@ -use libzmq::{config::*, prelude::*, Ctx}; - -use serde::{Deserialize, Serialize}; - -use std::{ - fs::File, - io::Read, - path::{Path, PathBuf}, -}; - -const CONFIG_FILES: &[&str] = &["curve.yml", "curve_no_auth.yml"]; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Config { - auth: AuthConfig, - client: ClientConfig, - server: ServerConfig, -} - -fn read_file(name: &Path) -> std::io::Result> { - let mut file = File::open(name)?; - let mut buf = Vec::new(); - file.read_to_end(&mut buf)?; - Ok(buf) -} - -fn run(path: &Path) -> Result<(), failure::Error> { - // We create a context since we want an isolated `AuthServer` for - // our configuration. - let ctx = Ctx::new(); - - let config: Config = - serde_yaml::from_slice(&read_file(path).unwrap()).unwrap(); - dbg!(&config); - - // Create a `AuthClient` and transmits the configuration to the - // background `AuthServer` thread. - let _ = config.auth.with_ctx(&ctx)?; - - let client = config.client.with_ctx(&ctx)?; - let server = config.server.with_ctx(&ctx)?; - - // In case the server binds to a system defined port so as - // prevent potential conflicts with the host machine. - let bound = server.last_endpoint()?; - client.connect(bound)?; - - // Do some request reply work. - client.send("hello")?; - - let msg = server.recv_msg()?; - server.send(msg)?; - - let _ = client.recv_msg()?; - - Ok(()) -} - -fn main() -> Result<(), failure::Error> { - for filename in CONFIG_FILES { - let path = PathBuf::from("examples").join(filename); - run(&path)?; - } - - Ok(()) -} - -// Make sure that the examples properly run since the config files are dynamic. -#[cfg(test)] -mod tests { - use super::main; - - #[test] - fn main_runs() { - main().unwrap(); - } -} diff --git a/libzmq/examples/basic_req_rep.rs b/libzmq/examples/basic_req_rep.rs new file mode 100644 index 0000000..8a2f17a --- /dev/null +++ b/libzmq/examples/basic_req_rep.rs @@ -0,0 +1,50 @@ +use libzmq::{prelude::*, *}; + +use std::thread; + +fn main() -> Result<(), failure::Error> { + let addr: InprocAddr = InprocAddr::new_unique(); + + let server = ServerBuilder::new().bind(&addr).build()?; + + // Spawn the server thread. + let handle = thread::spawn(move || -> Result<(), Error> { + loop { + let request = server.recv_msg()?; + assert_eq!(request.to_str(), Ok("ping")); + + // Retrieve the routing_id to route the reply to the client. + let id = request.routing_id().unwrap(); + let mut reply: Msg = "pong".into(); + reply.set_routing_id(id); + // We cast the Error to Error<()>. This drops the Msg. + server.send(reply).map_err(Error::cast)?; + } + }); + + let client = ClientBuilder::new().connect(addr).build()?; + + // Do some request-reply work. + client.send("ping")?; + let msg = client.recv_msg()?; + assert_eq!(msg.to_str(), Ok("pong")); + + // This will cause the server to fail with `CtxTerminated`. + Ctx::global().shutdown(); + + // Join with the thread. + let err = handle.join().unwrap().unwrap_err(); + assert_eq!(err.kind(), ErrorKind::CtxTerminated); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::main; + + #[test] + fn main_runs() { + main().unwrap(); + } +} diff --git a/libzmq/examples/curve_no_auth.yml b/libzmq/examples/curve_no_auth.yml deleted file mode 100644 index 3e3641a..0000000 --- a/libzmq/examples/curve_no_auth.yml +++ /dev/null @@ -1,42 +0,0 @@ -# The curve keys where generated by running: -# `$ cargo run --example gen_curve_cert` - -auth: - # Disable curve authentication. Traffic is still encrypted. - curve_auth: false - -client: - # In a real life scenario the server would have a known addr. - #connect: - # - tcp: "127.0.0.1:3000" - heartbeat_interval: 3s - heartbeat_timeout: 3s - heartbeat_ttl: 3s - send_high_water_mark: 10 - send_timeout: 300ms - recv_high_water_mark: 100 - recv_timeout: 300ms - mechanism: - curve_client: - # We don't specify a client certificate so it will be generated instead. - # In this case, putting the `client` field is optional but more explicit. - client: ~ - # This is the server's public key. - server: "et189NB9uJC7?J+XU8JRhCbF?gOP9+o%kli=y2b8" - -server: - # Here we use a system defined port so as to not conflict with the host - # machine. In a real life scenario we would have a port available. - bind: - - tcp: "127.0.0.1:*" - heartbeat_interval: 3s - heartbeat_timeout: 3s - heartbeat_ttl: 3s - send_high_water_mark: 10 - send_timeout: 300ms - recv_high_water_mark: 100 - recv_timeout: 300ms - mechanism: - curve_server: - secret: "iaoRiIVA^VgV:f4a<@{8K{cP62cE:dh=4:oY+^l(" - diff --git a/libzmq/examples/reliable_req_rep.rs b/libzmq/examples/reliable_req_rep.rs new file mode 100644 index 0000000..cd64bf3 --- /dev/null +++ b/libzmq/examples/reliable_req_rep.rs @@ -0,0 +1,76 @@ +use libzmq::{prelude::*, *}; + +use std::{convert::TryInto, thread, time::Duration}; + +fn main() -> Result<(), failure::Error> { + // We use a system assigned port here. + let addr: TcpAddr = "127.0.0.1:*".try_into()?; + let duration = Duration::from_millis(300); + + let hb = Heartbeat::new(duration) + .add_timeout(3 * duration) + .add_ttl(3 * duration); + + let server = ServerBuilder::new() + .bind(addr) + .send_timeout(duration) + .heartbeat(&hb) + .build()?; + + // Retrieve the assigned port. + let bound = server.last_endpoint()?.unwrap(); + + // Spawn the server thread. In a real application, this + // would be on another node. + let handle = thread::spawn(move || -> Result<(), Error> { + use ErrorKind::*; + loop { + let request = server.recv_msg()?; + assert_eq!(request.to_str(), Ok("ping")); + + // Retrieve the routing_id to route the reply to the client. + let id = request.routing_id().unwrap(); + let mut reply: Msg = "pong".into(); + reply.set_routing_id(id); + + if let Err(err) = server.send(reply) { + match err.kind() { + // Cannot route msg, drop it. + WouldBlock | HostUnreachable => (), + _ => return Err(err.cast()), + } + } + } + }); + + let client = ClientBuilder::new() + .connect(bound) + .recv_timeout(duration) + .send_timeout(duration) + .heartbeat(hb) + .build()?; + + // Do some request-reply work. + client.send("ping")?; + let msg = client.recv_msg()?; + assert_eq!(msg.to_str(), Ok("pong")); + + // This will cause the server to fail with `CtxTerminated`. + Ctx::global().shutdown(); + + // Join with the thread. + let err = handle.join().unwrap().unwrap_err(); + assert_eq!(err.kind(), ErrorKind::CtxTerminated); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::main; + + #[test] + fn main_runs() { + main().unwrap(); + } +} diff --git a/libzmq/examples/secure_req_rep.rs b/libzmq/examples/secure_req_rep.rs new file mode 100644 index 0000000..ee40bbd --- /dev/null +++ b/libzmq/examples/secure_req_rep.rs @@ -0,0 +1,91 @@ +use libzmq::{config::*, prelude::*, *}; + +use serde::{Deserialize, Serialize}; + +use std::{ + fs::File, + io::Read, + path::{Path, PathBuf}, + thread, +}; + +const CONFIG_FILE: &str = "secure_req_rep.yml"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + auth: AuthConfig, + client: ClientConfig, + server: ServerConfig, +} + +fn read_file(name: &Path) -> std::io::Result> { + let mut file = File::open(name)?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + Ok(buf) +} + +fn main() -> Result<(), failure::Error> { + let path = PathBuf::from("examples").join(CONFIG_FILE); + + let config: Config = + serde_yaml::from_slice(&read_file(&path).unwrap()).unwrap(); + + // Configure the `AuthServer`. We won't need the returned `AuthClient`. + let _ = config.auth.build()?; + + // Configure our two sockets. + let server = config.server.build()?; + let client = config.client.build()?; + + // Once again we used a system assigned port for our server. + let bound = server.last_endpoint()?; + client.connect(bound)?; + + // Spawn the server thread. In a real application, this + // would be on another node. + let handle = thread::spawn(move || -> Result<(), Error> { + use ErrorKind::*; + loop { + let request = server.recv_msg()?; + assert_eq!(request.to_str(), Ok("ping")); + + // Retrieve the routing_id to route the reply to the client. + let id = request.routing_id().unwrap(); + let mut reply: Msg = "pong".into(); + reply.set_routing_id(id); + + if let Err(err) = server.send(reply) { + match err.kind() { + // Cannot route msg, drop it. + WouldBlock | HostUnreachable => (), + _ => return Err(err.cast()), + } + } + } + }); + + // Do some request-reply work. + client.send("ping")?; + let msg = client.recv_msg()?; + assert_eq!(msg.to_str(), Ok("pong")); + + // This will cause the server to fail with `CtxTerminated`. + Ctx::global().shutdown(); + + // Join with the thread. + let err = handle.join().unwrap().unwrap_err(); + assert_eq!(err.kind(), ErrorKind::CtxTerminated); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::main; + + #[test] + fn main_runs() { + main().unwrap(); + } +} diff --git a/libzmq/examples/curve.yml b/libzmq/examples/secure_req_rep.yml similarity index 88% rename from libzmq/examples/curve.yml rename to libzmq/examples/secure_req_rep.yml index fc5886c..139e322 100644 --- a/libzmq/examples/curve.yml +++ b/libzmq/examples/secure_req_rep.yml @@ -11,9 +11,10 @@ client: # In a real life scenario the server would have a known addr. #connect: # - tcp: "127.0.0.1:3000" - heartbeat_interval: 3s - heartbeat_timeout: 3s - heartbeat_ttl: 3s + heartbeat: + interval: 1s + timeout: 3s + ttl: 3s send_high_water_mark: 10 send_timeout: 300ms recv_high_water_mark: 100 @@ -31,9 +32,10 @@ server: # machine. In a real life scenario we would have a port available. bind: - tcp: "127.0.0.1:*" - heartbeat_interval: 3s - heartbeat_timeout: 3s - heartbeat_ttl: 3s + heartbeat: + interval: 1s + timeout: 3s + ttl: 3s send_high_water_mark: 10 send_timeout: 300ms recv_high_water_mark: 100 diff --git a/libzmq/src/auth/curve.rs b/libzmq/src/auth/curve.rs index f08caa8..e4e06be 100644 --- a/libzmq/src/auth/curve.rs +++ b/libzmq/src/auth/curve.rs @@ -124,7 +124,7 @@ fn z85_decode(input: &str) -> Result, CurveError> { /// /// [`Z85`]: https://rfc.zeromq.org/spec:32/Z85/ /// [`CurveCert::new_unique()`]: struct.CurveCert.html#method.new_unique -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(transparent)] pub struct CurvePublicKey { inner: CurveKey, @@ -172,6 +172,14 @@ impl fmt::Display for CurvePublicKey { } } +impl fmt::Debug for CurvePublicKey { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("CurvePublicKey") + .field("key", &self.as_str()) + .finish() + } +} + impl From for CurveKey { fn from(public: CurvePublicKey) -> Self { public.inner @@ -274,7 +282,9 @@ impl CurveSecretKey { impl fmt::Debug for CurveSecretKey { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "") + f.debug_struct("CurvePublicKey") + .field("key", &"") + .finish() } } diff --git a/libzmq/src/auth/mechanism.rs b/libzmq/src/auth/mechanism.rs index 4ab0b12..8cc40fe 100644 --- a/libzmq/src/auth/mechanism.rs +++ b/libzmq/src/auth/mechanism.rs @@ -109,7 +109,7 @@ impl CurveClientCreds { } } - /// Assigns as client certificate to the credentials. + /// Associates a client `CurveCert` with the credentials. pub fn add_cert(mut self, client: C) -> Self where C: Into,