diff --git a/crates/hyperqueue/src/client/commands/journal/output.rs b/crates/hyperqueue/src/client/commands/journal/output.rs index e3be5cab9..1a7ee1c5e 100644 --- a/crates/hyperqueue/src/client/commands/journal/output.rs +++ b/crates/hyperqueue/src/client/commands/journal/output.rs @@ -1,8 +1,7 @@ use crate::client::output::json::format_datetime; use crate::server::event::payload::EventPayload; -use crate::server::event::{bincode_config, Event}; -use crate::transfer::messages::JobDescription; -use bincode::Options; +use crate::server::event::Event; +use crate::transfer::messages::{JobDescription, SubmitRequest}; use serde_json::json; use tako::worker::WorkerOverview; @@ -103,14 +102,12 @@ fn format_payload(event: EventPayload) -> serde_json::Value { closed_job, serialized_desc, } => { - let job_desc: JobDescription = bincode_config() - .deserialize(&serialized_desc) - .expect("Invalid job description data"); + let submit: SubmitRequest = serialized_desc.deserialize().expect("Invalid submit data"); json!({ "type": "job-created", "job": job_id, "closed_job": closed_job, - "desc": JobInfoFormatter(&job_desc).to_json(), + "desc": JobInfoFormatter(&submit.job_desc).to_json(), }) } EventPayload::JobCompleted(job_id) => json!({ diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs index 5dc5a7fc9..82f449d71 100644 --- a/crates/hyperqueue/src/server/event/mod.rs +++ b/crates/hyperqueue/src/server/event/mod.rs @@ -6,7 +6,9 @@ use bincode::Options; use chrono::serde::ts_milliseconds; use chrono::{DateTime, Utc}; use payload::EventPayload; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; pub type EventId = u32; @@ -21,3 +23,33 @@ pub struct Event { pub(crate) fn bincode_config() -> impl Options { bincode::DefaultOptions::new().allow_trailing_bytes() } + +/// Strongly typed wrapper over serialized with Bincode. +#[derive(Serialize, Deserialize, Debug)] +pub struct Serialized { + #[serde(with = "serde_bytes")] + data: Vec, + _phantom: PhantomData, +} + +impl Clone for Serialized { + fn clone(&self) -> Self { + Self { + data: self.data.clone(), + _phantom: PhantomData, + } + } +} + +impl Serialized { + pub fn new(value: &T) -> bincode::Result { + Ok(Self { + data: bincode_config().serialize(value)?, + _phantom: Default::default(), + }) + } + + pub fn deserialize(&self) -> bincode::Result { + bincode_config().deserialize(&self.data) + } +} diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs index 2d3a15965..3fd3811e9 100644 --- a/crates/hyperqueue/src/server/event/payload.rs +++ b/crates/hyperqueue/src/server/event/payload.rs @@ -1,6 +1,7 @@ use crate::server::autoalloc::AllocationId; use crate::server::autoalloc::QueueId; -use crate::transfer::messages::{AllocationQueueParams, JobDescription}; +use crate::server::event::Serialized; +use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::JobId; use crate::{JobTaskId, WorkerId}; use serde::{Deserialize, Serialize}; @@ -19,13 +20,13 @@ pub enum EventPayload { WorkerOverviewReceived(WorkerOverview), /// A Job was submitted by the user -- full information to reconstruct the job; /// it will be only stored into file, not held in memory - /// Vec is serialized JobDescription; the main reason is avoiding duplication of JobDescription + /// Vec is serialized SubmitRequest; the main reason is avoiding duplication of SubmitRequest /// (we serialize it before it is stripped down) /// and a nice side effect is that Events can be deserialized without deserializing a potentially large submit data Submit { job_id: JobId, closed_job: bool, - serialized_desc: Vec, + serialized_desc: Serialized, }, /// All tasks of the job have finished. JobCompleted(JobId), @@ -43,7 +44,7 @@ pub enum EventPayload { job_id: JobId, task_id: JobTaskId, }, - // Task that failed to execute + /// Task has failed to execute TaskFailed { job_id: JobId, task_id: JobTaskId, diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index 66d9c5c6d..245bc940f 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -1,10 +1,9 @@ use crate::server::autoalloc::{AllocationId, QueueId}; use crate::server::event::log::{EventStreamMessage, EventStreamSender}; use crate::server::event::payload::EventPayload; -use crate::server::event::{bincode_config, Event}; +use crate::server::event::{Event, Serialized}; use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::{JobId, JobTaskId, WorkerId}; -use bincode::Options; use chrono::Utc; use smallvec::SmallVec; use tako::gateway::LostWorkerReason; @@ -64,7 +63,7 @@ impl EventStreamer { self.send_event(EventPayload::Submit { job_id, closed_job: submit_request.job_id.is_none(), - serialized_desc: bincode_config().serialize(submit_request)?, + serialized_desc: Serialized::new(submit_request)?, }); Ok(()) } diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index 38790fa6e..4325640e0 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -1,6 +1,5 @@ use crate::server::autoalloc::QueueId; use crate::server::client::submit_job_desc; -use crate::server::event::bincode_config; use crate::server::event::log::JournalReader; use crate::server::event::payload::EventPayload; use crate::server::job::{Job, JobTaskState, StartedTaskData}; @@ -10,7 +9,6 @@ use crate::transfer::messages::{ }; use crate::worker::start::RunningTaskContext; use crate::{JobId, JobTaskId, Map}; -use bincode::Options; use std::path::Path; use tako::gateway::NewTasksMessage; use tako::{ItemId, WorkerId}; @@ -177,8 +175,7 @@ impl StateRestorer { serialized_desc, } => { log::debug!("Replaying: JobTasksCreated {job_id}"); - let submit_request: SubmitRequest = - bincode_config().deserialize(&serialized_desc)?; + let submit_request: SubmitRequest = serialized_desc.deserialize()?; if closed_job { let mut job = RestorerJob::new(submit_request.job_desc, false); job.add_submit(submit_request.submit_desc);