diff --git a/crates/hyperqueue/src/client/commands/journal/output.rs b/crates/hyperqueue/src/client/commands/journal/output.rs
index e3be5cab9..1a7ee1c5e 100644
--- a/crates/hyperqueue/src/client/commands/journal/output.rs
+++ b/crates/hyperqueue/src/client/commands/journal/output.rs
@@ -1,8 +1,7 @@
 use crate::client::output::json::format_datetime;
 use crate::server::event::payload::EventPayload;
-use crate::server::event::{bincode_config, Event};
-use crate::transfer::messages::JobDescription;
-use bincode::Options;
+use crate::server::event::Event;
+use crate::transfer::messages::{JobDescription, SubmitRequest};
 use serde_json::json;
 use tako::worker::WorkerOverview;

@@ -103,14 +102,12 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
             closed_job,
             serialized_desc,
         } => {
-            let job_desc: JobDescription = bincode_config()
-                .deserialize(&serialized_desc)
-                .expect("Invalid job description data");
+            let submit: SubmitRequest = serialized_desc.deserialize().expect("Invalid submit data");
             json!({
                 "type": "job-created",
                 "job": job_id,
                 "closed_job": closed_job,
-                "desc": JobInfoFormatter(&job_desc).to_json(),
+                "desc": JobInfoFormatter(&submit.job_desc).to_json(),
             })
         }
         EventPayload::JobCompleted(job_id) => json!({
diff --git a/crates/hyperqueue/src/common/mod.rs b/crates/hyperqueue/src/common/mod.rs
index e3eb5fe0e..95d47186a 100644
--- a/crates/hyperqueue/src/common/mod.rs
+++ b/crates/hyperqueue/src/common/mod.rs
@@ -10,6 +10,7 @@ pub mod parser;
 pub mod parser2;
 pub mod placeholders;
 pub mod rpc;
+pub mod serialization;
 pub mod serverdir;
 pub mod setup;
 pub mod utils;
diff --git a/crates/hyperqueue/src/common/serialization.rs b/crates/hyperqueue/src/common/serialization.rs
new file mode 100644
index 000000000..afc18b5ea
--- /dev/null
+++ b/crates/hyperqueue/src/common/serialization.rs
@@ -0,0 +1,70 @@
+use bincode::Options;
+use serde::de::DeserializeOwned;
+use serde::{Deserialize, Serialize};
+use std::fmt::{Debug, Formatter};
+use std::marker::PhantomData;
+
+/// Helper trait to configure serialization options via separate types.
+pub trait SerializationConfig {
+    fn config() -> impl Options;
+}
+
+pub struct DefaultConfig;
+
+impl SerializationConfig for DefaultConfig {
+    fn config() -> impl Options {
+        bincode::DefaultOptions::new()
+    }
+}
+
+pub struct TrailingAllowedConfig;
+
+impl SerializationConfig for TrailingAllowedConfig {
+    fn config() -> impl Options {
+        bincode::DefaultOptions::new().allow_trailing_bytes()
+    }
+}
+
+/// Strongly typed wrapper over a value of type `T` serialized with Bincode.
+#[derive(Serialize, Deserialize)]
+pub struct Serialized<T, C: SerializationConfig = DefaultConfig> {
+    #[serde(with = "serde_bytes")]
+    data: Box<[u8]>,
+    _phantom: PhantomData<(T, C)>,
+}
+
+impl<T, C: SerializationConfig> Debug for Serialized<T, C> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "Serialized {} ({}) byte(s)",
+            std::any::type_name::<T>(),
+            self.data.len()
+        )
+    }
+}
+
+impl<T, C: SerializationConfig> Clone for Serialized<T, C> {
+    fn clone(&self) -> Self {
+        Self {
+            data: self.data.clone(),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<T: Serialize + DeserializeOwned, C: SerializationConfig> Serialized<T, C> {
+    pub fn new(value: &T) -> bincode::Result<Self> {
+        let result = C::config().serialize(value)?;
+        // Check that we're not reallocating needlessly in `into_boxed_slice`
+        debug_assert_eq!(result.capacity(), result.len());
+        Ok(Self {
+            data: result.into_boxed_slice(),
+            _phantom: Default::default(),
+        })
+    }
+
+    pub fn deserialize(&self) -> bincode::Result<T> {
+        C::config().deserialize(&self.data)
+    }
+}
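The new `Serialized` wrapper above is generic over both the payload type and the serialization config. A minimal usage sketch, not part of the patch: the `Payload` struct and `roundtrip` function are illustrative names, and the example assumes it lives inside the crate so that `crate::common::serialization` is reachable.

use crate::common::serialization::{Serialized, TrailingAllowedConfig};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Payload {
    id: u32,
    name: String,
}

fn roundtrip() -> bincode::Result<()> {
    let value = Payload { id: 1, name: "example".into() };

    // The second type parameter defaults to `DefaultConfig`.
    let serialized: Serialized<Payload> = Serialized::new(&value)?;
    let restored: Payload = serialized.deserialize()?;
    assert_eq!(value, restored);

    // A different config is selected simply by naming another `SerializationConfig` type.
    let lenient: Serialized<Payload, TrailingAllowedConfig> = Serialized::new(&value)?;
    assert_eq!(lenient.deserialize()?, value);
    Ok(())
}

Because the wrapper itself derives Serialize and Deserialize (storing its bytes via serde_bytes), it can be embedded in other serialized types, which is how EventPayload::Submit uses it further below, without re-encoding the inner value.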
diff --git a/crates/hyperqueue/src/server/event/log/read.rs b/crates/hyperqueue/src/server/event/log/read.rs
index 4283e71cd..225d80a1f 100644
--- a/crates/hyperqueue/src/server/event/log/read.rs
+++ b/crates/hyperqueue/src/server/event/log/read.rs
@@ -1,5 +1,6 @@
+use crate::common::serialization::SerializationConfig;
 use crate::server::event::log::HQ_JOURNAL_HEADER;
-use crate::server::event::{bincode_config, Event};
+use crate::server::event::{Event, EventSerializationConfig};
 use crate::HQ_VERSION;
 use anyhow::{anyhow, bail};
 use bincode::Options;
@@ -30,7 +31,7 @@ impl JournalReader {
         if header != HQ_JOURNAL_HEADER {
             bail!("Invalid journal format");
         }
-        let hq_version: String = bincode_config()
+        let hq_version: String = EventSerializationConfig::config()
             .deserialize_from(&mut file)
             .map_err(|error| anyhow!("Cannot load HQ event log file header: {error:?}"))?;
         if hq_version != HQ_VERSION {
@@ -61,7 +62,7 @@ impl Iterator for &mut JournalReader {
         if self.position == self.size {
             return None;
         }
-        match bincode_config().deserialize_from(&mut self.source) {
+        match EventSerializationConfig::config().deserialize_from(&mut self.source) {
             Ok(event) => Some(Ok(event)),
             Err(error) => match error.deref() {
                 bincode::ErrorKind::Io(e)
diff --git a/crates/hyperqueue/src/server/event/log/write.rs b/crates/hyperqueue/src/server/event/log/write.rs
index 13833fb7d..25c4b6367 100644
--- a/crates/hyperqueue/src/server/event/log/write.rs
+++ b/crates/hyperqueue/src/server/event/log/write.rs
@@ -1,5 +1,6 @@
+use crate::common::serialization::SerializationConfig;
 use crate::server::event::log::HQ_JOURNAL_HEADER;
-use crate::server::event::{bincode_config, Event};
+use crate::server::event::{Event, EventSerializationConfig};
 use crate::HQ_VERSION;
 use bincode::Options;
 use std::fs::{File, OpenOptions};
@@ -31,7 +32,7 @@ impl JournalWriter {
         if position == 0 && file.stream_position()? == 0 {
             file.write_all(HQ_JOURNAL_HEADER)?;
-            bincode_config().serialize_into(&mut file, HQ_VERSION)?;
+            EventSerializationConfig::config().serialize_into(&mut file, HQ_VERSION)?;
             file.flush()?;
         };

@@ -39,7 +40,7 @@
     }

     pub fn store(&mut self, event: Event) -> anyhow::Result<()> {
-        bincode_config().serialize_into(&mut self.file, &event)?;
+        EventSerializationConfig::config().serialize_into(&mut self.file, &event)?;
         Ok(())
     }
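Both the journal writer and reader stream whole `Event` values through `serialize_into`/`deserialize_from`. A standalone sketch of that record-at-a-time pattern using the bincode `Options` API; the `config` closure stands in for `EventSerializationConfig::config()` and the `u32` records stand in for events.

use bincode::Options;
use std::io::Cursor;

fn journal_like_roundtrip() -> bincode::Result<()> {
    // The same options that `TrailingAllowedConfig` produces.
    let config = || bincode::DefaultOptions::new().allow_trailing_bytes();

    // Append records one after another, as `JournalWriter::store` does.
    let mut buffer = Vec::new();
    for event_id in 0..3u32 {
        config().serialize_into(&mut buffer, &event_id)?;
    }

    // Read them back sequentially, as the `JournalReader` iterator does.
    let mut source = Cursor::new(buffer);
    for expected in 0..3u32 {
        let event_id: u32 = config().deserialize_from(&mut source)?;
        assert_eq!(event_id, expected);
    }
    Ok(())
}

Writing the `HQ_VERSION` header first and the events after it works because each `deserialize_from` call consumes exactly one value from the stream.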
diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs
index 5dc5a7fc9..9c5bd397f 100644
--- a/crates/hyperqueue/src/server/event/mod.rs
+++ b/crates/hyperqueue/src/server/event/mod.rs
@@ -2,7 +2,7 @@ pub mod log;
 pub mod payload;
 pub mod streamer;

-use bincode::Options;
+use crate::stream::StreamSerializationConfig;
 use chrono::serde::ts_milliseconds;
 use chrono::{DateTime, Utc};
 use payload::EventPayload;
@@ -10,14 +10,11 @@ use serde::{Deserialize, Serialize};

 pub type EventId = u32;

+type EventSerializationConfig = StreamSerializationConfig;
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct Event {
     #[serde(with = "ts_milliseconds")]
     pub time: DateTime<Utc>,
     pub payload: EventPayload,
 }
-
-#[inline]
-pub(crate) fn bincode_config() -> impl Options {
-    bincode::DefaultOptions::new().allow_trailing_bytes()
-}
diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs
index 2d3a15965..bb7a5e828 100644
--- a/crates/hyperqueue/src/server/event/payload.rs
+++ b/crates/hyperqueue/src/server/event/payload.rs
@@ -1,6 +1,7 @@
+use crate::common::serialization::Serialized;
 use crate::server::autoalloc::AllocationId;
 use crate::server::autoalloc::QueueId;
-use crate::transfer::messages::{AllocationQueueParams, JobDescription};
+use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest};
 use crate::JobId;
 use crate::{JobTaskId, WorkerId};
 use serde::{Deserialize, Serialize};
@@ -19,13 +20,13 @@ pub enum EventPayload {
     WorkerOverviewReceived(WorkerOverview),
     /// A Job was submitted by the user -- full information to reconstruct the job;
     /// it will be only stored into file, not held in memory
-    /// Vec<u8> is serialized JobDescription; the main reason is avoiding duplication of JobDescription
+    /// Vec<u8> is serialized SubmitRequest; the main reason is avoiding duplication of SubmitRequest
     /// (we serialize it before it is stripped down)
     /// and a nice side effect is that Events can be deserialized without deserializing a potentially large submit data
     Submit {
         job_id: JobId,
         closed_job: bool,
-        serialized_desc: Vec<u8>,
+        serialized_desc: Serialized<SubmitRequest>,
     },
     /// All tasks of the job have finished.
     JobCompleted(JobId),
@@ -43,7 +44,7 @@ pub enum EventPayload {
         job_id: JobId,
         task_id: JobTaskId,
     },
-    // Task that failed to execute
+    /// Task has failed to execute
     TaskFailed {
         job_id: JobId,
         task_id: JobTaskId,
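The `Submit` variant now stores the request behind the `Serialized` wrapper, so replaying a journal only copies the raw bytes and the embedded `SubmitRequest` is decoded on demand. A sketch of that produce/consume pair, illustrative only: it assumes the crate types above are in scope and mirrors the real call sites in streamer.rs and restore.rs below.

use crate::common::serialization::Serialized;
use crate::server::event::payload::EventPayload;
use crate::transfer::messages::SubmitRequest;
use crate::JobId;

// Recording side: serialize the request once, before it is stripped down.
fn record_submit(job_id: JobId, submit_request: &SubmitRequest) -> bincode::Result<EventPayload> {
    Ok(EventPayload::Submit {
        job_id,
        closed_job: submit_request.job_id.is_none(),
        serialized_desc: Serialized::new(submit_request)?,
    })
}

// Replay side: the inner request is only decoded when it is actually needed.
fn replay_submit(payload: &EventPayload) -> bincode::Result<Option<SubmitRequest>> {
    match payload {
        EventPayload::Submit { serialized_desc, .. } => Ok(Some(serialized_desc.deserialize()?)),
        _ => Ok(None),
    }
}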
diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs
index 66d9c5c6d..c06cc4e6a 100644
--- a/crates/hyperqueue/src/server/event/streamer.rs
+++ b/crates/hyperqueue/src/server/event/streamer.rs
@@ -1,10 +1,10 @@
+use crate::common::serialization::Serialized;
 use crate::server::autoalloc::{AllocationId, QueueId};
 use crate::server::event::log::{EventStreamMessage, EventStreamSender};
 use crate::server::event::payload::EventPayload;
-use crate::server::event::{bincode_config, Event};
+use crate::server::event::Event;
 use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest};
 use crate::{JobId, JobTaskId, WorkerId};
-use bincode::Options;
 use chrono::Utc;
 use smallvec::SmallVec;
 use tako::gateway::LostWorkerReason;
@@ -64,7 +64,7 @@ impl EventStreamer {
         self.send_event(EventPayload::Submit {
             job_id,
             closed_job: submit_request.job_id.is_none(),
-            serialized_desc: bincode_config().serialize(submit_request)?,
+            serialized_desc: Serialized::new(submit_request)?,
         });
         Ok(())
     }
diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs
index 38790fa6e..4325640e0 100644
--- a/crates/hyperqueue/src/server/restore.rs
+++ b/crates/hyperqueue/src/server/restore.rs
@@ -1,6 +1,5 @@
 use crate::server::autoalloc::QueueId;
 use crate::server::client::submit_job_desc;
-use crate::server::event::bincode_config;
 use crate::server::event::log::JournalReader;
 use crate::server::event::payload::EventPayload;
 use crate::server::job::{Job, JobTaskState, StartedTaskData};
@@ -10,7 +9,6 @@ use crate::transfer::messages::{
 };
 use crate::worker::start::RunningTaskContext;
 use crate::{JobId, JobTaskId, Map};
-use bincode::Options;
 use std::path::Path;
 use tako::gateway::NewTasksMessage;
 use tako::{ItemId, WorkerId};
@@ -177,8 +175,7 @@ impl StateRestorer {
                 serialized_desc,
             } => {
                 log::debug!("Replaying: JobTasksCreated {job_id}");
-                let submit_request: SubmitRequest =
-                    bincode_config().deserialize(&serialized_desc)?;
+                let submit_request: SubmitRequest = serialized_desc.deserialize()?;
                 if closed_job {
                     let mut job = RestorerJob::new(submit_request.job_desc, false);
                     job.add_submit(submit_request.submit_desc);
diff --git a/crates/hyperqueue/src/stream/mod.rs b/crates/hyperqueue/src/stream/mod.rs
index 1077754f1..4f570c054 100644
--- a/crates/hyperqueue/src/stream/mod.rs
+++ b/crates/hyperqueue/src/stream/mod.rs
@@ -1 +1,5 @@
+use crate::common::serialization::TrailingAllowedConfig;
+
 pub mod reader;
+
+pub type StreamSerializationConfig = TrailingAllowedConfig;
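`StreamSerializationConfig` resolves to `TrailingAllowedConfig`, which differs from `DefaultConfig` only in `allow_trailing_bytes`. That difference is observable when a value is decoded from a buffer that still holds data after it, which matches the stream-file layout used in the two files below (a serialized header directly followed by raw chunk data). A standalone sketch of the behavior; the `Header` type and `demo` function are illustrative.

use bincode::Options;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Header {
    size: u64,
}

fn demo() -> bincode::Result<()> {
    // A header immediately followed by its raw payload in one buffer.
    let mut buffer = bincode::DefaultOptions::new().serialize(&Header { size: 4 })?;
    buffer.extend_from_slice(b"data");

    // Strict options (what `DefaultConfig` produces) reject the leftover payload bytes...
    assert!(bincode::DefaultOptions::new()
        .deserialize::<Header>(&buffer)
        .is_err());

    // ...while `allow_trailing_bytes` (what `TrailingAllowedConfig` produces) accepts them.
    let header: Header = bincode::DefaultOptions::new()
        .allow_trailing_bytes()
        .deserialize(&buffer)?;
    assert_eq!(header, Header { size: 4 });
    Ok(())
}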
diff --git a/crates/hyperqueue/src/stream/reader/outputlog.rs b/crates/hyperqueue/src/stream/reader/outputlog.rs
index ef2ccd943..e8a313f1d 100644
--- a/crates/hyperqueue/src/stream/reader/outputlog.rs
+++ b/crates/hyperqueue/src/stream/reader/outputlog.rs
@@ -1,7 +1,8 @@
 use crate::client::commands::outputlog::{CatOpts, Channel, ExportOpts, ShowOpts};
 use crate::common::arraydef::IntArray;
 use crate::common::error::HqError;
-use crate::server::event::bincode_config;
+use crate::common::serialization::SerializationConfig;
+use crate::stream::StreamSerializationConfig;
 use crate::transfer::stream::{ChannelId, StreamChunkHeader};
 use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER, STREAM_FILE_SUFFIX};
 use crate::{JobId, JobTaskId, Set};
@@ -156,7 +157,7 @@ impl OutputLog {
     }

     fn read_chunk(file: &mut BufReader<File>) -> crate::Result<Option<StreamChunkHeader>> {
-        match bincode_config().deserialize_from(file) {
+        match StreamSerializationConfig::config().deserialize_from(file) {
             Ok(event) => Ok(Some(event)),
             Err(error) => match error.deref() {
                 bincode::ErrorKind::Io(e)
@@ -217,7 +218,7 @@ impl OutputLog {
         if header != STREAM_FILE_HEADER {
             anyhow::bail!("Invalid file format");
         }
-        Ok(bincode_config().deserialize_from(file)?)
+        Ok(StreamSerializationConfig::config().deserialize_from(file)?)
     }

     pub fn summary(&self) -> Summary {
diff --git a/crates/hyperqueue/src/worker/streamer.rs b/crates/hyperqueue/src/worker/streamer.rs
index d27d0470f..1b7f343fb 100644
--- a/crates/hyperqueue/src/worker/streamer.rs
+++ b/crates/hyperqueue/src/worker/streamer.rs
@@ -1,5 +1,6 @@
 use crate::common::error::HqError;
-use crate::server::event::bincode_config;
+use crate::common::serialization::SerializationConfig;
+use crate::stream::StreamSerializationConfig;
 use crate::transfer::stream::{ChannelId, StreamChunkHeader};
 use crate::WrappedRcRefCell;
 use crate::{JobId, JobTaskId, Map};
@@ -186,7 +187,7 @@ async fn stream_writer(
             server_uid: Cow::Borrowed(&streamer.server_uid),
             worker_id: streamer.worker_id,
         };
-        bincode_config().serialize_into(&mut buffer, &header)?;
+        StreamSerializationConfig::config().serialize_into(&mut buffer, &header)?;
     };
     file.write_all(&buffer).await?;
     while let Some(message) = receiver.recv().await {
@@ -194,7 +195,7 @@ async fn stream_writer(
             StreamerMessage::Write { header, data } => {
                 log::debug!("Waiting data chunk into stream file");
                 buffer.clear();
-                bincode_config().serialize_into(&mut buffer, &header)?;
+                StreamSerializationConfig::config().serialize_into(&mut buffer, &header)?;
                 file.write_all(&buffer).await?;
                 if !data.is_empty() {
                     file.write_all(&data).await?