Skip to content

Commit

Permalink
Implement connection for both unix and tcp sockets
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander V. Nikolaev <[email protected]>
  • Loading branch information
avnik committed Sep 5, 2024
1 parent 769a94e commit c993f4a
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ async-channel = "2.3.1"
async-stream = "0.3"
http = "0.2"
http-body = "0.4.2"
hyper-util = { version = "0.1.4"}
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tower = {version = "0.4"}
tracing = "0.1"
serde = { version = "1.0.202", features = ["derive"]}

Expand Down
20 changes: 14 additions & 6 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::endpoint::{EndpointConfig, TlsConfig};
use anyhow::bail;
use async_channel::Receiver;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tracing::debug;

use givc_common::address::EndpointAddress;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;

use crate::endpoint::{EndpointConfig, TlsConfig};

type Client = pb::admin_service_client::AdminServiceClient<Channel>;

#[derive(Debug)]
Expand Down Expand Up @@ -40,6 +43,13 @@ impl AdminClient {
// New style api, not yet implemented, stub atm to make current code happy
// FIXME: Still doubt if constructor should be sync or async
pub fn new(addr: String, port: u16, tls_info: Option<(String, TlsConfig)>) -> Self {
Self::from_endpoint_address(EndpointAddress::Tcp { addr, port }, tls_info)
}

pub fn from_endpoint_address(
addr: EndpointAddress,
tls_info: Option<(String, TlsConfig)>,
) -> Self {
let (name, tls) = match tls_info {
Some((name, tls)) => (name, Some(tls)),
None => (String::from("bogus(no tls)"), None),
Expand All @@ -48,8 +58,6 @@ impl AdminClient {
endpoint: EndpointConfig {
transport: TransportConfig {
address: addr,
port: port,
protocol: String::from("bogus"),
tls_name: name,
},
tls: tls,
Expand Down
50 changes: 42 additions & 8 deletions client/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use anyhow::anyhow;
use givc_common::types::TransportConfig;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tonic::transport::Endpoint;

use anyhow::anyhow;
use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity, ServerTlsConfig};
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;
use tracing::info;

use givc_common::address::EndpointAddress;
use givc_common::types::TransportConfig;

#[derive(Debug, Clone)]
pub struct TlsConfig {
pub ca_cert_file_path: PathBuf,
Expand Down Expand Up @@ -50,25 +56,53 @@ impl TlsConfig {
}
}

fn transport_config_to_url(tc: &TransportConfig, with_tls: bool) -> String {
fn transport_config_to_url(ea: &EndpointAddress, with_tls: bool) -> String {
let scheme = match with_tls {
true => "https",
false => "http",
};
format!("{}://{}:{}", scheme, tc.address, tc.port)
match ea {
EndpointAddress::Tcp { addr, port } => format!("{}://{}:{}", scheme, addr, port),
_ => format!("{}://[::]:443", scheme), // Bogus url, to make tonic connector happy
}
}

async fn connect_unix_socket(endpoint: Endpoint, path: &String) -> anyhow::Result<Channel> {
let mut path = Some(path.to_owned());
let ch = endpoint
.connect_with_connector(service_fn(move |_: Uri| {
let path = path.take();
async move {
if let Some(path) = path {
// Connect to a Uds socket
Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Path already taken",
))
}
}
}))
.await?;
Ok(ch)
}

impl EndpointConfig {
pub async fn connect(&self) -> anyhow::Result<Channel> {
let url = transport_config_to_url(&self.transport, self.tls.is_some());
let url = transport_config_to_url(&self.transport.address, self.tls.is_some());
info!("Connecting to {url}, TLS name {:?}", &self.tls);
let mut endpoint = Endpoint::try_from(url)?
.timeout(Duration::from_secs(5))
.concurrency_limit(30);
if let Some(tls) = &self.tls {
endpoint = endpoint.tls_config(tls.client_config()?)?;
};
let channel = endpoint.connect().await?;
let channel = match &self.transport.address {
EndpointAddress::Tcp { .. } => endpoint.connect().await?,
EndpointAddress::Unix(unix) => connect_unix_socket(endpoint, unix).await?,
EndpointAddress::Abstract(abs) => connect_unix_socket(endpoint, abs).await?,
};
Ok(channel)
}
}
19 changes: 19 additions & 0 deletions common/src/address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::convert::{Into, TryFrom};
//use std::net::SocketAddr;
use std::path::PathBuf;

// use tokio_vsock::VsockAddr;

use crate::pb;

#[derive(Clone, Debug, PartialEq)]
pub enum EndpointAddress {
Tcp {
// IP + port (FIXME: should be SocketAddres)
addr: String,
port: u16,
},
Unix(String), // "/path/to/sock" (same host only)
Abstract(String), // "@abstract-socket-name" (same host only)
// VSock(VsockAddr), // cid+port. FIXME: cid have two magic numbers for host and local
}
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod address;
pub mod query;
pub mod types;

Expand Down
49 changes: 38 additions & 11 deletions common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// This module contain literal translations of types from internal/pkgs/types/types.go
// Some of them would be rewritten, replaced, or even removed
use super::address::EndpointAddress;
use crate::pb;
use anyhow::{anyhow, bail};
use std::convert::{Into, TryFrom};
Expand Down Expand Up @@ -181,9 +182,7 @@ impl Into<pb::UnitStatus> for UnitStatus {

#[derive(Debug, Clone, PartialEq)]
pub struct EndpointEntry {
pub protocol: String, // Bogus, should we drop it?
pub address: String,
pub port: u16,
pub address: EndpointAddress,
pub tls_name: String,
}

Expand All @@ -192,22 +191,50 @@ pub type TransportConfig = EndpointEntry;
impl TryFrom<pb::TransportConfig> for EndpointEntry {
type Error = anyhow::Error;
fn try_from(tc: pb::TransportConfig) -> Result<Self, Self::Error> {
let endpoint = match tc.protocol.as_str() {
"tcp" => EndpointAddress::Tcp {
addr: tc.address,
port: tc.port.parse()?,
},
"unix" => EndpointAddress::Unix(tc.address),
"abstract" => EndpointAddress::Abstract(tc.address),
// "vsock" => unimplemented!(),
unknown => bail!("Unknown protocol: {unknown}"),
};
Ok(Self {
protocol: tc.protocol,
address: tc.address,
port: tc.port.parse()?,
address: endpoint,
tls_name: tc.name,
})
}
}

impl Into<pb::TransportConfig> for EndpointEntry {
fn into(self) -> pb::TransportConfig {
pb::TransportConfig {
protocol: self.protocol,
address: self.address,
port: self.port.to_string(),
name: self.tls_name,
match self.address {
EndpointAddress::Tcp { addr, port } => pb::TransportConfig {
protocol: "tcp".into(),
address: addr,
port: port.to_string(),
name: self.tls_name,
},
EndpointAddress::Unix(unix) => pb::TransportConfig {
protocol: "unix".into(),
address: unix,
port: "".into(),
name: self.tls_name,
},
EndpointAddress::Abstract(abstr) => pb::TransportConfig {
protocol: "abstract".into(),
address: abstr,
port: "".into(),
name: self.tls_name,
},
// EndpointAddress::Vsock(vs) => pb::TransportConfig {
// protocol: "vsock".into(),
// address: vs.cid().to_string(),
// port: vs.port().to_string(),
// name: self.tls_name,
// }
}
}
}
8 changes: 5 additions & 3 deletions src/admin/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl RegistryEntry {
#[cfg(test)]
impl RegistryEntry {
pub fn dummy(n: String) -> Self {
use givc_common::address::EndpointAddress;
Self {
name: n,
r#type: UnitType {
Expand All @@ -56,9 +57,10 @@ impl RegistryEntry {
path: "bogus".to_string(),
},
placement: Placement::Endpoint(EndpointEntry {
protocol: "bogus".to_string(),
address: "127.0.0.1".to_string(),
port: 42,
address: EndpointAddress::Tcp {
addr: "127.0.0.1".to_string(),
port: 42,
},
tls_name: "bogus".to_string(),
}),
watch: true,
Expand Down
8 changes: 5 additions & 3 deletions src/bin/givc-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use givc::systemd_api::server::SystemdService;
use givc::types::*;
use givc::utils::naming::*;
use givc_client::AdminClient;
use givc_common::address::EndpointAddress;
use givc_common::pb;
use givc_common::pb::reflection::SYSTEMD_DESCRIPTOR;
use std::net::SocketAddr;
Expand Down Expand Up @@ -88,9 +89,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {

// Perfect example of bad designed code, admin.register_service(entry) should hide structure filling
let endpoint = EndpointEntry {
address: cli.addr,
port: cli.port,
protocol: String::from("bogus"),
address: EndpointAddress::Tcp {
addr: cli.addr,
port: cli.port,
},
tls_name: cli.name,
};
// We can't use just one name field like in "go" code
Expand Down

0 comments on commit c993f4a

Please sign in to comment.