From 7483a899f98f12b17a0bab19bac9c9380575bb0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 21 Oct 2024 09:04:16 +0200 Subject: [PATCH 1/5] Add Serialized wrapper to make it harder to accidentally deserialize the wrong type --- .../src/client/commands/journal/output.rs | 11 +++---- crates/hyperqueue/src/server/event/mod.rs | 32 +++++++++++++++++++ crates/hyperqueue/src/server/event/payload.rs | 9 +++--- .../hyperqueue/src/server/event/streamer.rs | 5 ++- crates/hyperqueue/src/server/restore.rs | 5 +-- 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/journal/output.rs b/crates/hyperqueue/src/client/commands/journal/output.rs index e3be5cab9..1a7ee1c5e 100644 --- a/crates/hyperqueue/src/client/commands/journal/output.rs +++ b/crates/hyperqueue/src/client/commands/journal/output.rs @@ -1,8 +1,7 @@ use crate::client::output::json::format_datetime; use crate::server::event::payload::EventPayload; -use crate::server::event::{bincode_config, Event}; -use crate::transfer::messages::JobDescription; -use bincode::Options; +use crate::server::event::Event; +use crate::transfer::messages::{JobDescription, SubmitRequest}; use serde_json::json; use tako::worker::WorkerOverview; @@ -103,14 +102,12 @@ fn format_payload(event: EventPayload) -> serde_json::Value { closed_job, serialized_desc, } => { - let job_desc: JobDescription = bincode_config() - .deserialize(&serialized_desc) - .expect("Invalid job description data"); + let submit: SubmitRequest = serialized_desc.deserialize().expect("Invalid submit data"); json!({ "type": "job-created", "job": job_id, "closed_job": closed_job, - "desc": JobInfoFormatter(&job_desc).to_json(), + "desc": JobInfoFormatter(&submit.job_desc).to_json(), }) } EventPayload::JobCompleted(job_id) => json!({ diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs index 5dc5a7fc9..82f449d71 100644 --- a/crates/hyperqueue/src/server/event/mod.rs +++ b/crates/hyperqueue/src/server/event/mod.rs @@ -6,7 +6,9 @@ use bincode::Options; use chrono::serde::ts_milliseconds; use chrono::{DateTime, Utc}; use payload::EventPayload; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; pub type EventId = u32; @@ -21,3 +23,33 @@ pub struct Event { pub(crate) fn bincode_config() -> impl Options { bincode::DefaultOptions::new().allow_trailing_bytes() } + +/// Strongly typed wrapper over serialized with Bincode. +#[derive(Serialize, Deserialize, Debug)] +pub struct Serialized { + #[serde(with = "serde_bytes")] + data: Vec, + _phantom: PhantomData, +} + +impl Clone for Serialized { + fn clone(&self) -> Self { + Self { + data: self.data.clone(), + _phantom: PhantomData, + } + } +} + +impl Serialized { + pub fn new(value: &T) -> bincode::Result { + Ok(Self { + data: bincode_config().serialize(value)?, + _phantom: Default::default(), + }) + } + + pub fn deserialize(&self) -> bincode::Result { + bincode_config().deserialize(&self.data) + } +} diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs index 2d3a15965..3fd3811e9 100644 --- a/crates/hyperqueue/src/server/event/payload.rs +++ b/crates/hyperqueue/src/server/event/payload.rs @@ -1,6 +1,7 @@ use crate::server::autoalloc::AllocationId; use crate::server::autoalloc::QueueId; -use crate::transfer::messages::{AllocationQueueParams, JobDescription}; +use crate::server::event::Serialized; +use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::JobId; use crate::{JobTaskId, WorkerId}; use serde::{Deserialize, Serialize}; @@ -19,13 +20,13 @@ pub enum EventPayload { WorkerOverviewReceived(WorkerOverview), /// A Job was submitted by the user -- full information to reconstruct the job; /// it will be only stored into file, not held in memory - /// Vec is serialized JobDescription; the main reason is avoiding duplication of JobDescription + /// Vec is serialized SubmitRequest; the main reason is avoiding duplication of SubmitRequest /// (we serialize it before it is stripped down) /// and a nice side effect is that Events can be deserialized without deserializing a potentially large submit data Submit { job_id: JobId, closed_job: bool, - serialized_desc: Vec, + serialized_desc: Serialized, }, /// All tasks of the job have finished. JobCompleted(JobId), @@ -43,7 +44,7 @@ pub enum EventPayload { job_id: JobId, task_id: JobTaskId, }, - // Task that failed to execute + /// Task has failed to execute TaskFailed { job_id: JobId, task_id: JobTaskId, diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index 66d9c5c6d..245bc940f 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -1,10 +1,9 @@ use crate::server::autoalloc::{AllocationId, QueueId}; use crate::server::event::log::{EventStreamMessage, EventStreamSender}; use crate::server::event::payload::EventPayload; -use crate::server::event::{bincode_config, Event}; +use crate::server::event::{Event, Serialized}; use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::{JobId, JobTaskId, WorkerId}; -use bincode::Options; use chrono::Utc; use smallvec::SmallVec; use tako::gateway::LostWorkerReason; @@ -64,7 +63,7 @@ impl EventStreamer { self.send_event(EventPayload::Submit { job_id, closed_job: submit_request.job_id.is_none(), - serialized_desc: bincode_config().serialize(submit_request)?, + serialized_desc: Serialized::new(submit_request)?, }); Ok(()) } diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index 38790fa6e..4325640e0 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -1,6 +1,5 @@ use crate::server::autoalloc::QueueId; use crate::server::client::submit_job_desc; -use crate::server::event::bincode_config; use crate::server::event::log::JournalReader; use crate::server::event::payload::EventPayload; use crate::server::job::{Job, JobTaskState, StartedTaskData}; @@ -10,7 +9,6 @@ use crate::transfer::messages::{ }; use crate::worker::start::RunningTaskContext; use crate::{JobId, JobTaskId, Map}; -use bincode::Options; use std::path::Path; use tako::gateway::NewTasksMessage; use tako::{ItemId, WorkerId}; @@ -177,8 +175,7 @@ impl StateRestorer { serialized_desc, } => { log::debug!("Replaying: JobTasksCreated {job_id}"); - let submit_request: SubmitRequest = - bincode_config().deserialize(&serialized_desc)?; + let submit_request: SubmitRequest = serialized_desc.deserialize()?; if closed_job { let mut job = RestorerJob::new(submit_request.job_desc, false); job.add_submit(submit_request.submit_desc); From 4f1b1253a08618311fa8ac1ea0ec259214fed4a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 21 Oct 2024 09:06:25 +0200 Subject: [PATCH 2/5] Move `Serialized` and `bincode_config` a separate module --- crates/hyperqueue/src/common/mod.rs | 1 + crates/hyperqueue/src/common/serialization.rs | 39 +++++++++++++++++++ .../hyperqueue/src/server/event/log/read.rs | 3 +- .../hyperqueue/src/server/event/log/write.rs | 3 +- crates/hyperqueue/src/server/event/mod.rs | 38 ------------------ crates/hyperqueue/src/server/event/payload.rs | 2 +- .../hyperqueue/src/server/event/streamer.rs | 3 +- .../hyperqueue/src/stream/reader/outputlog.rs | 2 +- crates/hyperqueue/src/worker/streamer.rs | 2 +- 9 files changed, 49 insertions(+), 44 deletions(-) create mode 100644 crates/hyperqueue/src/common/serialization.rs diff --git a/crates/hyperqueue/src/common/mod.rs b/crates/hyperqueue/src/common/mod.rs index e3eb5fe0e..95d47186a 100644 --- a/crates/hyperqueue/src/common/mod.rs +++ b/crates/hyperqueue/src/common/mod.rs @@ -10,6 +10,7 @@ pub mod parser; pub mod parser2; pub mod placeholders; pub mod rpc; +pub mod serialization; pub mod serverdir; pub mod setup; pub mod utils; diff --git a/crates/hyperqueue/src/common/serialization.rs b/crates/hyperqueue/src/common/serialization.rs new file mode 100644 index 000000000..9d0d31dd4 --- /dev/null +++ b/crates/hyperqueue/src/common/serialization.rs @@ -0,0 +1,39 @@ +use bincode::Options; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; + +#[inline] +pub(crate) fn bincode_config() -> impl Options { + bincode::DefaultOptions::new().allow_trailing_bytes() +} + +/// Strongly typed wrapper over serialized with Bincode. +#[derive(Serialize, Deserialize, Debug)] +pub struct Serialized { + #[serde(with = "serde_bytes")] + data: Vec, + _phantom: PhantomData, +} + +impl Clone for Serialized { + fn clone(&self) -> Self { + Self { + data: self.data.clone(), + _phantom: PhantomData, + } + } +} + +impl Serialized { + pub fn new(value: &T) -> bincode::Result { + Ok(Self { + data: bincode_config().serialize(value)?, + _phantom: Default::default(), + }) + } + + pub fn deserialize(&self) -> bincode::Result { + bincode_config().deserialize(&self.data) + } +} diff --git a/crates/hyperqueue/src/server/event/log/read.rs b/crates/hyperqueue/src/server/event/log/read.rs index 4283e71cd..89d91064f 100644 --- a/crates/hyperqueue/src/server/event/log/read.rs +++ b/crates/hyperqueue/src/server/event/log/read.rs @@ -1,5 +1,6 @@ +use crate::common::serialization::bincode_config; use crate::server::event::log::HQ_JOURNAL_HEADER; -use crate::server::event::{bincode_config, Event}; +use crate::server::event::Event; use crate::HQ_VERSION; use anyhow::{anyhow, bail}; use bincode::Options; diff --git a/crates/hyperqueue/src/server/event/log/write.rs b/crates/hyperqueue/src/server/event/log/write.rs index 13833fb7d..c8fe8ea92 100644 --- a/crates/hyperqueue/src/server/event/log/write.rs +++ b/crates/hyperqueue/src/server/event/log/write.rs @@ -1,5 +1,6 @@ +use crate::common::serialization::bincode_config; use crate::server::event::log::HQ_JOURNAL_HEADER; -use crate::server::event::{bincode_config, Event}; +use crate::server::event::Event; use crate::HQ_VERSION; use bincode::Options; use std::fs::{File, OpenOptions}; diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs index 82f449d71..987e45b4b 100644 --- a/crates/hyperqueue/src/server/event/mod.rs +++ b/crates/hyperqueue/src/server/event/mod.rs @@ -2,13 +2,10 @@ pub mod log; pub mod payload; pub mod streamer; -use bincode::Options; use chrono::serde::ts_milliseconds; use chrono::{DateTime, Utc}; use payload::EventPayload; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use std::marker::PhantomData; pub type EventId = u32; @@ -18,38 +15,3 @@ pub struct Event { pub time: DateTime, pub payload: EventPayload, } - -#[inline] -pub(crate) fn bincode_config() -> impl Options { - bincode::DefaultOptions::new().allow_trailing_bytes() -} - -/// Strongly typed wrapper over serialized with Bincode. -#[derive(Serialize, Deserialize, Debug)] -pub struct Serialized { - #[serde(with = "serde_bytes")] - data: Vec, - _phantom: PhantomData, -} - -impl Clone for Serialized { - fn clone(&self) -> Self { - Self { - data: self.data.clone(), - _phantom: PhantomData, - } - } -} - -impl Serialized { - pub fn new(value: &T) -> bincode::Result { - Ok(Self { - data: bincode_config().serialize(value)?, - _phantom: Default::default(), - }) - } - - pub fn deserialize(&self) -> bincode::Result { - bincode_config().deserialize(&self.data) - } -} diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs index 3fd3811e9..bb7a5e828 100644 --- a/crates/hyperqueue/src/server/event/payload.rs +++ b/crates/hyperqueue/src/server/event/payload.rs @@ -1,6 +1,6 @@ +use crate::common::serialization::Serialized; use crate::server::autoalloc::AllocationId; use crate::server::autoalloc::QueueId; -use crate::server::event::Serialized; use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::JobId; use crate::{JobTaskId, WorkerId}; diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index 245bc940f..c06cc4e6a 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -1,7 +1,8 @@ +use crate::common::serialization::Serialized; use crate::server::autoalloc::{AllocationId, QueueId}; use crate::server::event::log::{EventStreamMessage, EventStreamSender}; use crate::server::event::payload::EventPayload; -use crate::server::event::{Event, Serialized}; +use crate::server::event::Event; use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::{JobId, JobTaskId, WorkerId}; use chrono::Utc; diff --git a/crates/hyperqueue/src/stream/reader/outputlog.rs b/crates/hyperqueue/src/stream/reader/outputlog.rs index ef2ccd943..1b7ff6f11 100644 --- a/crates/hyperqueue/src/stream/reader/outputlog.rs +++ b/crates/hyperqueue/src/stream/reader/outputlog.rs @@ -1,7 +1,7 @@ use crate::client::commands::outputlog::{CatOpts, Channel, ExportOpts, ShowOpts}; use crate::common::arraydef::IntArray; use crate::common::error::HqError; -use crate::server::event::bincode_config; +use crate::common::serialization::bincode_config; use crate::transfer::stream::{ChannelId, StreamChunkHeader}; use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER, STREAM_FILE_SUFFIX}; use crate::{JobId, JobTaskId, Set}; diff --git a/crates/hyperqueue/src/worker/streamer.rs b/crates/hyperqueue/src/worker/streamer.rs index d27d0470f..39e97aaa8 100644 --- a/crates/hyperqueue/src/worker/streamer.rs +++ b/crates/hyperqueue/src/worker/streamer.rs @@ -1,5 +1,5 @@ use crate::common::error::HqError; -use crate::server::event::bincode_config; +use crate::common::serialization::bincode_config; use crate::transfer::stream::{ChannelId, StreamChunkHeader}; use crate::WrappedRcRefCell; use crate::{JobId, JobTaskId, Map}; From be7faa516a6e5e10392069b5752075e6c39f39c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 21 Oct 2024 09:16:13 +0200 Subject: [PATCH 3/5] Add serialization config --- crates/hyperqueue/src/common/serialization.rs | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/crates/hyperqueue/src/common/serialization.rs b/crates/hyperqueue/src/common/serialization.rs index 9d0d31dd4..a06e2fe9f 100644 --- a/crates/hyperqueue/src/common/serialization.rs +++ b/crates/hyperqueue/src/common/serialization.rs @@ -1,22 +1,55 @@ use bincode::Options; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; #[inline] -pub(crate) fn bincode_config() -> impl Options { +pub fn bincode_config() -> impl Options { bincode::DefaultOptions::new().allow_trailing_bytes() } -/// Strongly typed wrapper over serialized with Bincode. -#[derive(Serialize, Deserialize, Debug)] -pub struct Serialized { +/// Helper trait to configure serialization options via separate types. +pub trait SerializationConfig { + fn config() -> impl Options; +} + +pub struct DefaultConfig; + +impl SerializationConfig for DefaultConfig { + fn config() -> impl Options { + bincode::DefaultOptions::new() + } +} + +pub struct TrailingAllowedConfig; + +impl SerializationConfig for TrailingAllowedConfig { + fn config() -> impl Options { + bincode::DefaultOptions::new().allow_trailing_bytes() + } +} + +/// Strongly typed wrapper over `` serialized with Bincode. +#[derive(Serialize, Deserialize)] +pub struct Serialized { #[serde(with = "serde_bytes")] data: Vec, - _phantom: PhantomData, + _phantom: PhantomData<(T, C)>, +} + +impl Debug for Serialized { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Serialized {} ({}) byte(s)", + std::any::type_name::(), + self.data.len() + ) + } } -impl Clone for Serialized { +impl Clone for Serialized { fn clone(&self) -> Self { Self { data: self.data.clone(), @@ -25,15 +58,15 @@ impl Clone for Serialized { } } -impl Serialized { +impl Serialized { pub fn new(value: &T) -> bincode::Result { Ok(Self { - data: bincode_config().serialize(value)?, + data: C::config().serialize(value)?, _phantom: Default::default(), }) } pub fn deserialize(&self) -> bincode::Result { - bincode_config().deserialize(&self.data) + C::config().deserialize(&self.data) } } From fefac7d542d9f293318dde6bd7fc4b1dd9338e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 21 Oct 2024 09:22:09 +0200 Subject: [PATCH 4/5] Remove usages of the `bincode_config` function --- crates/hyperqueue/src/common/serialization.rs | 5 ----- crates/hyperqueue/src/server/event/log/read.rs | 8 ++++---- crates/hyperqueue/src/server/event/log/write.rs | 8 ++++---- crates/hyperqueue/src/server/event/mod.rs | 3 +++ crates/hyperqueue/src/stream/mod.rs | 4 ++++ crates/hyperqueue/src/stream/reader/outputlog.rs | 7 ++++--- crates/hyperqueue/src/worker/streamer.rs | 7 ++++--- 7 files changed, 23 insertions(+), 19 deletions(-) diff --git a/crates/hyperqueue/src/common/serialization.rs b/crates/hyperqueue/src/common/serialization.rs index a06e2fe9f..dc4ae881b 100644 --- a/crates/hyperqueue/src/common/serialization.rs +++ b/crates/hyperqueue/src/common/serialization.rs @@ -4,11 +4,6 @@ use serde::{Deserialize, Serialize}; use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; -#[inline] -pub fn bincode_config() -> impl Options { - bincode::DefaultOptions::new().allow_trailing_bytes() -} - /// Helper trait to configure serialization options via separate types. pub trait SerializationConfig { fn config() -> impl Options; diff --git a/crates/hyperqueue/src/server/event/log/read.rs b/crates/hyperqueue/src/server/event/log/read.rs index 89d91064f..225d80a1f 100644 --- a/crates/hyperqueue/src/server/event/log/read.rs +++ b/crates/hyperqueue/src/server/event/log/read.rs @@ -1,6 +1,6 @@ -use crate::common::serialization::bincode_config; +use crate::common::serialization::SerializationConfig; use crate::server::event::log::HQ_JOURNAL_HEADER; -use crate::server::event::Event; +use crate::server::event::{Event, EventSerializationConfig}; use crate::HQ_VERSION; use anyhow::{anyhow, bail}; use bincode::Options; @@ -31,7 +31,7 @@ impl JournalReader { if header != HQ_JOURNAL_HEADER { bail!("Invalid journal format"); } - let hq_version: String = bincode_config() + let hq_version: String = EventSerializationConfig::config() .deserialize_from(&mut file) .map_err(|error| anyhow!("Cannot load HQ event log file header: {error:?}"))?; if hq_version != HQ_VERSION { @@ -62,7 +62,7 @@ impl Iterator for &mut JournalReader { if self.position == self.size { return None; } - match bincode_config().deserialize_from(&mut self.source) { + match EventSerializationConfig::config().deserialize_from(&mut self.source) { Ok(event) => Some(Ok(event)), Err(error) => match error.deref() { bincode::ErrorKind::Io(e) diff --git a/crates/hyperqueue/src/server/event/log/write.rs b/crates/hyperqueue/src/server/event/log/write.rs index c8fe8ea92..25c4b6367 100644 --- a/crates/hyperqueue/src/server/event/log/write.rs +++ b/crates/hyperqueue/src/server/event/log/write.rs @@ -1,6 +1,6 @@ -use crate::common::serialization::bincode_config; +use crate::common::serialization::SerializationConfig; use crate::server::event::log::HQ_JOURNAL_HEADER; -use crate::server::event::Event; +use crate::server::event::{Event, EventSerializationConfig}; use crate::HQ_VERSION; use bincode::Options; use std::fs::{File, OpenOptions}; @@ -32,7 +32,7 @@ impl JournalWriter { if position == 0 && file.stream_position()? == 0 { file.write_all(HQ_JOURNAL_HEADER)?; - bincode_config().serialize_into(&mut file, HQ_VERSION)?; + EventSerializationConfig::config().serialize_into(&mut file, HQ_VERSION)?; file.flush()?; }; @@ -40,7 +40,7 @@ impl JournalWriter { } pub fn store(&mut self, event: Event) -> anyhow::Result<()> { - bincode_config().serialize_into(&mut self.file, &event)?; + EventSerializationConfig::config().serialize_into(&mut self.file, &event)?; Ok(()) } diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs index 987e45b4b..9c5bd397f 100644 --- a/crates/hyperqueue/src/server/event/mod.rs +++ b/crates/hyperqueue/src/server/event/mod.rs @@ -2,6 +2,7 @@ pub mod log; pub mod payload; pub mod streamer; +use crate::stream::StreamSerializationConfig; use chrono::serde::ts_milliseconds; use chrono::{DateTime, Utc}; use payload::EventPayload; @@ -9,6 +10,8 @@ use serde::{Deserialize, Serialize}; pub type EventId = u32; +type EventSerializationConfig = StreamSerializationConfig; + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Event { #[serde(with = "ts_milliseconds")] diff --git a/crates/hyperqueue/src/stream/mod.rs b/crates/hyperqueue/src/stream/mod.rs index 1077754f1..4f570c054 100644 --- a/crates/hyperqueue/src/stream/mod.rs +++ b/crates/hyperqueue/src/stream/mod.rs @@ -1 +1,5 @@ +use crate::common::serialization::TrailingAllowedConfig; + pub mod reader; + +pub type StreamSerializationConfig = TrailingAllowedConfig; diff --git a/crates/hyperqueue/src/stream/reader/outputlog.rs b/crates/hyperqueue/src/stream/reader/outputlog.rs index 1b7ff6f11..e8a313f1d 100644 --- a/crates/hyperqueue/src/stream/reader/outputlog.rs +++ b/crates/hyperqueue/src/stream/reader/outputlog.rs @@ -1,7 +1,8 @@ use crate::client::commands::outputlog::{CatOpts, Channel, ExportOpts, ShowOpts}; use crate::common::arraydef::IntArray; use crate::common::error::HqError; -use crate::common::serialization::bincode_config; +use crate::common::serialization::SerializationConfig; +use crate::stream::StreamSerializationConfig; use crate::transfer::stream::{ChannelId, StreamChunkHeader}; use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER, STREAM_FILE_SUFFIX}; use crate::{JobId, JobTaskId, Set}; @@ -156,7 +157,7 @@ impl OutputLog { } fn read_chunk(file: &mut BufReader) -> crate::Result> { - match bincode_config().deserialize_from(file) { + match StreamSerializationConfig::config().deserialize_from(file) { Ok(event) => Ok(Some(event)), Err(error) => match error.deref() { bincode::ErrorKind::Io(e) @@ -217,7 +218,7 @@ impl OutputLog { if header != STREAM_FILE_HEADER { anyhow::bail!("Invalid file format"); } - Ok(bincode_config().deserialize_from(file)?) + Ok(StreamSerializationConfig::config().deserialize_from(file)?) } pub fn summary(&self) -> Summary { diff --git a/crates/hyperqueue/src/worker/streamer.rs b/crates/hyperqueue/src/worker/streamer.rs index 39e97aaa8..1b7f343fb 100644 --- a/crates/hyperqueue/src/worker/streamer.rs +++ b/crates/hyperqueue/src/worker/streamer.rs @@ -1,5 +1,6 @@ use crate::common::error::HqError; -use crate::common::serialization::bincode_config; +use crate::common::serialization::SerializationConfig; +use crate::stream::StreamSerializationConfig; use crate::transfer::stream::{ChannelId, StreamChunkHeader}; use crate::WrappedRcRefCell; use crate::{JobId, JobTaskId, Map}; @@ -186,7 +187,7 @@ async fn stream_writer( server_uid: Cow::Borrowed(&streamer.server_uid), worker_id: streamer.worker_id, }; - bincode_config().serialize_into(&mut buffer, &header)?; + StreamSerializationConfig::config().serialize_into(&mut buffer, &header)?; }; file.write_all(&buffer).await?; while let Some(message) = receiver.recv().await { @@ -194,7 +195,7 @@ async fn stream_writer( StreamerMessage::Write { header, data } => { log::debug!("Waiting data chunk into stream file"); buffer.clear(); - bincode_config().serialize_into(&mut buffer, &header)?; + StreamSerializationConfig::config().serialize_into(&mut buffer, &header)?; file.write_all(&buffer).await?; if !data.is_empty() { file.write_all(&data).await? From 933f243dfc245a8795690e2ac6e8ea59d9b084dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Mon, 21 Oct 2024 09:28:32 +0200 Subject: [PATCH 5/5] Use `Box<[u8]>` instead of `Vec` in `Serialized` --- crates/hyperqueue/src/common/serialization.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/hyperqueue/src/common/serialization.rs b/crates/hyperqueue/src/common/serialization.rs index dc4ae881b..afc18b5ea 100644 --- a/crates/hyperqueue/src/common/serialization.rs +++ b/crates/hyperqueue/src/common/serialization.rs @@ -29,7 +29,7 @@ impl SerializationConfig for TrailingAllowedConfig { #[derive(Serialize, Deserialize)] pub struct Serialized { #[serde(with = "serde_bytes")] - data: Vec, + data: Box<[u8]>, _phantom: PhantomData<(T, C)>, } @@ -55,8 +55,11 @@ impl Clone for Serialized { impl Serialized { pub fn new(value: &T) -> bincode::Result { + let result = C::config().serialize(value)?; + // Check that we're not reallocating needlessly in `into_boxed_slice` + debug_assert_eq!(result.capacity(), result.len()); Ok(Self { - data: C::config().serialize(value)?, + data: result.into_boxed_slice(), _phantom: Default::default(), }) }