diff --git a/formica.Dockerfile b/formica.Dockerfile index 36aee48..462e822 100644 --- a/formica.Dockerfile +++ b/formica.Dockerfile @@ -44,7 +44,9 @@ RUN apt-get -y update && apt-get -y install nginx # Run the node CMD ["sh", "-c", \ "echo \"server { listen ${METRICS_PORT}; server_name localhost; location /metrics { proxy_pass http://127.0.0.1:9090/metrics; include /etc/nginx/proxy_params; } }\" > /etc/nginx/sites-available/default \ - && nginx && /app/antnode --home-network \ + && nginx \ + && if [ -e '/app/node_data/secret-key-recycle' ]; then rm -f /app/node_data/secret-key*; fi \ + && /app/antnode --home-network \ --port ${NODE_PORT} \ --metrics-server-port 9090 \ --root-dir /app/node_data \ diff --git a/src/bg_tasks.rs b/src/bg_tasks.rs index 6c0891f..b3ac507 100644 --- a/src/bg_tasks.rs +++ b/src/bg_tasks.rs @@ -339,9 +339,7 @@ async fn latest_version_available() -> Option { } // Fetch up to date information for each active node instance -// from different sources caching them in global context: -// - Nodes' RPC API to get binary version and peer id. -// - Nodes' exposed metrics server to obtain stats. +// from nodes' exposed metrics server caching them in global context. async fn update_nodes_info( docker_client: &DockerClient, nodes_metrics: &Arc>, diff --git a/src/docker_client.rs b/src/docker_client.rs index 8f90b3a..de248fb 100644 --- a/src/docker_client.rs +++ b/src/docker_client.rs @@ -59,8 +59,8 @@ pub enum DockerClientError { CointainerNotFound(ContainerId), #[error("Image not found locally")] ImageNotFound, - #[error("Docker server error: {0}")] - DockerServerError(String), + #[error("Docker server error: {} - {}", 0.0, 0.1)] + DockerServerError((u16, String)), #[error(transparent)] SerdeJson(#[from] serde_json::Error), #[error(transparent)] @@ -73,10 +73,21 @@ pub enum DockerClientError { #[derive(Clone)] enum ReqMethod { Get, - Post, + Post(String), + Put(Vec), Delete, } +impl ReqMethod { + fn post(body: &T) -> Result { + Ok(Self::Post(serde_json::to_string(body)?)) + } + + fn post_empty_body() -> Self { + Self::Post("".to_string()) + } +} + // Client to send requests to a Docker server's API #[derive(Clone, Debug)] pub struct DockerClient { @@ -148,7 +159,7 @@ impl DockerClient { ("all", all_str.as_str()), ("filters", &serde_json::to_string(filters)?), ]; - let resp_bytes = self.send_request(ReqMethod::Get, &url, query, &()).await?; + let resp_bytes = self.send_request(ReqMethod::Get, &url, query).await?; let containers: Vec = serde_json::from_slice(&resp_bytes)?; Ok(containers) } @@ -158,8 +169,7 @@ impl DockerClient { let url = format!("{DOCKER_CONTAINERS_API}/{id}"); logging::log!("[DELETE] Sending Docker request to DELETE containers: {url} ..."); let query = &[("force", "true")]; - self.send_request(ReqMethod::Delete, &url, query, &()) - .await?; + self.send_request(ReqMethod::Delete, &url, query).await?; Ok(()) } @@ -170,7 +180,8 @@ impl DockerClient { ) -> Result<(Option, Option), DockerClientError> { let url = format!("{DOCKER_CONTAINERS_API}/{id}/start"); logging::log!("[START] Sending Docker request to START a container: {url} ..."); - self.send_request(ReqMethod::Post, &url, &[], &()).await?; + self.send_request(ReqMethod::post_empty_body(), &url, &[]) + .await?; let url = format!("{DOCKER_CONTAINERS_API}/{id}/update"); logging::log!( @@ -182,10 +193,10 @@ impl DockerClient { MaximumRetryCount: Some(5), }), }; - self.send_request(ReqMethod::Post, &url, &[], &container_update_req) + self.send_request(ReqMethod::post(&container_update_req)?, &url, &[]) .await?; - // let's try to retrieve new version, forget it if there is any error + // let's try to retrieve new version self.get_node_version_and_peer_id(id).await } @@ -193,7 +204,9 @@ impl DockerClient { pub async fn stop_container(&self, id: &ContainerId) -> Result<(), DockerClientError> { let url = format!("{DOCKER_CONTAINERS_API}/{id}/stop"); logging::log!("[STOP] Sending Docker request to STOP a container: {url} ..."); - self.send_request(ReqMethod::Post, &url, &[], &()).await?; + self.send_request(ReqMethod::post_empty_body(), &url, &[]) + .await?; + Ok(()) } @@ -261,10 +274,9 @@ impl DockerClient { ); let resp_bytes = self .send_request( - ReqMethod::Post, + ReqMethod::post(&container_create_req)?, &url, &[("name", &random_name)], - &container_create_req, ) .await?; let container: ContainerCreateExecSuccess = serde_json::from_slice(&resp_bytes)?; @@ -292,7 +304,7 @@ impl DockerClient { Tty: Some(false), }; let resp_bytes = self - .send_request(ReqMethod::Post, &url, &[], &exec_cmd) + .send_request(ReqMethod::post(&exec_cmd)?, &url, &[]) .await?; let exec_result: ContainerCreateExecSuccess = serde_json::from_slice(&resp_bytes)?; logging::log!("Cmd to stream logs created successfully: {exec_result:#?}"); @@ -305,7 +317,7 @@ impl DockerClient { Tty: Some(true), }; - self.send_request_and_return_stream(ReqMethod::Post, &url, &[], &opts) + self.send_request_and_return_stream(ReqMethod::post(&opts)?, &url, &[]) .await } @@ -327,13 +339,13 @@ impl DockerClient { // let's check its exit code let url = format!("{DOCKER_EXEC_API}/{exec_id}/json"); - let resp_bytes = self.send_request(ReqMethod::Get, &url, &[], &()).await?; + let resp_bytes = self.send_request(ReqMethod::Get, &url, &[]).await?; let exec: ContainerExecJson = serde_json::from_slice(&resp_bytes)?; logging::log!("Container exec: {exec:#?}"); if exec.ExitCode != 0 { let error_msg = format!("Failed to upgrade node, exit code: {}", exec.ExitCode); logging::log!("{error_msg}"); - return Err(DockerClientError::DockerServerError(error_msg)); + return Err(DockerClientError::DockerServerError((exec.ExitCode.into(), error_msg))); } } } @@ -345,15 +357,13 @@ impl DockerClient { .unwrap_or_default(); // restart container to run with new node version - let url = format!("{DOCKER_CONTAINERS_API}/{id}/restart"); - logging::log!("[RESTART] Sending Docker request to RESTART a container: {url} ..."); - self.send_request(ReqMethod::Post, &url, &[], &()).await?; + self.restart_container(id).await?; Ok(new_version) } // Retrieve version of the node binary and its peer id - async fn get_node_version_and_peer_id( + pub async fn get_node_version_and_peer_id( &self, id: &ContainerId, ) -> Result<(Option, Option), DockerClientError> { @@ -385,18 +395,43 @@ impl DockerClient { Ok((version, peer_id)) } - // Clears the node's PeerId within the containver with given id - pub async fn clear_peer_id_in_container( + // Clears the node's PeerId within the containver and restarts it + pub async fn regenerate_peer_id_in_container( &self, id: &ContainerId, - ) -> Result<(), DockerClientError> { + ) -> Result<(Option, Option), DockerClientError> { logging::log!("[RECYCLE] Recycling container by clearing node's peer-id ..."); - let cmd = "rm ./node_data/secret-key".to_string(); - let _ = self - .exec_in_container(id, cmd, "clear node peer-id") + // we write an empty file at '/app/node_data/secret-key-recycle' so the container removes + // existing secret-key file before invoking the node binary upon restarting the container. + let url = format!("{DOCKER_CONTAINERS_API}/{id}/archive"); + let query = &[("path", "/app/node_data")]; + let empty_file_tar_bzip2 = vec![ + 66, 90, 104, 57, 49, 65, 89, 38, 83, 89, 124, 173, 200, 126, 0, 0, 128, 125, 128, 192, + 128, 2, 0, 64, 2, 127, 128, 0, 1, 122, 76, 158, 32, 16, 8, 32, 0, 116, 26, 9, 54, 166, + 129, 161, 233, 1, 166, 130, 74, 105, 169, 160, 0, 208, 0, 222, 116, 252, 36, 17, 116, + 33, 23, 57, 11, 41, 139, 149, 193, 36, 13, 6, 229, 187, 245, 242, 36, 41, 201, 48, 51, + 18, 125, 214, 6, 50, 131, 36, 93, 82, 1, 224, 204, 194, 33, 71, 99, 52, 125, 132, 249, + 20, 30, 214, 130, 181, 30, 25, 84, 94, 171, 107, 97, 43, 37, 211, 34, 32, 63, 23, 114, + 69, 56, 80, 144, 124, 173, 200, 126, + ]; + self.send_request(ReqMethod::Put(empty_file_tar_bzip2), &url, query) + .await?; + + // restart container to obtain a new peer-id + self.restart_container(id).await?; + + logging::log!("Finished recycling node container: {id}"); + + self.get_node_version_and_peer_id(id).await + } + + // Restart the container wich has given id + async fn restart_container(&self, id: &ContainerId) -> Result<(), DockerClientError> { + let url = format!("{DOCKER_CONTAINERS_API}/{id}/restart"); + logging::log!("[RESTART] Sending Docker request to RESTART a container: {url} ..."); + self.send_request(ReqMethod::post_empty_body(), &url, &[]) .await?; - logging::log!("Finished removing node's peer-id in container: {id}"); Ok(()) } @@ -416,7 +451,7 @@ impl DockerClient { Tty: Some(false), }; let resp_bytes = self - .send_request(ReqMethod::Post, &url, &[], &exec_cmd) + .send_request(ReqMethod::post(&exec_cmd)?, &url, &[]) .await?; let exec_result: ContainerCreateExecSuccess = serde_json::from_slice(&resp_bytes)?; logging::log!("Cmd to {cmd_desc} created successfully: {exec_result:#?}"); @@ -427,7 +462,9 @@ impl DockerClient { Detach: Some(false), Tty: Some(true), }; - let resp_bytes = self.send_request(ReqMethod::Post, &url, &[], &opts).await?; + let resp_bytes = self + .send_request(ReqMethod::post(&opts)?, &url, &[]) + .await?; let resp_str = String::from_utf8_lossy(&resp_bytes).to_string(); Ok((exec_id, resp_str)) @@ -435,17 +472,13 @@ impl DockerClient { // Send request to Docker server, pulling the formica image // if necessary before retrying. - async fn send_request( + async fn send_request( &self, method: ReqMethod, url: &str, query: &[(&str, &str)], - body: &T, ) -> Result, DockerClientError> { - let resp = match self - .try_send_request(method.clone(), url, query, body) - .await - { + let resp = match self.try_send_request(&method, url, query).await { Err(DockerClientError::ImageNotFound) => { logging::log!( "We need to pull the formica image: {}.", @@ -453,7 +486,7 @@ impl DockerClient { ); // let's pull the image before retrying self.pull_formica_image().await?; - self.try_send_request(method, url, query, body).await + self.try_send_request(&method, url, query).await } other => other, }?; @@ -463,17 +496,13 @@ impl DockerClient { // Send request to Docker server, pulling the formica image // if necessary before retrying, and returning the response as a stream. - async fn send_request_and_return_stream( + async fn send_request_and_return_stream( &self, method: ReqMethod, url: &str, query: &[(&str, &str)], - body: &T, ) -> Result>, DockerClientError> { - let resp = match self - .try_send_request(method.clone(), url, query, body) - .await - { + let resp = match self.try_send_request(&method, url, query).await { Err(DockerClientError::ImageNotFound) => { logging::log!( "We need to pull the formica image: {} ...", @@ -481,7 +510,7 @@ impl DockerClient { ); // let's pull the image before retrying self.pull_formica_image().await?; - self.try_send_request(method, url, query, body).await + self.try_send_request(&method, url, query).await } other => other, }?; @@ -502,7 +531,7 @@ impl DockerClient { ("tag", self.node_image_tag.as_str()), ]; let resp = self - .try_send_request(ReqMethod::Post, &url, query, &()) + .try_send_request(&ReqMethod::post_empty_body(), &url, query) .await?; // consume and await end of response stream, discarding the bytes @@ -514,12 +543,11 @@ impl DockerClient { } // Send request to Docker server - async fn try_send_request( + async fn try_send_request( &self, - method: ReqMethod, + method: &ReqMethod, base_url: &str, query_params: &[(&str, &str)], - body: &T, ) -> Result, DockerClientError> { let unix_stream = UnixStream::connect(&self.socket_path) .await @@ -542,21 +570,31 @@ impl DockerClient { .extend_pairs(query_params) .finish(); - // Serialize the body to JSON - let json_body = serde_json::to_string(body)?; // Construct the full URL with query parameters - let full_url = format!("{}?{}", base_url, query_string); + let full_url = format!("{base_url}?{query_string}"); let req_builder = Request::builder() .uri(full_url) - .header("Host", "localhost") // Host added just because http1 requires it - .header("Content-Type", "application/json"); - let req_builder = match method { - ReqMethod::Post => req_builder.method(Method::POST), - ReqMethod::Get => req_builder.method(Method::GET), - ReqMethod::Delete => req_builder.method(Method::DELETE), + // Host added just because http1 requires it + .header("Host", "localhost"); + + let req = match method { + ReqMethod::Post(body_str) => req_builder + .method(Method::POST) + .header("Content-Type", "application/json") + .body(axum::body::Body::from(body_str.clone()))?, + ReqMethod::Put(bytes) => req_builder + .header(hyper::header::CONTENT_TYPE, "application/octet-stream") + .header(hyper::header::CONTENT_LENGTH, bytes.len()) + .method(Method::PUT) + .body(axum::body::Body::from(bytes.clone()))?, + ReqMethod::Get => req_builder + .method(Method::GET) + .body(axum::body::Body::from(()))?, + ReqMethod::Delete => req_builder + .method(Method::DELETE) + .body(axum::body::Body::from(()))?, }; - let req = req_builder.body(json_body)?; let resp = docker_reqs_sender.send_request(req).await?; @@ -571,7 +609,10 @@ impl DockerClient { if msg.message.starts_with("No such image") { Err(DockerClientError::ImageNotFound) } else { - Err(DockerClientError::DockerServerError(msg.message)) + Err(DockerClientError::DockerServerError(( + StatusCode::NOT_FOUND.into(), + msg.message, + ))) } } other => { @@ -581,7 +622,7 @@ impl DockerClient { Err(_) => String::from_utf8_lossy(&resp_bytes).to_string(), }; logging::log!("ERROR: {other:?} - {msg}"); - Err(DockerClientError::DockerServerError(msg)) + Err(DockerClientError::DockerServerError((other.into(), msg))) } } } diff --git a/src/node_actions.rs b/src/node_actions.rs index 351231e..cc45a30 100644 --- a/src/node_actions.rs +++ b/src/node_actions.rs @@ -78,7 +78,7 @@ impl NodeAction { res } Self::Recycle => { - if !previous_status.is_active() { + if previous_status.is_transitioning() || info.read_untracked().peer_id.is_none() { return; } info.update(|node| node.status = NodeStatus::Recycling); diff --git a/src/nodes_list_view.rs b/src/nodes_list_view.rs index 40bc340..4f59da5 100644 --- a/src/nodes_list_view.rs +++ b/src/nodes_list_view.rs @@ -649,7 +649,9 @@ fn ButtonRecycle(info: RwSignal) -> impl IntoView {