From df610b804569681f00a032b5b79c8edd8e7c046f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franti=C5=A1ek=20Hanzl=C3=ADk?= Date: Sun, 21 Aug 2022 20:18:54 +0200 Subject: [PATCH] attempt #0 --- .envrc | 7 ++ .gitignore | 2 + .vscode/settings.json | 3 + Cargo.toml | 6 +- example-service/Cargo.toml | 10 +-- example-service/src/lib.rs | 6 -- flake.lock | 164 ++++++++++++++++++++++++++++++++++++ flake.nix | 45 ++++++++++ nix/rust.nix | 27 ++++++ tarpc/Cargo.toml | 34 +++----- tarpc/src/client.rs | 17 +--- tarpc/src/context.rs | 52 +----------- tarpc/src/lib.rs | 6 -- tarpc/src/server.rs | 31 ++----- tarpc/src/server/testing.rs | 1 - vscode-env.nix | 10 +++ 16 files changed, 288 insertions(+), 133 deletions(-) create mode 100644 .envrc create mode 100644 .vscode/settings.json create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 nix/rust.nix create mode 100644 vscode-env.nix diff --git a/.envrc b/.envrc new file mode 100644 index 00000000..d61b8fbc --- /dev/null +++ b/.envrc @@ -0,0 +1,7 @@ +if ! has nix_direnv_version || ! nix_direnv_version 2.1.1; then + source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/2.1.1/direnvrc" "sha256-b6qJ4r34rbE23yWjMqbmu3ia2z4b2wIlZUksBke/ol0=" +fi + +use flake + +source_env_if_exists .envrc.local diff --git a/.gitignore b/.gitignore index 15eeb075..8da066d4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ Cargo.lock *.bk tarpc.iml .idea + +/.direnv diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..4feb320c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "nixEnvSelector.nixFile": "${workspaceRoot}/vscode-env.nix" +} diff --git a/Cargo.toml b/Cargo.toml index c7557d06..4e26b878 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,7 @@ [workspace] resolver = "2" -members = [ - "example-service", - "tarpc", - "plugins", -] +members = ["example-service", "tarpc", "plugins"] [profile.dev] split-debuginfo = "unpacked" diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 6f162d27..900d7f38 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -18,14 +18,10 @@ anyhow = "1.0" clap = { version = "3.0.0-rc.9", features = ["derive"] } log = "0.4" futures = "0.3" -opentelemetry = { version = "0.17", features = ["rt-tokio"] } -opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] } -rand = "0.8" -tarpc = { version = "0.30", path = "../tarpc", features = ["full"] } -tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] } +tarpc = { version = "0.30", path = "../tarpc", features = ["serde-transport"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tracing = { version = "0.1" } -tracing-opentelemetry = "0.17" -tracing-subscriber = {version = "0.3", features = ["env-filter"]} +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [lib] name = "service" diff --git a/example-service/src/lib.rs b/example-service/src/lib.rs index bc38fe93..973678e2 100644 --- a/example-service/src/lib.rs +++ b/example-service/src/lib.rs @@ -19,15 +19,9 @@ pub trait World { pub fn init_tracing(service_name: &str) -> anyhow::Result<()> { env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12"); - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name(service_name) - .with_max_packet_size(2usize.pow(13)) - .install_batch(opentelemetry::runtime::Tokio)?; - tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::from_default_env()) .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)) - .with(tracing_opentelemetry::layer().with_tracer(tracer)) .try_init()?; Ok(()) diff --git a/flake.lock b/flake.lock new file mode 100644 index 00000000..c081b8c4 --- /dev/null +++ b/flake.lock @@ -0,0 +1,164 @@ +{ + "nodes": { + "cargo2nix": { + "inputs": { + "flake-compat": "flake-compat", + "flake-utils": "flake-utils", + "nixpkgs": [ + "nixpkgs" + ], + "rust-overlay": "rust-overlay" + }, + "locked": { + "lastModified": 1655189312, + "narHash": "sha256-gpJ57OgIebUpO+7F00VltxSEy6dz2x6HeJ5BcRM8rDA=", + "owner": "cargo2nix", + "repo": "cargo2nix", + "rev": "c149357cc3d17f2849c73eb7a09d07a307cdcfe8", + "type": "github" + }, + "original": { + "owner": "cargo2nix", + "repo": "cargo2nix", + "type": "github" + } + }, + "flake-compat": { + "flake": false, + "locked": { + "lastModified": 1650374568, + "narHash": "sha256-Z+s0J8/r907g149rllvwhb4pKi8Wam5ij0st8PwAh+E=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "b4a34015c698c7793d592d66adbab377907a2be8", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "flake-compat_2": { + "flake": false, + "locked": { + "lastModified": 1650374568, + "narHash": "sha256-Z+s0J8/r907g149rllvwhb4pKi8Wam5ij0st8PwAh+E=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "b4a34015c698c7793d592d66adbab377907a2be8", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "flake-utils": { + "locked": { + "lastModified": 1653893745, + "narHash": "sha256-0jntwV3Z8//YwuOjzhV2sgJJPt+HY6KhU7VZUL0fKZQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "1ed9fb1935d260de5fe1c2f7ee0ebaae17ed2fa1", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "flake-utils_2": { + "locked": { + "lastModified": 1659877975, + "narHash": "sha256-zllb8aq3YO3h8B/U0/J1WBgAL8EX5yWf5pMj3G0NAmc=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "c0e246b9b83f637f4681389ecabcb2681b4f3af0", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1660574513, + "narHash": "sha256-nkMQ1TKIIAYIVbbUzjxfjPn3H1zZFW20TrHUFAjwvNU=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "af9e00071d0971eb292fd5abef334e66eda3cb69", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "cargo2nix": "cargo2nix", + "flake-compat": "flake-compat_2", + "flake-utils": "flake-utils_2", + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay_2" + } + }, + "rust-overlay": { + "inputs": { + "flake-utils": [ + "cargo2nix", + "flake-utils" + ], + "nixpkgs": [ + "cargo2nix", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1653878966, + "narHash": "sha256-T51Gck/vrJZi1m+uTbhEFTRgZmE59sydVONadADv358=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "8526d618af012a923ca116be9603e818b502a8db", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + }, + "rust-overlay_2": { + "inputs": { + "flake-utils": [ + "flake-utils" + ], + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1660618363, + "narHash": "sha256-SdIGzmiMjAfKyZScxb497AA4oHiiaHMPhb9iwLtoaMk=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "b480a97c66038e9a6ff68cbf0c4fa4a783416e0b", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 00000000..e36922cf --- /dev/null +++ b/flake.nix @@ -0,0 +1,45 @@ +{ + description = "tarpc-wasm"; + + inputs = { + flake-compat = { url = "github:edolstra/flake-compat"; flake = false; }; + nixpkgs = { url = "github:nixos/nixpkgs/nixos-unstable"; }; + flake-utils = { url = "github:numtide/flake-utils"; inputs.nixpkgs.follows = "nixpkgs"; }; + cargo2nix = { url = "github:cargo2nix/cargo2nix"; inputs.nixpkgs.follows = "nixpkgs"; }; + rust-overlay = { url = "github:oxalica/rust-overlay"; inputs = { nixpkgs.follows = "nixpkgs"; flake-utils.follows = "flake-utils"; }; }; + }; + + outputs = { self, nixpkgs, cargo2nix, flake-utils, rust-overlay, ... }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = import nixpkgs { + inherit system; + overlays = [ + cargo2nix.overlays.default + rust-overlay.overlays.default + ]; + }; + + node_pkg = pkgs.nodejs-18_x.override { enableNpm = false; }; + + rust = import ./nix/rust.nix { + inherit pkgs; + channel = "nightly"; + version = "2022-08-16"; + }; + in rec { + devShell = pkgs.mkShell rec { + nativeBuildInputs = with pkgs; [ + binaryen + cargo-expand + rust.toolchain + wasm-bindgen-cli + wasm-pack + + node_pkg + (yarn.override { nodejs = node_pkg; }) # by default yarn uses the latest version of nodejs, so we override it to the correct version here + ]; + }; + } + ); +} diff --git a/nix/rust.nix b/nix/rust.nix new file mode 100644 index 00000000..140cf287 --- /dev/null +++ b/nix/rust.nix @@ -0,0 +1,27 @@ +{ + pkgs, + channel, + version +}: let + toolchain = pkgs.rust-bin.${channel}.${version}.default.override { + extensions = [ "rust-src" "rustfmt" "rust-analyzer" ]; + targets = [ + (pkgs.rust.toRustTarget pkgs.stdenv.buildPlatform) + (pkgs.rust.toRustTarget pkgs.stdenv.hostPlatform) + "wasm32-unknown-unknown" + ]; + }; + + project = pkgs.rustBuilder.makePackageSet { + rustChannel = toolchain; + packageFun = import ../Cargo.nix; + target = null; + }; + + platform = pkgs.makeRustPlatform { + cargo = toolchain; + rustc = toolchain; + }; +in { + inherit toolchain project platform; +} diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 9a8af270..077b31bb 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -3,8 +3,8 @@ name = "tarpc" version = "0.30.0" rust-version = "1.58.0" authors = [ - "Adam Wright ", - "Tim Kuehn ", + "Adam Wright ", + "Tim Kuehn ", ] edition = "2021" license = "MIT" @@ -27,12 +27,12 @@ serde-transport-bincode = ["tokio-serde/bincode"] tcp = ["tokio/net"] full = [ - "serde1", - "tokio1", - "serde-transport", - "serde-transport-json", - "serde-transport-bincode", - "tcp", + "serde1", + "tokio1", + "serde-transport", + "serde-transport-json", + "serde-transport-bincode", + "tcp", ] [badges] @@ -44,21 +44,17 @@ fnv = "1.0" futures = "0.3" humantime = "2.0" pin-project = "1.0" -rand = "0.8" serde = { optional = true, version = "1.0", features = ["derive"] } static_assertions = "1.1.0" tarpc-plugins = { path = "../plugins", version = "0.12" } thiserror = "1.0" -tokio = { version = "1", features = ["time"] } +tokio = { version = "1", features = [] } tokio-util = { version = "0.7.3", features = ["time"] } tokio-serde = { optional = true, version = "0.8" } tracing = { version = "0.1", default-features = false, features = [ - "attributes", - "log", + "attributes", + "log", ] } -tracing-opentelemetry = { version = "0.17.2", default-features = false } -opentelemetry = { version = "0.17.0", default-features = false } - [dev-dependencies] assert_matches = "1.4" @@ -66,16 +62,14 @@ bincode = "1.3" bytes = { version = "1", features = ["serde"] } flate2 = "1.0" futures-test = "0.3" -opentelemetry = { version = "0.17.0", default-features = false, features = [ - "rt-tokio", -] } -opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] } pin-utils = "0.1.0-alpha" serde_bytes = "0.11" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tokio = { version = "1", features = ["full", "test-util"] } +tokio = { version = "1", features = ["macros", "test-util"] } tokio-serde = { version = "0.8", features = ["json", "bincode"] } trybuild = "1.0" +wasm-bindgen = "0.2" +wasm-bindgen-test = "0.2" [package.metadata.docs.rs] all-features = true diff --git a/tarpc/src/client.rs b/tarpc/src/client.rs index 39a6d2cc..ba46e37b 100644 --- a/tarpc/src/client.rs +++ b/tarpc/src/client.rs @@ -10,7 +10,7 @@ mod in_flight_requests; use crate::{ cancellations::{cancellations, CanceledRequests, RequestCancellation}, - context, trace, ClientMessage, Request, Response, ServerError, Transport, + context, ClientMessage, Request, Response, ServerError, Transport, }; use futures::{prelude::*, ready, stream::Fuse, task::*}; use in_flight_requests::{DeadlineExceededError, InFlightRequests}; @@ -116,7 +116,6 @@ impl Channel { name = "RPC", skip(self, ctx, request_name, request), fields( - rpc.trace_id = tracing::field::Empty, rpc.deadline = %humantime::format_rfc3339(ctx.deadline), otel.kind = "client", otel.name = request_name) @@ -128,13 +127,7 @@ impl Channel { request: Req, ) -> Result { let span = Span::current(); - ctx.trace_context = trace::Context::try_from(&span).unwrap_or_else(|_| { - tracing::trace!( - "OpenTelemetry subscriber not installed; making unsampled child context." - ); - ctx.trace_context.new_child() - }); - span.record("rpc.trace_id", &tracing::field::display(ctx.trace_id())); + let (response_completion, mut response) = oneshot::channel(); let request_id = u64::try_from(self.next_request_id.fetch_add(1, Ordering::Relaxed)).unwrap(); @@ -516,7 +509,6 @@ where message: request, context: context::Context { deadline: ctx.deadline, - trace_context: ctx.trace_context, }, }); self.start_send(request)?; @@ -539,10 +531,7 @@ where }; let _entered = span.enter(); - let cancel = ClientMessage::Cancel { - trace_context: context.trace_context, - request_id, - }; + let cancel = ClientMessage::Cancel { request_id }; self.start_send(cancel)?; tracing::info!("CancelRequest"); Poll::Ready(Some(Ok(()))) diff --git a/tarpc/src/context.rs b/tarpc/src/context.rs index e3a6aff1..dd3edef5 100644 --- a/tarpc/src/context.rs +++ b/tarpc/src/context.rs @@ -6,15 +6,8 @@ //! Provides a request context that carries a deadline and trace context. This context is sent from //! client to server and is used by the server to enforce response deadlines. - -use crate::trace::{self, TraceId}; -use opentelemetry::trace::TraceContextExt; use static_assertions::assert_impl_all; -use std::{ - convert::TryFrom, - time::{Duration, SystemTime}, -}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use std::time::{Duration, SystemTime}; /// A request context that carries request-scoped information like deadlines and trace information. /// It is sent from client to server and is used by the server to enforce response deadlines. @@ -31,11 +24,6 @@ pub struct Context { // Serialized as a Duration to prevent clock skew issues. #[cfg_attr(feature = "serde1", serde(with = "absolute_to_relative_time"))] pub deadline: SystemTime, - /// Uniquely identifies requests originating from the same source. - /// When a service handles a request by making requests itself, those requests should - /// include the same `trace_id` as that included on the original request. This way, - /// users can trace related actions across a distributed system. - pub trace_context: trace::Context, } #[cfg(feature = "serde1")] @@ -109,44 +97,8 @@ impl Default for Deadline { impl Context { /// Returns the context for the current request, or a default Context if no request is active. pub fn current() -> Self { - let span = tracing::Span::current(); Self { - trace_context: trace::Context::try_from(&span) - .unwrap_or_else(|_| trace::Context::default()), - deadline: span - .context() - .get::() - .cloned() - .unwrap_or_default() - .0, + deadline: Deadline::default().0, } } - - /// Returns the ID of the request-scoped trace. - pub fn trace_id(&self) -> &TraceId { - &self.trace_context.trace_id - } -} - -/// An extension trait for [`tracing::Span`] for propagating tarpc Contexts. -pub(crate) trait SpanExt { - /// Sets the given context on this span. Newly-created spans will be children of the given - /// context's trace context. - fn set_context(&self, context: &Context); -} - -impl SpanExt for tracing::Span { - fn set_context(&self, context: &Context) { - self.set_parent( - opentelemetry::Context::new() - .with_remote_span_context(opentelemetry::trace::SpanContext::new( - opentelemetry::trace::TraceId::from(context.trace_context.trace_id), - opentelemetry::trace::SpanId::from(context.trace_context.span_id), - opentelemetry::trace::TraceFlags::from(context.trace_context.sampling_decision), - true, - opentelemetry::trace::TraceState::default(), - )) - .with_value(Deadline(context.deadline)), - ); - } } diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 891efdd9..c0d51159 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -215,8 +215,6 @@ pub use {tokio_serde, tokio_util}; #[cfg_attr(docsrs, doc(cfg(feature = "serde-transport")))] pub mod serde_transport; -pub mod trace; - #[cfg(feature = "serde1")] pub use tarpc_plugins::derive_serde; @@ -330,10 +328,6 @@ pub enum ClientMessage { /// not be canceled, because the framework layer does not /// know about them. Cancel { - /// The trace context associates the message with a specific chain of causally-related actions, - /// possibly orchestrated across many distributed systems. - #[cfg_attr(feature = "serde1", serde(default))] - trace_context: trace::Context, /// The ID of the request to cancel. request_id: u64, }, diff --git a/tarpc/src/server.rs b/tarpc/src/server.rs index 7cf6a95a..2ee79a30 100644 --- a/tarpc/src/server.rs +++ b/tarpc/src/server.rs @@ -8,8 +8,7 @@ use crate::{ cancellations::{cancellations, CanceledRequests, RequestCancellation}, - context::{self, SpanExt}, - trace, ClientMessage, Request, Response, Transport, + context, ClientMessage, Request, Response, Transport, }; use ::tokio::sync::mpsc; use futures::{ @@ -22,7 +21,6 @@ use futures::{ use in_flight_requests::{AlreadyExistsError, InFlightRequests}; use pin_project::pin_project; use std::{ - convert::TryFrom, error::Error, fmt, marker::PhantomData, @@ -182,19 +180,11 @@ where ) -> Result, AlreadyExistsError> { let span = info_span!( "RPC", - rpc.trace_id = %request.context.trace_id(), rpc.deadline = %humantime::format_rfc3339(request.context.deadline), otel.kind = "server", otel.name = tracing::field::Empty, ); - span.set_context(&request.context); - request.context.trace_context = trace::Context::try_from(&span).unwrap_or_else(|_| { - tracing::trace!( - "OpenTelemetry subscriber not installed; making unsampled \ - child context." - ); - request.context.trace_context.new_child() - }); + let entered = span.enter(); tracing::info!("ReceiveRequest"); let start = self.in_flight_requests_mut().start_request( @@ -428,13 +418,9 @@ where } } } - ClientMessage::Cancel { - trace_context, - request_id, - } => { + ClientMessage::Cancel { request_id } => { if !self.in_flight_requests_mut().cancel_request(request_id) { tracing::trace!( - rpc.trace_id = %trace_context.trace_id, "Received cancellation, but response handler is already complete.", ); } @@ -786,7 +772,7 @@ where mod tests { use super::{in_flight_requests::AlreadyExistsError, BaseChannel, Channel, Config, Requests}; use crate::{ - context, trace, + context, transport::channel::{self, UnboundedChannel}, ClientMessage, Request, Response, }; @@ -924,12 +910,9 @@ mod tests { }) .unwrap(); - tx.send(ClientMessage::Cancel { - trace_context: trace::Context::default(), - request_id: 0, - }) - .await - .unwrap(); + tx.send(ClientMessage::Cancel { request_id: 0 }) + .await + .unwrap(); assert_matches!( channel.as_mut().poll_next(&mut noop_context()), diff --git a/tarpc/src/server/testing.rs b/tarpc/src/server/testing.rs index 1c683da8..a3c11f95 100644 --- a/tarpc/src/server/testing.rs +++ b/tarpc/src/server/testing.rs @@ -94,7 +94,6 @@ impl FakeChannel>, Response> { request: Request { context: context::Context { deadline: SystemTime::UNIX_EPOCH, - trace_context: Default::default(), }, id, message, diff --git a/vscode-env.nix b/vscode-env.nix new file mode 100644 index 00000000..5a0a64ac --- /dev/null +++ b/vscode-env.nix @@ -0,0 +1,10 @@ +(import + ( + let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in + fetchTarball { + url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz"; + sha256 = lock.nodes.flake-compat.locked.narHash; + } + ) + { src = ./.; } +).shellNix