Skip to content

Commit

Permalink
feat: capability to recycle a node even when inactive/stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Dec 20, 2024
1 parent d02f5af commit 0aa057d
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 77 deletions.
4 changes: 3 additions & 1 deletion formica.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ RUN apt-get -y update && apt-get -y install nginx
# Run the node
CMD ["sh", "-c", \
"echo \"server { listen ${METRICS_PORT}; server_name localhost; location /metrics { proxy_pass http://127.0.0.1:9090/metrics; include /etc/nginx/proxy_params; } }\" > /etc/nginx/sites-available/default \
&& nginx && /app/antnode --home-network \
&& nginx \
&& if [ -e '/app/node_data/secret-key-recycle' ]; then rm -f /app/node_data/secret-key*; fi \
&& /app/antnode --home-network \
--port ${NODE_PORT} \
--metrics-server-port 9090 \
--root-dir /app/node_data \
Expand Down
4 changes: 1 addition & 3 deletions src/bg_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,7 @@ async fn latest_version_available() -> Option<String> {
}

// Fetch up to date information for each active node instance
// from different sources caching them in global context:
// - Nodes' RPC API to get binary version and peer id.
// - Nodes' exposed metrics server to obtain stats.
// from nodes' exposed metrics server caching them in global context.
async fn update_nodes_info(
docker_client: &DockerClient,
nodes_metrics: &Arc<Mutex<NodesMetrics>>,
Expand Down
159 changes: 100 additions & 59 deletions src/docker_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ pub enum DockerClientError {
CointainerNotFound(ContainerId),
#[error("Image not found locally")]
ImageNotFound,
#[error("Docker server error: {0}")]
DockerServerError(String),
#[error("Docker server error: {} - {}", 0.0, 0.1)]
DockerServerError((u16, String)),
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
#[error(transparent)]
Expand All @@ -73,10 +73,21 @@ pub enum DockerClientError {
#[derive(Clone)]
enum ReqMethod {
Get,
Post,
Post(String),
Put(Vec<u8>),
Delete,
}

impl ReqMethod {
fn post<T: Serialize>(body: &T) -> Result<Self, DockerClientError> {
Ok(Self::Post(serde_json::to_string(body)?))
}

fn post_empty_body() -> Self {
Self::Post("".to_string())
}
}

// Client to send requests to a Docker server's API
#[derive(Clone, Debug)]
pub struct DockerClient {
Expand Down Expand Up @@ -148,7 +159,7 @@ impl DockerClient {
("all", all_str.as_str()),
("filters", &serde_json::to_string(filters)?),
];
let resp_bytes = self.send_request(ReqMethod::Get, &url, query, &()).await?;
let resp_bytes = self.send_request(ReqMethod::Get, &url, query).await?;
let containers: Vec<Container> = serde_json::from_slice(&resp_bytes)?;
Ok(containers)
}
Expand All @@ -158,8 +169,7 @@ impl DockerClient {
let url = format!("{DOCKER_CONTAINERS_API}/{id}");
logging::log!("[DELETE] Sending Docker request to DELETE containers: {url} ...");
let query = &[("force", "true")];
self.send_request(ReqMethod::Delete, &url, query, &())
.await?;
self.send_request(ReqMethod::Delete, &url, query).await?;
Ok(())
}

Expand All @@ -170,7 +180,8 @@ impl DockerClient {
) -> Result<(Option<String>, Option<String>), DockerClientError> {
let url = format!("{DOCKER_CONTAINERS_API}/{id}/start");
logging::log!("[START] Sending Docker request to START a container: {url} ...");
self.send_request(ReqMethod::Post, &url, &[], &()).await?;
self.send_request(ReqMethod::post_empty_body(), &url, &[])
.await?;

let url = format!("{DOCKER_CONTAINERS_API}/{id}/update");
logging::log!(
Expand All @@ -182,18 +193,20 @@ impl DockerClient {
MaximumRetryCount: Some(5),
}),
};
self.send_request(ReqMethod::Post, &url, &[], &container_update_req)
self.send_request(ReqMethod::post(&container_update_req)?, &url, &[])
.await?;

// let's try to retrieve new version, forget it if there is any error
// let's try to retrieve new version
self.get_node_version_and_peer_id(id).await
}

// Request the Docker server to STOP a container matching the given id
pub async fn stop_container(&self, id: &ContainerId) -> Result<(), DockerClientError> {
let url = format!("{DOCKER_CONTAINERS_API}/{id}/stop");
logging::log!("[STOP] Sending Docker request to STOP a container: {url} ...");
self.send_request(ReqMethod::Post, &url, &[], &()).await?;
self.send_request(ReqMethod::post_empty_body(), &url, &[])
.await?;

Ok(())
}

Expand Down Expand Up @@ -261,10 +274,9 @@ impl DockerClient {
);
let resp_bytes = self
.send_request(
ReqMethod::Post,
ReqMethod::post(&container_create_req)?,
&url,
&[("name", &random_name)],
&container_create_req,
)
.await?;
let container: ContainerCreateExecSuccess = serde_json::from_slice(&resp_bytes)?;
Expand Down Expand Up @@ -292,7 +304,7 @@ impl DockerClient {
Tty: Some(false),
};
let resp_bytes = self
.send_request(ReqMethod::Post, &url, &[], &exec_cmd)
.send_request(ReqMethod::post(&exec_cmd)?, &url, &[])
.await?;
let exec_result: ContainerCreateExecSuccess = serde_json::from_slice(&resp_bytes)?;
logging::log!("Cmd to stream logs created successfully: {exec_result:#?}");
Expand All @@ -305,7 +317,7 @@ impl DockerClient {
Tty: Some(true),
};

self.send_request_and_return_stream(ReqMethod::Post, &url, &[], &opts)
self.send_request_and_return_stream(ReqMethod::post(&opts)?, &url, &[])
.await
}

Expand All @@ -327,13 +339,13 @@ impl DockerClient {

// let's check its exit code
let url = format!("{DOCKER_EXEC_API}/{exec_id}/json");
let resp_bytes = self.send_request(ReqMethod::Get, &url, &[], &()).await?;
let resp_bytes = self.send_request(ReqMethod::Get, &url, &[]).await?;
let exec: ContainerExecJson = serde_json::from_slice(&resp_bytes)?;
logging::log!("Container exec: {exec:#?}");
if exec.ExitCode != 0 {
let error_msg = format!("Failed to upgrade node, exit code: {}", exec.ExitCode);
logging::log!("{error_msg}");
return Err(DockerClientError::DockerServerError(error_msg));
return Err(DockerClientError::DockerServerError((exec.ExitCode.into(), error_msg)));
}
}
}
Expand All @@ -345,15 +357,13 @@ impl DockerClient {
.unwrap_or_default();

// restart container to run with new node version
let url = format!("{DOCKER_CONTAINERS_API}/{id}/restart");
logging::log!("[RESTART] Sending Docker request to RESTART a container: {url} ...");
self.send_request(ReqMethod::Post, &url, &[], &()).await?;
self.restart_container(id).await?;

Ok(new_version)
}

// Retrieve version of the node binary and its peer id
async fn get_node_version_and_peer_id(
pub async fn get_node_version_and_peer_id(
&self,
id: &ContainerId,
) -> Result<(Option<String>, Option<String>), DockerClientError> {
Expand Down Expand Up @@ -385,18 +395,43 @@ impl DockerClient {
Ok((version, peer_id))
}

// Clears the node's PeerId within the containver with given id
pub async fn clear_peer_id_in_container(
// Clears the node's PeerId within the containver and restarts it
pub async fn regenerate_peer_id_in_container(
&self,
id: &ContainerId,
) -> Result<(), DockerClientError> {
) -> Result<(Option<String>, Option<String>), DockerClientError> {
logging::log!("[RECYCLE] Recycling container by clearing node's peer-id ...");

let cmd = "rm ./node_data/secret-key".to_string();
let _ = self
.exec_in_container(id, cmd, "clear node peer-id")
// we write an empty file at '/app/node_data/secret-key-recycle' so the container removes
// existing secret-key file before invoking the node binary upon restarting the container.
let url = format!("{DOCKER_CONTAINERS_API}/{id}/archive");
let query = &[("path", "/app/node_data")];
let empty_file_tar_bzip2 = vec![
66, 90, 104, 57, 49, 65, 89, 38, 83, 89, 124, 173, 200, 126, 0, 0, 128, 125, 128, 192,
128, 2, 0, 64, 2, 127, 128, 0, 1, 122, 76, 158, 32, 16, 8, 32, 0, 116, 26, 9, 54, 166,
129, 161, 233, 1, 166, 130, 74, 105, 169, 160, 0, 208, 0, 222, 116, 252, 36, 17, 116,
33, 23, 57, 11, 41, 139, 149, 193, 36, 13, 6, 229, 187, 245, 242, 36, 41, 201, 48, 51,
18, 125, 214, 6, 50, 131, 36, 93, 82, 1, 224, 204, 194, 33, 71, 99, 52, 125, 132, 249,
20, 30, 214, 130, 181, 30, 25, 84, 94, 171, 107, 97, 43, 37, 211, 34, 32, 63, 23, 114,
69, 56, 80, 144, 124, 173, 200, 126,
];
self.send_request(ReqMethod::Put(empty_file_tar_bzip2), &url, query)
.await?;

// restart container to obtain a new peer-id
self.restart_container(id).await?;

logging::log!("Finished recycling node container: {id}");

self.get_node_version_and_peer_id(id).await
}

// Restart the container wich has given id
async fn restart_container(&self, id: &ContainerId) -> Result<(), DockerClientError> {
let url = format!("{DOCKER_CONTAINERS_API}/{id}/restart");
logging::log!("[RESTART] Sending Docker request to RESTART a container: {url} ...");
self.send_request(ReqMethod::post_empty_body(), &url, &[])
.await?;
logging::log!("Finished removing node's peer-id in container: {id}");
Ok(())
}

Expand All @@ -416,7 +451,7 @@ impl DockerClient {
Tty: Some(false),
};
let resp_bytes = self
.send_request(ReqMethod::Post, &url, &[], &exec_cmd)
.send_request(ReqMethod::post(&exec_cmd)?, &url, &[])
.await?;
let exec_result: ContainerCreateExecSuccess = serde_json::from_slice(&resp_bytes)?;
logging::log!("Cmd to {cmd_desc} created successfully: {exec_result:#?}");
Expand All @@ -427,33 +462,31 @@ impl DockerClient {
Detach: Some(false),
Tty: Some(true),
};
let resp_bytes = self.send_request(ReqMethod::Post, &url, &[], &opts).await?;
let resp_bytes = self
.send_request(ReqMethod::post(&opts)?, &url, &[])
.await?;
let resp_str = String::from_utf8_lossy(&resp_bytes).to_string();

Ok((exec_id, resp_str))
}

// Send request to Docker server, pulling the formica image
// if necessary before retrying.
async fn send_request<T: Serialize + ?Sized>(
async fn send_request(
&self,
method: ReqMethod,
url: &str,
query: &[(&str, &str)],
body: &T,
) -> Result<Vec<u8>, DockerClientError> {
let resp = match self
.try_send_request(method.clone(), url, query, body)
.await
{
let resp = match self.try_send_request(&method, url, query).await {
Err(DockerClientError::ImageNotFound) => {
logging::log!(
"We need to pull the formica image: {}.",
self.node_image_name
);
// let's pull the image before retrying
self.pull_formica_image().await?;
self.try_send_request(method, url, query, body).await
self.try_send_request(&method, url, query).await
}
other => other,
}?;
Expand All @@ -463,25 +496,21 @@ impl DockerClient {

// Send request to Docker server, pulling the formica image
// if necessary before retrying, and returning the response as a stream.
async fn send_request_and_return_stream<T: Serialize + ?Sized>(
async fn send_request_and_return_stream(
&self,
method: ReqMethod,
url: &str,
query: &[(&str, &str)],
body: &T,
) -> Result<impl Stream<Item = Result<Bytes, DockerClientError>>, DockerClientError> {
let resp = match self
.try_send_request(method.clone(), url, query, body)
.await
{
let resp = match self.try_send_request(&method, url, query).await {
Err(DockerClientError::ImageNotFound) => {
logging::log!(
"We need to pull the formica image: {} ...",
self.node_image_name
);
// let's pull the image before retrying
self.pull_formica_image().await?;
self.try_send_request(method, url, query, body).await
self.try_send_request(&method, url, query).await
}
other => other,
}?;
Expand All @@ -502,7 +531,7 @@ impl DockerClient {
("tag", self.node_image_tag.as_str()),
];
let resp = self
.try_send_request(ReqMethod::Post, &url, query, &())
.try_send_request(&ReqMethod::post_empty_body(), &url, query)
.await?;

// consume and await end of response stream, discarding the bytes
Expand All @@ -514,12 +543,11 @@ impl DockerClient {
}

// Send request to Docker server
async fn try_send_request<T: Serialize + ?Sized>(
async fn try_send_request(
&self,
method: ReqMethod,
method: &ReqMethod,
base_url: &str,
query_params: &[(&str, &str)],
body: &T,
) -> Result<Response<Incoming>, DockerClientError> {
let unix_stream = UnixStream::connect(&self.socket_path)
.await
Expand All @@ -542,21 +570,31 @@ impl DockerClient {
.extend_pairs(query_params)
.finish();

// Serialize the body to JSON
let json_body = serde_json::to_string(body)?;
// Construct the full URL with query parameters
let full_url = format!("{}?{}", base_url, query_string);
let full_url = format!("{base_url}?{query_string}");

let req_builder = Request::builder()
.uri(full_url)
.header("Host", "localhost") // Host added just because http1 requires it
.header("Content-Type", "application/json");
let req_builder = match method {
ReqMethod::Post => req_builder.method(Method::POST),
ReqMethod::Get => req_builder.method(Method::GET),
ReqMethod::Delete => req_builder.method(Method::DELETE),
// Host added just because http1 requires it
.header("Host", "localhost");

let req = match method {
ReqMethod::Post(body_str) => req_builder
.method(Method::POST)
.header("Content-Type", "application/json")
.body(axum::body::Body::from(body_str.clone()))?,
ReqMethod::Put(bytes) => req_builder
.header(hyper::header::CONTENT_TYPE, "application/octet-stream")
.header(hyper::header::CONTENT_LENGTH, bytes.len())
.method(Method::PUT)
.body(axum::body::Body::from(bytes.clone()))?,
ReqMethod::Get => req_builder
.method(Method::GET)
.body(axum::body::Body::from(()))?,
ReqMethod::Delete => req_builder
.method(Method::DELETE)
.body(axum::body::Body::from(()))?,
};
let req = req_builder.body(json_body)?;

let resp = docker_reqs_sender.send_request(req).await?;

Expand All @@ -571,7 +609,10 @@ impl DockerClient {
if msg.message.starts_with("No such image") {
Err(DockerClientError::ImageNotFound)
} else {
Err(DockerClientError::DockerServerError(msg.message))
Err(DockerClientError::DockerServerError((
StatusCode::NOT_FOUND.into(),
msg.message,
)))
}
}
other => {
Expand All @@ -581,7 +622,7 @@ impl DockerClient {
Err(_) => String::from_utf8_lossy(&resp_bytes).to_string(),
};
logging::log!("ERROR: {other:?} - {msg}");
Err(DockerClientError::DockerServerError(msg))
Err(DockerClientError::DockerServerError((other.into(), msg)))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/node_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl NodeAction {
res
}
Self::Recycle => {
if !previous_status.is_active() {
if previous_status.is_transitioning() || info.read_untracked().peer_id.is_none() {
return;
}
info.update(|node| node.status = NodeStatus::Recycling);
Expand Down
Loading

0 comments on commit 0aa057d

Please sign in to comment.