Update pings collector, remove logs collector
kalabukdima committed Oct 24, 2024
1 parent 8f5b3ab commit 9e51a93
Showing 9 changed files with 30 additions and 665 deletions.
19 changes: 0 additions & 19 deletions Cargo.lock

230 changes: 13 additions & 217 deletions crates/collector-utils/src/storage.rs
@@ -1,12 +1,10 @@
use std::collections::HashMap;

use async_trait::async_trait;
use clickhouse::{Client, Row};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};

use sqd_messages::{query_executed, InputAndOutput, Ping, Query, QueryExecuted, SizeAndHash};
use sqd_messages::Heartbeat;

use crate::cli::ClickhouseArgs;
use crate::timestamp_now_ms;
@@ -65,18 +63,10 @@ lazy_static! {

#[async_trait]
pub trait Storage {
async fn store_logs<'a, T: Iterator<Item = QueryExecutedRow> + Sized + Send>(
&self,
query_logs: T,
) -> anyhow::Result<()>;

async fn store_pings<'a, T: Iterator<Item = PingRow> + Sized + Send>(
&self,
pings: T,
) -> anyhow::Result<()>;

/// Get the sequence number & timestamp of the last stored log for each worker
async fn get_last_stored(&self) -> anyhow::Result<HashMap<String, (u64, u64)>>;
}

pub struct ClickhouseStorage(Client);
@@ -116,102 +106,6 @@ pub struct QueryExecutedRow {
collector_timestamp: u64,
}

impl TryFrom<QueryExecuted> for QueryExecutedRow {
type Error = &'static str;

fn try_from(query_executed: QueryExecuted) -> Result<Self, Self::Error> {
let query = query_executed.query.ok_or("Query field missing")?;
let result = query_executed.result.ok_or("Result field missing")?;
let collector_timestamp = timestamp_now_ms();
let (result, num_read_chunks, output_size, output_hash, error_msg) = match result {
query_executed::Result::Ok(res) => {
let output = res.output.ok_or("Output field missing")?;
(
QueryResult::Ok,
res.num_read_chunks,
output.size,
output.sha3_256,
"".to_string(),
)
}
query_executed::Result::BadRequest(err_msg) => (
QueryResult::BadRequest,
Some(0u32),
Some(0u32),
vec![],
err_msg,
),
query_executed::Result::ServerError(err_msg) => (
QueryResult::ServerError,
Some(0u32),
Some(0u32),
vec![],
err_msg,
),
};
Ok(Self {
client_id: query_executed.client_id,
worker_id: query_executed.worker_id,
query_id: query.query_id.ok_or("query_id field missing")?,
dataset: query.dataset.ok_or("dataset field missing")?,
query: query.query.ok_or("query field missing")?,
profiling: query.profiling.ok_or("profiling field missing")?,
client_state_json: query.client_state_json.unwrap(),
query_hash: query_executed.query_hash,
exec_time_ms: query_executed
.exec_time_ms
.ok_or("exec_time field missing")?,
result,
num_read_chunks: num_read_chunks.ok_or("num_read_chunks field missing")?,
output_size: output_size.ok_or("output_size field missing")?,
output_hash,
error_msg,
seq_no: query_executed.seq_no.ok_or("seq_no field missing")?,
client_signature: query.signature,
worker_signature: query_executed.signature,
worker_timestamp: query_executed
.timestamp_ms
.ok_or("timestamp field missing")?,
collector_timestamp,
})
}
}

impl From<QueryExecutedRow> for QueryExecuted {
fn from(row: QueryExecutedRow) -> Self {
let result = match row.result {
QueryResult::Ok => query_executed::Result::Ok(InputAndOutput {
num_read_chunks: Some(row.num_read_chunks),
output: Some(SizeAndHash {
size: Some(row.output_size),
sha3_256: row.output_hash,
}),
}),
QueryResult::BadRequest => query_executed::Result::BadRequest(row.error_msg),
QueryResult::ServerError => query_executed::Result::ServerError(row.error_msg),
};
QueryExecuted {
client_id: row.client_id,
worker_id: row.worker_id,
query: Some(Query {
query_id: Some(row.query_id),
dataset: Some(row.dataset),
query: Some(row.query),
profiling: Some(row.profiling),
client_state_json: Some(row.client_state_json),
signature: row.client_signature,
block_range: None,
}),
query_hash: row.query_hash,
exec_time_ms: Some(row.exec_time_ms),
seq_no: Some(row.seq_no),
timestamp_ms: Some(row.worker_timestamp),
signature: row.worker_signature,
result: Some(result),
}
}
}

#[derive(Row, Debug, Clone, Serialize, Deserialize)]
pub struct PingRow {
timestamp: u64,
@@ -220,26 +114,17 @@ pub struct PingRow {
version: String,
}

impl TryFrom<Ping> for PingRow {
type Error = &'static str;

fn try_from(ping: Ping) -> Result<Self, Self::Error> {
impl PingRow {
pub fn new(heartbeat: Heartbeat, worker_id: String) -> Result<Self, &'static str> {
Ok(Self {
stored_bytes: ping.stored_bytes(),
worker_id: ping.worker_id.ok_or("worker_id missing")?,
version: ping.version.ok_or("version missing")?,
stored_bytes: heartbeat.stored_bytes(),
worker_id,
version: heartbeat.version,
timestamp: timestamp_now_ms(),
})
}
}

#[derive(Row, Debug, Deserialize)]
struct SeqNoRow {
worker_id: String,
seq_no: u64,
timestamp: u64,
}

impl ClickhouseStorage {
pub async fn new(args: ClickhouseArgs) -> anyhow::Result<Self> {
let client = Client::default()
@@ -255,20 +140,6 @@ impl ClickhouseStorage {

#[async_trait]
impl Storage for ClickhouseStorage {
async fn store_logs<'a, T: Iterator<Item = QueryExecutedRow> + Sized + Send>(
&self,
query_logs: T,
) -> anyhow::Result<()> {
log::debug!("Storing logs in clickhouse");
let mut insert = self.0.insert(&LOGS_TABLE)?;
for row in query_logs {
log::trace!("Storing query log {:?}", row);
insert.write(&row).await?;
}
insert.end().await?;
Ok(())
}

async fn store_pings<'a, T: Iterator<Item = PingRow> + Sized + Send>(
&self,
pings: T,
@@ -282,29 +153,10 @@ impl Storage for ClickhouseStorage {
insert.end().await?;
Ok(())
}

async fn get_last_stored(&self) -> anyhow::Result<HashMap<String, (u64, u64)>> {
log::debug!("Retrieving latest sequence from clickhouse");
let mut cursor = self
.0
.query(&format!(
"SELECT worker_id, MAX(seq_no), MAX(worker_timestamp) FROM {} GROUP BY worker_id",
&*LOGS_TABLE
))
.fetch::<SeqNoRow>()?;
let mut result = HashMap::new();
while let Some(row) = cursor.next().await? {
result.insert(row.worker_id, (row.seq_no, row.timestamp));
}
log::debug!("Retrieved sequence numbers: {:?}", result);
Ok(result)
}
}

#[cfg(test)]
mod tests {
use sqd_messages::signatures::SignedMessage;
use sqd_messages::{InputAndOutput, Query, SizeAndHash};
use sqd_network_transport::{Keypair, PeerId};

use super::*;
@@ -345,7 +197,7 @@ mod tests {
.await
.unwrap();

let client_keypair = Keypair::from_protobuf_encoding(&[
let _client_keypair = Keypair::from_protobuf_encoding(&[
8, 1, 18, 64, 246, 13, 52, 78, 165, 229, 195, 19, 180, 208, 225, 55, 240, 115, 159, 6,
9, 123, 239, 172, 245, 55, 141, 57, 41, 185, 78, 191, 141, 74, 8, 242, 152, 79, 38,
199, 39, 192, 209, 175, 147, 85, 150, 22, 192, 22, 89, 173, 61, 11, 207, 219, 48, 43,
@@ -360,75 +212,20 @@
])
.unwrap();

let client_id = PeerId::from_public_key(&client_keypair.public());
let worker_id = PeerId::from_public_key(&worker_keypair.public());

let mut query = Query {
query_id: Some("query_id".to_string()),
dataset: Some("dataset".to_string()),
query: Some("{\"from\": \"0xdeadbeef\"}".to_string()),
profiling: Some(false),
client_state_json: Some("".to_string()),
signature: vec![],
block_range: None
};
query.sign(&client_keypair);

let mut query_log = QueryExecuted {
client_id: client_id.to_string(),
worker_id: worker_id.to_string(),
query: Some(query),
query_hash: vec![0xde, 0xad, 0xbe, 0xef],
exec_time_ms: Some(2137),
seq_no: Some(69),
timestamp_ms: Some(123456789000),
signature: vec![],
result: Some(query_executed::Result::Ok(InputAndOutput {
num_read_chunks: Some(10),
output: Some(SizeAndHash {
size: Some(666),
sha3_256: vec![0xbe, 0xbe, 0xf0, 0x00],
}),
})),
};
query_log.sign(&worker_keypair);

storage
.store_logs(std::iter::once(query_log.clone().try_into().unwrap()))
.await
.unwrap();

// Verify the last stored sequence number and timestamp
let last_stored = storage.get_last_stored().await.unwrap();
assert_eq!(
last_stored.get(&worker_id.to_string()),
Some(&(69, 123456789000))
);

// Verify the signatures
let mut cursor = storage
.0
.query(&format!("SELECT * FROM {}", &*LOGS_TABLE))
.fetch::<QueryExecutedRow>()
.unwrap();
let row = cursor.next().await.unwrap().unwrap();
let mut saved_log: QueryExecuted = row.into();
assert_eq!(query_log, saved_log);
assert!(saved_log.verify_signature(&worker_id));

// Check pings storing
let ping = Ping {
worker_id: Some("worker_id".to_string()),
version: Some("1.0.0".to_string()),
let ping = Heartbeat {
version: "1.0.0".to_string(),
stored_bytes: Some(1024),
stored_ranges: vec![],
signature: vec![],
assignment_id: Default::default(),
missing_chunks: Default::default(),
};
let ts = timestamp_now_ms();
storage
.store_pings(std::iter::once(ping.clone().try_into().unwrap()))
.store_pings(std::iter::once(
PingRow::new(ping.clone(), worker_id.to_string()).unwrap(),
))
.await
.unwrap();

@@ -438,8 +235,7 @@ mod tests {
.fetch::<PingRow>()
.unwrap();
let row = cursor.next().await.unwrap().unwrap();
assert_eq!(ping.worker_id.unwrap(), row.worker_id);
assert_eq!(ping.version.unwrap(), row.version);
assert_eq!(ping.version, row.version);
assert_eq!(ping.stored_bytes.unwrap(), row.stored_bytes);
assert!(row.timestamp >= ts);
assert!(row.timestamp <= timestamp_now_ms());
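
A minimal usage sketch of the pings path after this change (not part of the commit): it assumes PingRow and Storage are reachable as crate::storage::{PingRow, Storage} inside collector-utils, and that the caller already holds the worker's peer ID as a string, since the worker_id field was dropped from the message body. The record_heartbeat helper is hypothetical.

use sqd_messages::Heartbeat;

use crate::storage::{PingRow, Storage};

// Sketch: store one incoming Heartbeat as a PingRow.
// PingRow::new replaces the old TryFrom<Ping> conversion; the worker ID now
// comes from the transport layer rather than the message itself, which is why
// it is passed as a separate argument here.
async fn record_heartbeat(
    storage: &impl Storage,
    heartbeat: Heartbeat,
    worker_id: String,
) -> anyhow::Result<()> {
    let row = PingRow::new(heartbeat, worker_id).map_err(anyhow::Error::msg)?;
    storage.store_pings(std::iter::once(row)).await
}
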
23 changes: 0 additions & 23 deletions crates/logs-collector/Cargo.toml

This file was deleted.

15 changes: 0 additions & 15 deletions crates/logs-collector/healthcheck.sh

This file was deleted.
