Skip to content

Commit

Permalink
Merge branch 'master' into WT-MM-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
WT-MM authored Dec 18, 2024
2 parents 147cb15 + 986528f commit 46bb601
Show file tree
Hide file tree
Showing 23 changed files with 759 additions and 261 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ build/
dist/
*.so
out*/

# Dev
*.patch
robstridev2/
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
recursive-include kos_protos/ *.py *.txt py.typed MANIFEST.in py.typed Makefile
recursive-include kos_protos/ *.py *.txt py.typed MANIFEST.in py.typed Makefile pykos/Makefile
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ Reference the existing platforms / features in [platforms](platforms).

You essentially create another package (Cargo.toml, lib.rs, etc) with the necessary actuator and imu implementations according to the specifications in [kos-core](kos-core/src/services)


To save trace logs to a file, pass the `--log` flag:
```bash
cargo run --features stub -- --log


## Contributing
- Use `cargo fmt --all` to format the code.
- Use `cargo clippy` to check for lint errors.
Expand Down
5 changes: 5 additions & 0 deletions daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ tower = "0.5"
gstreamer = "0.23"
gstreamer-base = "0.23"
glib = "0.17"
tracing-appender = "0.2"
clap = { version = "4.0", features = ["derive"] }
chrono = "0.4"
directories = "5.0"
flate2 = "1.0"

kos-sim = { version = "0.1.0", path = "../platforms/sim", optional = true }
kos-stub = { version = "0.1.0", path = "../platforms/stub", optional = true }
Expand Down
176 changes: 176 additions & 0 deletions daemon/src/file_logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use chrono::Local;
use directories::BaseDirs;
use eyre::Result;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use tracing::{info, error};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter::EnvFilter, Layer};

pub struct CompressedWriter {
encoder: Option<GzEncoder<BufWriter<File>>>,
path: PathBuf,
}

impl CompressedWriter {
pub fn new(path: impl AsRef<std::path::Path>) -> io::Result<Self> {
let file = File::create(path.as_ref())?;
let buffered = BufWriter::new(file);
Ok(Self {
encoder: Some(GzEncoder::new(buffered, Compression::new(6))),
path: path.as_ref().to_path_buf(),
})
}

fn sync(&mut self) -> io::Result<()> {
if let Some(encoder) = &mut self.encoder {
encoder.flush()?;
let buf_writer = encoder.get_mut();
buf_writer.flush()?;
let file = buf_writer.get_mut();
file.sync_all()?;
}
Ok(())
}

pub fn finalize(&mut self) -> io::Result<()> {
if let Some(encoder) = self.encoder.take() {
info!("Finalizing compressed log {}", self.path.display());
// Finish the compression and get the BufWriter back
let mut buf_writer = encoder.finish()?;
// Flush the buffer
buf_writer.flush()?;
// Sync to disk
buf_writer.get_mut().sync_all()?;
info!("Successfully finalized log {}", self.path.display());
}
Ok(())
}
}

impl Write for CompressedWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Some(encoder) = &mut self.encoder {
match encoder.write(buf) {
Ok(size) => {
if size > 0 && buf.contains(&b'\n') {
self.sync()?;
}
Ok(size)
}
Err(e) => {
error!(
"Failed to write to compressed log {}: {}",
self.path.display(),
e
);
Err(e)
}
}
} else {
error!(
"Attempted to write to finalized log {}",
self.path.display()
);
Err(io::Error::new(
io::ErrorKind::Other,
"Writer has been finalized",
))
}
}

fn flush(&mut self) -> io::Result<()> {
self.sync()
}
}

impl Drop for CompressedWriter {
fn drop(&mut self) {
if let Err(e) = self.finalize() {
error!(
"Failed to finalize compressed log {}: {}",
self.path.display(),
e
);
}
}
}

pub fn setup_logging(
enable_file_logging: bool,
log_level: &str,
) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
let level_filter = match log_level.to_lowercase().as_str() {
"trace" => tracing_subscriber::filter::LevelFilter::TRACE,
"debug" => tracing_subscriber::filter::LevelFilter::DEBUG,
"info" => tracing_subscriber::filter::LevelFilter::INFO,
"warn" => tracing_subscriber::filter::LevelFilter::WARN,
"error" => tracing_subscriber::filter::LevelFilter::ERROR,
_ => {
eprintln!("Invalid log level '{}', defaulting to 'info'", log_level);
tracing_subscriber::filter::LevelFilter::INFO
}
};

let subscriber = tracing_subscriber::registry();

// Update stdout layer to use the specified level
let stdout_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stdout)
.with_filter(
EnvFilter::from_default_env()
.add_directive(format!("kos={}", log_level).parse().unwrap())
.add_directive("h2=error".parse().unwrap())
.add_directive("grpc=error".parse().unwrap())
.add_directive("rumqttc=error".parse().unwrap())
.add_directive("kos_core::telemetry=error".parse().unwrap())
.add_directive("polling=error".parse().unwrap())
.add_directive("async_io=error".parse().unwrap())
.add_directive("krec=error".parse().unwrap()),
);

let subscriber = subscriber.with(stdout_layer);

if enable_file_logging {
let log_dir = if let Some(base_dirs) = BaseDirs::new() {
base_dirs.data_local_dir().join("kos").join("logs")
} else {
PathBuf::from("~/.local/share/kos/logs")
};

std::fs::create_dir_all(&log_dir)?;

let timestamp = Local::now().format("%Y%m%d_%H%M%S");
let final_name = format!("kos-daemon_{}.log.gz", timestamp);
let log_path = log_dir.join(&final_name);

let compressed_writer = CompressedWriter::new(&log_path)?;
let (non_blocking, guard) = tracing_appender::non_blocking(compressed_writer);

let file_layer = tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_target(true)
.with_file(true)
.with_line_number(true)
.with_writer(non_blocking)
.with_filter(level_filter);

subscriber.with(file_layer).init();
Ok(Some(guard))
} else {
subscriber.init();
Ok(None)
}
}

pub fn cleanup_logging(guard: Option<tracing_appender::non_blocking::WorkerGuard>) {
if let Some(guard) = guard {
// Ensure we flush any pending writes before dropping
drop(guard);
// Give a small amount of time for the worker to finish
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
84 changes: 70 additions & 14 deletions daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This will run the gRPC server and, if applicable, a runtime loop
// (e.g., actuator polling, loaded model inference).

use clap::Parser;
use eyre::Result;
use kos_core::google_proto::longrunning::operations_server::OperationsServer;
use kos_core::services::OperationsServiceImpl;
Expand All @@ -10,10 +11,13 @@ use kos_core::Platform;
use kos_core::ServiceEnum;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::Mutex;
use tonic::transport::Server;
use tracing::{debug, error, info};
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::prelude::*;
use tracing_subscriber::Layer;

#[cfg(not(any(feature = "kos-sim", feature = "kos-zeroth-01", feature = "kos-kbot")))]
use kos_stub::StubPlatform as PlatformImpl;
Expand All @@ -27,6 +31,21 @@ use kos_zeroth_01::Zeroth01Platform as PlatformImpl;
#[cfg(feature = "kos-kbot")]
use kos_kbot::KbotPlatform as PlatformImpl;

mod file_logging;
use file_logging::{setup_logging, cleanup_logging};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Enable file logging
#[arg(long, default_value_t = false)]
log: bool,

/// Log level (trace, debug, info, warn, error)
#[arg(long, default_value = "info")]
log_level: String,
}

fn add_service_to_router(
router: tonic::transport::server::Router,
service: ServiceEnum,
Expand All @@ -46,7 +65,7 @@ async fn run_server(
let addr = "0.0.0.0:50051".parse()?;
let mut server_builder = Server::builder();

let services = platform.create_services(operations_service.clone())?;
let services = platform.create_services(operations_service.clone()).await?;

let operations_service = OperationsServer::new(operations_service);

Expand All @@ -63,24 +82,53 @@ async fn run_server(
Ok(())
}

struct DaemonState {
_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
platform: PlatformImpl,
}

#[tokio::main]
async fn main() -> Result<()> {
// logging
tracing_subscriber::fmt()
.with_env_filter(
let args = Args::parse();

// tracing
let subscriber = tracing_subscriber::registry();

// Always add stdout layer
let stdout_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stdout)
.with_filter(
EnvFilter::from_default_env()
.add_directive("h2=error".parse().unwrap())
.add_directive("grpc=error".parse().unwrap())
.add_directive("rumqttc=error".parse().unwrap())
.add_directive("kos_core::telemetry=error".parse().unwrap()),
)
.init();
.add_directive("kos_core::telemetry=error".parse().unwrap())
.add_directive("polling=error".parse().unwrap())
.add_directive("async_io=error".parse().unwrap())
.add_directive("krec=error".parse().unwrap()),
);

let _subscriber = subscriber.with(stdout_layer);

let guard = setup_logging(args.log, &args.log_level)?;

let mut platform = PlatformImpl::new();
let mut state = DaemonState {
_guard: guard,
platform: PlatformImpl::new(),
};

// Setup signal handler
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
if let Ok(()) = signal::ctrl_c().await {
let _ = shutdown_tx.send(());
}
});

// telemetry
Telemetry::initialize(
format!("{}-{}", platform.name(), platform.serial()).as_str(),
format!("{}-{}", state.platform.name(), state.platform.serial()).as_str(),
"localhost",
1883,
)
Expand All @@ -89,11 +137,19 @@ async fn main() -> Result<()> {
let operations_store = Arc::new(Mutex::new(HashMap::new()));
let operations_service = Arc::new(OperationsServiceImpl::new(operations_store));

platform.initialize(operations_service.clone())?;

if let Err(e) = run_server(&platform, operations_service).await {
error!("Server error: {:?}", e);
std::process::exit(1);
state.platform.initialize(operations_service.clone())?;

tokio::select! {
res = run_server(&state.platform, operations_service) => {
if let Err(e) = res {
error!("Server error: {:?}", e);
std::process::exit(1);
}
}
_ = shutdown_rx => {
info!("Received shutdown signal, cleaning up...");
cleanup_logging(state._guard.take());
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion kos_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ eyre = "0.6"
hyper = "0.14"
tracing = "0.1"
lazy_static = "1.4"
krec = "0.1"
krec = "0.2"

[build-dependencies]
tonic-build = "0.12"
Expand Down
1 change: 1 addition & 0 deletions kos_core/proto/kos/actuator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ message ConfigureActuatorRequest {
optional float protection_time = 7; // Protection time in seconds
optional bool torque_enabled = 8; // Torque enabled flag
optional uint32 new_actuator_id = 9; // New actuator ID
optional bool zero_position = 10; // Instant zero position
}

// Request message for CalibrateActuator.
Expand Down
Loading

0 comments on commit 46bb601

Please sign in to comment.