Skip to content

Commit

Permalink
Merge pull request #30 from avnik/avnik/more-info
Browse files Browse the repository at this point in the history
Yield more information about services to client
  • Loading branch information
mbssrc authored Dec 20, 2024
2 parents f98999a + 0a14228 commit e0f702c
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 38 deletions.
4 changes: 4 additions & 0 deletions api/admin/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ message QueryListItem {
string Description = 2;
string VmStatus = 3;
string TrustLevel = 4;
string VmType = 5;
string ServiceType = 6;
optional string VmName = 7; // None for host running services
optional string AgentName = 8; // None for agents
}

message QueryListResponse {
Expand Down
2 changes: 1 addition & 1 deletion client/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl EndpointConfig {
let url = transport_config_to_url(&self.transport.address, self.tls.is_some());
info!("Connecting to {url}, TLS name {:?}", &self.tls);
let mut endpoint = Endpoint::try_from(url.clone())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_millis(300))
.concurrency_limit(30);
if let Some(tls) = &self.tls {
endpoint = endpoint.tls_config(tls.client_config()?)?;
Expand Down
15 changes: 15 additions & 0 deletions common/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Types related to QueryList and Watch API
use super::types::{ServiceType, VmType};
use crate::pb;
use pb::admin::watch_item::Status;

Expand Down Expand Up @@ -33,6 +34,10 @@ pub struct QueryResult {
pub description: String, //App name, some details
pub status: VMStatus,
pub trust_level: TrustLevel,
pub vm_type: VmType,
pub service_type: ServiceType,
pub vm_name: Option<String>,
pub agent_name: Option<String>,
}

impl QueryResult {
Expand All @@ -51,6 +56,12 @@ impl TryFrom<pb::QueryListItem> for QueryResult {
.with_context(|| format!("While parsing vm_status {}", item.vm_status))?,
trust_level: TrustLevel::from_str(item.trust_level.as_str())
.with_context(|| format!("While parsing trust_level {}", item.trust_level))?,
vm_type: VmType::from_str(item.vm_type.as_str())
.with_context(|| format!("While parsing vm_type {}", item.vm_type))?,
service_type: ServiceType::from_str(item.service_type.as_str())
.with_context(|| format!("While parsing service_type {}", item.service_type))?,
agent_name: item.agent_name,
vm_name: item.vm_name,
})
}
}
Expand All @@ -62,6 +73,10 @@ impl From<QueryResult> for pb::QueryListItem {
description: val.description,
vm_status: val.status.to_string(),
trust_level: val.trust_level.to_string(),
vm_type: val.vm_type.to_string(),
service_type: val.service_type.to_string(),
agent_name: val.agent_name,
vm_name: val.vm_name,
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::pb;
use std::convert::{Into, TryFrom};

use anyhow::bail;
use serde::Serialize;
use strum::{Display, EnumString};
use tokio_vsock::VsockAddr;

#[derive(Debug, Copy, Clone, PartialEq)]
Expand All @@ -13,15 +15,15 @@ pub struct UnitType {
pub service: ServiceType,
}

#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, EnumString, Display)]
pub enum VmType {
Host,
AdmVM,
SysVM,
AppVM,
}

#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, EnumString, Display)]
pub enum ServiceType {
Mgr,
Svc,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkgs/servicemanager/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *SystemdControlServer) GetUnitStatus(ctx context.Context, req *systemd_a
unitStatus, err := s.Controller.FindUnit(req.UnitName)
if err != nil {
log.Infof("[GetUnitStatus] Error finding unit: %v", err)
return nil, grpc_status.Error(grpc_codes.NotFound, "error fetching unit status")
return nil, grpc_status.Error(grpc_codes.NotFound, fmt.Sprintf("error fetching unit status: %s", req.UnitName))
}
if len(unitStatus) != 1 {
errStr := fmt.Sprintf("error, got %d units named %s", len(unitStatus), req.UnitName)
Expand Down
54 changes: 43 additions & 11 deletions src/admin/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ use givc_common::types::*;
#[derive(Debug, Clone, PartialEq)]
pub enum Placement {
// Service is a `givc-agent` and could be directly connected
Endpoint(EndpointEntry),
Endpoint { endpoint: EndpointEntry, vm: String },

// Service or application managed by specified agent
Managed(String),
Managed { vm: String, by: String },

// Running on host
Host,
}

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -26,14 +29,31 @@ pub struct RegistryEntry {
}

impl RegistryEntry {
pub fn agent_name(&self) -> Option<&str> {
match &self.placement {
Placement::Endpoint { .. } => Some(&self.name),
Placement::Managed { by, .. } => Some(by),
Placement::Host => None,
}
}

pub fn vm_name(&self) -> Option<&str> {
match &self.placement {
Placement::Endpoint { vm, .. } => Some(vm),
Placement::Managed { vm, .. } => Some(vm),
Placement::Host => None,
}
}

pub fn agent(&self) -> anyhow::Result<&EndpointEntry> {
match &self.placement {
Placement::Endpoint(endpoint) => Ok(endpoint),
Placement::Managed(by) => Err(anyhow!(
Placement::Endpoint { endpoint, .. } => Ok(endpoint),
Placement::Managed { by, .. } => Err(anyhow!(
"Agent endpoint {} is managed by {}!",
self.name,
by
)),
Placement::Host => Err(anyhow!("Its a host!")),
}
}
}
Expand All @@ -57,13 +77,16 @@ impl RegistryEntry {
path: "bogus".to_string(),
freezer_state: "bogus".to_string(),
},
placement: Placement::Endpoint(EndpointEntry {
address: EndpointAddress::Tcp {
addr: "127.0.0.1".to_string(),
port: 42,
placement: Placement::Endpoint {
endpoint: EndpointEntry {
address: EndpointAddress::Tcp {
addr: "127.0.0.1".to_string(),
port: 42,
},
tls_name: "bogus".to_string(),
},
tls_name: "bogus".to_string(),
}),
vm: "bogus".into(),
},
watch: true,
}
}
Expand All @@ -89,7 +112,10 @@ impl TryFrom<pb::RegistryRequest> for RegistryEntry {
status,
watch,
r#type: ty,
placement: Placement::Endpoint(endpoint),
placement: Placement::Endpoint {
endpoint,
vm: "bogus".into(),
},
})
}
}
Expand All @@ -103,11 +129,17 @@ impl From<RegistryEntry> for QueryResult {
} else {
VMStatus::PoweredOff
};
let vm_name = val.vm_name().map(|s| s.to_owned());
let agent_name = val.agent_name().map(|s| s.to_owned());
QueryResult {
name: val.name,
description: val.status.description,
status,
trust_level: TrustLevel::default(),
vm_type: val.r#type.vm,
service_type: val.r#type.service,
vm_name,
agent_name,
}
}
}
26 changes: 18 additions & 8 deletions src/admin/registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::hash_map::HashMap;
use std::sync::{Arc, Mutex};

