diff --git a/Cargo.lock b/Cargo.lock index 952f4f7..5179213 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,11 +574,13 @@ dependencies = [ "givc-common", "http 0.2.12", "http-body 0.4.6", + "hyper-util", "serde", "tokio", "tokio-stream", "tonic", "tonic-types", + "tower", "tracing", ] diff --git a/client/Cargo.toml b/client/Cargo.toml index 1482e19..b36cdc9 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -12,10 +12,12 @@ async-channel = "2.3.1" async-stream = "0.3" http = "0.2" http-body = "0.4.2" +hyper-util = { version = "0.1.4"} tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]} tokio-stream = "0.1" tonic = {version="0.12.2", features = ["tls"]} tonic-types = {version="0.12.2"} +tower = {version = "0.4"} tracing = "0.1" serde = { version = "1.0.202", features = ["derive"]} diff --git a/client/src/client.rs b/client/src/client.rs index a7b16f0..f418fc4 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -1,13 +1,16 @@ -use crate::endpoint::{EndpointConfig, TlsConfig}; use anyhow::bail; use async_channel::Receiver; -use givc_common::pb; -pub use givc_common::query::{Event, QueryResult}; -use givc_common::types::*; use tokio_stream::StreamExt; use tonic::transport::Channel; use tracing::debug; +use givc_common::address::EndpointAddress; +use givc_common::pb; +pub use givc_common::query::{Event, QueryResult}; +use givc_common::types::*; + +use crate::endpoint::{EndpointConfig, TlsConfig}; + type Client = pb::admin_service_client::AdminServiceClient; #[derive(Debug)] @@ -40,6 +43,13 @@ impl AdminClient { // New style api, not yet implemented, stub atm to make current code happy // FIXME: Still doubt if constructor should be sync or async pub fn new(addr: String, port: u16, tls_info: Option<(String, TlsConfig)>) -> Self { + Self::from_endpoint_address(EndpointAddress::Tcp { addr, port }, tls_info) + } + + pub fn from_endpoint_address( + addr: EndpointAddress, + tls_info: Option<(String, TlsConfig)>, + ) -> Self { let (name, tls) = match tls_info { Some((name, tls)) => (name, Some(tls)), None => (String::from("bogus(no tls)"), None), @@ -48,8 +58,6 @@ impl AdminClient { endpoint: EndpointConfig { transport: TransportConfig { address: addr, - port: port, - protocol: String::from("bogus"), tls_name: name, }, tls: tls, diff --git a/client/src/endpoint.rs b/client/src/endpoint.rs index e22c9c3..210d36d 100644 --- a/client/src/endpoint.rs +++ b/client/src/endpoint.rs @@ -1,11 +1,17 @@ -use anyhow::anyhow; -use givc_common::types::TransportConfig; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::time::Duration; -use tonic::transport::Endpoint; + +use anyhow::anyhow; +use hyper_util::rt::TokioIo; +use tokio::net::UnixStream; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity, ServerTlsConfig}; +use tonic::transport::{Endpoint, Uri}; +use tower::service_fn; use tracing::info; +use givc_common::address::EndpointAddress; +use givc_common::types::TransportConfig; + #[derive(Debug, Clone)] pub struct TlsConfig { pub ca_cert_file_path: PathBuf, @@ -50,17 +56,41 @@ impl TlsConfig { } } -fn transport_config_to_url(tc: &TransportConfig, with_tls: bool) -> String { +fn transport_config_to_url(ea: &EndpointAddress, with_tls: bool) -> String { let scheme = match with_tls { true => "https", false => "http", }; - format!("{}://{}:{}", scheme, tc.address, tc.port) + match ea { + EndpointAddress::Tcp { addr, port } => format!("{}://{}:{}", scheme, addr, port), + _ => format!("{}://[::]:443", scheme), // Bogus url, to make tonic connector happy + } +} + +async fn connect_unix_socket(endpoint: Endpoint, path: &String) -> anyhow::Result { + let mut path = Some(path.to_owned()); + let ch = endpoint + .connect_with_connector(service_fn(move |_: Uri| { + let path = path.take(); + async move { + if let Some(path) = path { + // Connect to a Uds socket + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?)) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Path already taken", + )) + } + } + })) + .await?; + Ok(ch) } impl EndpointConfig { pub async fn connect(&self) -> anyhow::Result { - let url = transport_config_to_url(&self.transport, self.tls.is_some()); + let url = transport_config_to_url(&self.transport.address, self.tls.is_some()); info!("Connecting to {url}, TLS name {:?}", &self.tls); let mut endpoint = Endpoint::try_from(url)? .timeout(Duration::from_secs(5)) @@ -68,7 +98,11 @@ impl EndpointConfig { if let Some(tls) = &self.tls { endpoint = endpoint.tls_config(tls.client_config()?)?; }; - let channel = endpoint.connect().await?; + let channel = match &self.transport.address { + EndpointAddress::Tcp { .. } => endpoint.connect().await?, + EndpointAddress::Unix(unix) => connect_unix_socket(endpoint, unix).await?, + EndpointAddress::Abstract(abs) => connect_unix_socket(endpoint, abs).await?, + }; Ok(channel) } } diff --git a/common/src/address.rs b/common/src/address.rs new file mode 100644 index 0000000..2a46825 --- /dev/null +++ b/common/src/address.rs @@ -0,0 +1,19 @@ +use std::convert::{Into, TryFrom}; +//use std::net::SocketAddr; +use std::path::PathBuf; + +// use tokio_vsock::VsockAddr; + +use crate::pb; + +#[derive(Clone, Debug, PartialEq)] +pub enum EndpointAddress { + Tcp { + // IP + port (FIXME: should be SocketAddres) + addr: String, + port: u16, + }, + Unix(String), // "/path/to/sock" (same host only) + Abstract(String), // "@abstract-socket-name" (same host only) + // VSock(VsockAddr), // cid+port. FIXME: cid have two magic numbers for host and local +} diff --git a/common/src/lib.rs b/common/src/lib.rs index d66f3a1..9209e20 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,3 +1,4 @@ +pub mod address; pub mod query; pub mod types; diff --git a/common/src/types.rs b/common/src/types.rs index f9ea79e..35d5fae 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -1,5 +1,6 @@ // This module contain literal translations of types from internal/pkgs/types/types.go // Some of them would be rewritten, replaced, or even removed +use super::address::EndpointAddress; use crate::pb; use anyhow::{anyhow, bail}; use std::convert::{Into, TryFrom}; @@ -181,9 +182,7 @@ impl Into for UnitStatus { #[derive(Debug, Clone, PartialEq)] pub struct EndpointEntry { - pub protocol: String, // Bogus, should we drop it? - pub address: String, - pub port: u16, + pub address: EndpointAddress, pub tls_name: String, } @@ -192,10 +191,18 @@ pub type TransportConfig = EndpointEntry; impl TryFrom for EndpointEntry { type Error = anyhow::Error; fn try_from(tc: pb::TransportConfig) -> Result { + let endpoint = match tc.protocol.as_str() { + "tcp" => EndpointAddress::Tcp { + addr: tc.address, + port: tc.port.parse()?, + }, + "unix" => EndpointAddress::Unix(tc.address), + "abstract" => EndpointAddress::Abstract(tc.address), + // "vsock" => unimplemented!(), + unknown => bail!("Unknown protocol: {unknown}"), + }; Ok(Self { - protocol: tc.protocol, - address: tc.address, - port: tc.port.parse()?, + address: endpoint, tls_name: tc.name, }) } @@ -203,11 +210,31 @@ impl TryFrom for EndpointEntry { impl Into for EndpointEntry { fn into(self) -> pb::TransportConfig { - pb::TransportConfig { - protocol: self.protocol, - address: self.address, - port: self.port.to_string(), - name: self.tls_name, + match self.address { + EndpointAddress::Tcp { addr, port } => pb::TransportConfig { + protocol: "tcp".into(), + address: addr, + port: port.to_string(), + name: self.tls_name, + }, + EndpointAddress::Unix(unix) => pb::TransportConfig { + protocol: "unix".into(), + address: unix, + port: "".into(), + name: self.tls_name, + }, + EndpointAddress::Abstract(abstr) => pb::TransportConfig { + protocol: "abstract".into(), + address: abstr, + port: "".into(), + name: self.tls_name, + }, + // EndpointAddress::Vsock(vs) => pb::TransportConfig { + // protocol: "vsock".into(), + // address: vs.cid().to_string(), + // port: vs.port().to_string(), + // name: self.tls_name, + // } } } } diff --git a/src/admin/entry.rs b/src/admin/entry.rs index e8dcb70..1f4cec8 100644 --- a/src/admin/entry.rs +++ b/src/admin/entry.rs @@ -41,6 +41,7 @@ impl RegistryEntry { #[cfg(test)] impl RegistryEntry { pub fn dummy(n: String) -> Self { + use givc_common::address::EndpointAddress; Self { name: n, r#type: UnitType { @@ -56,9 +57,10 @@ impl RegistryEntry { path: "bogus".to_string(), }, placement: Placement::Endpoint(EndpointEntry { - protocol: "bogus".to_string(), - address: "127.0.0.1".to_string(), - port: 42, + address: EndpointAddress::Tcp { + addr: "127.0.0.1".to_string(), + port: 42, + }, tls_name: "bogus".to_string(), }), watch: true, diff --git a/src/bin/givc-agent.rs b/src/bin/givc-agent.rs index 1b565f7..36a4cdc 100644 --- a/src/bin/givc-agent.rs +++ b/src/bin/givc-agent.rs @@ -4,6 +4,7 @@ use givc::systemd_api::server::SystemdService; use givc::types::*; use givc::utils::naming::*; use givc_client::AdminClient; +use givc_common::address::EndpointAddress; use givc_common::pb; use givc_common::pb::reflection::SYSTEMD_DESCRIPTOR; use std::net::SocketAddr; @@ -88,9 +89,10 @@ async fn main() -> std::result::Result<(), Box> { // Perfect example of bad designed code, admin.register_service(entry) should hide structure filling let endpoint = EndpointEntry { - address: cli.addr, - port: cli.port, - protocol: String::from("bogus"), + address: EndpointAddress::Tcp { + addr: cli.addr, + port: cli.port, + }, tls_name: cli.name, }; // We can't use just one name field like in "go" code