use super::entry::{Placement, RegistryEntry};
use super::entry::RegistryEntry;
use crate::types::*;
use anyhow::{anyhow, bail};
use givc_common::query::{Event, QueryResult};
Expand Down Expand Up @@ -50,9 +50,12 @@ impl Registry {
Some(entry) => {
let cascade: Vec<String> = state
.values()
.filter_map(|re| match &re.placement {
Placement::Managed(within) if within == name => Some(re.name.clone()),
_ => None,
.filter_map(|re| {
if re.agent_name() == Some(name) || re.vm_name() == Some(name) {
Some(re.name.clone())
} else {
None
}
})
.collect();
for each in cascade {
Expand Down Expand Up @@ -173,6 +176,7 @@ impl Registry {
#[cfg(test)]
mod tests {
use super::*;
use crate::admin::entry::Placement;
use crate::utils::naming::parse_application_name;

#[test]
Expand Down Expand Up @@ -204,11 +208,17 @@ mod tests {
let r = Registry::new();
let foo = RegistryEntry::dummy("foo".to_string());
let bar = RegistryEntry {
placement: Placement::Managed("foo".into()),
placement: Placement::Managed {
by: "foo".into(),
vm: "foo-vm".into(),
},
..RegistryEntry::dummy("bar".to_string())
};
let baz = RegistryEntry {
placement: Placement::Managed("foo".into()),
placement: Placement::Managed {
by: "foo".into(),
vm: "foo-vm".into(),
},
..RegistryEntry::dummy("baz".to_string())
};

Expand All @@ -219,12 +229,12 @@ mod tests {
assert!(r.contains("bar"));
assert!(r.contains("baz"));

r.deregister("baz");
r.deregister("baz")?;
assert!(r.contains("foo"));
assert!(r.contains("bar"));
assert!(!r.contains("baz"));

r.deregister("foo");
r.deregister("foo")?;
assert!(!r.contains("foo"));
assert!(!r.contains("bar"));

Expand Down
38 changes: 23 additions & 15 deletions src/admin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ impl AdminServiceImpl {

pub fn endpoint(&self, entry: &RegistryEntry) -> anyhow::Result<EndpointConfig> {
let transport = match &entry.placement {
Placement::Managed(parent) => {
Placement::Managed { by: parent, .. } => {
let parent = self.registry.by_name(parent)?;
parent
.agent()
.with_context(|| "When get_remote_status()")?
.to_owned() // Fail, if parent also `Managed`
}
Placement::Endpoint(endpoint) => endpoint.clone(), // FIXME: avoid clone!
Placement::Endpoint { endpoint, .. } => endpoint.clone(), // FIXME: avoid clone!
Placement::Host => bail!("endpoint() called for Host"), // impossible, FIXME: should never happens atm
};
let tls_name = transport.tls_name.clone();
Ok(EndpointConfig {
Expand Down Expand Up @@ -223,6 +224,10 @@ impl AdminServiceImpl {
}

pub async fn handle_error(&self, entry: RegistryEntry) -> anyhow::Result<()> {
info!(
"Handling error for {} vm type {} service type {}",
entry.name, entry.r#type.vm, entry.r#type.service
);
match (entry.r#type.vm, entry.r#type.service) {
(VmType::AppVM, ServiceType::App) => {
if entry.status.is_exitted() {
Expand All @@ -232,10 +237,11 @@ impl AdminServiceImpl {
Ok(())
}
(VmType::AppVM, ServiceType::Mgr) | (VmType::SysVM, ServiceType::Mgr) => {
let name = parse_service_name(&entry.name)?;
self.start_vm(name)
.await
.with_context(|| format!("handing error, by restart VM {}", entry.name))?;
if let Placement::Managed { vm: vm_name, .. } = entry.placement {
self.start_vm(&vm_name)
.await
.with_context(|| format!("handing error, by restart VM {}", entry.name))?;
}
Ok(()) // FIXME: should use `?` from line above, why it didn't work?
}
(x, y) => {
Expand Down Expand Up @@ -305,27 +311,26 @@ impl AdminServiceImpl {
let name = req.app_name;
let vm = req.vm_name.as_deref();
let vm_name = format_vm_name(&name, vm);
let systemd_agent = format_service_name(&name, vm);
let systemd_agent_name = format_service_name(&name, vm);

info!("Starting app {name} on {vm_name}");
info!("Agent: {systemd_agent}");
info!("Starting app {name} on {vm_name} via {systemd_agent_name}");

// Entry unused in "go" code
match self.registry.by_name(&systemd_agent) {
match self.registry.by_name(&systemd_agent_name) {
std::result::Result::Ok(e) => e,
Err(_) => {
info!("Starting up VM {vm_name}");
self.start_vm(&vm_name)
.await
.with_context(|| format!("Starting vm for {name}"))?;
self.registry
.by_name(&systemd_agent)
.by_name(&systemd_agent_name)
.context("after starting VM")?
}
};
let endpoint = self.agent_endpoint(&systemd_agent)?;
let client = SystemDClient::new(endpoint.clone());
let app_name = self.registry.create_unique_entry_name(&name.to_string());
let endpoint = self.agent_endpoint(&systemd_agent_name)?;
let client = SystemDClient::new(endpoint);
let app_name = self.registry.create_unique_entry_name(&name);
client.start_application(app_name.clone(), req.args).await?;
let status = client.get_remote_status(app_name.clone()).await?;
if status.active_state != "active" {
Expand All @@ -340,7 +345,10 @@ impl AdminServiceImpl {
vm: VmType::AppVM,
service: ServiceType::App,
},
placement: Placement::Managed(systemd_agent),
placement: Placement::Managed {
by: systemd_agent_name,
vm: vm_name,
},
};
self.registry.register(app_entry);
Ok(())
Expand Down

0 comments on commit e0f702c

Please sign in to comment.