Skip to content

Commit

Permalink
Remove DapReportInitializer
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Nov 27, 2024
1 parent 1550a14 commit 4bb2929
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 59 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/dapf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ repository.workspace = true

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["env"] }
daphne = { path = "../daphne", features = ["report-generator", "prometheus"] }
Expand Down
13 changes: 1 addition & 12 deletions crates/dapf/src/acceptance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ pub mod load_testing;

use crate::{deduce_dap_version_from_url, functions, test_durations::TestDurations, HttpClient};
use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait;
use daphne::{
hpke::{HpkeConfig, HpkeKemId, HpkeReceiverConfig},
messages::{
self, taskprov::TaskprovAdvertisement, AggregateShareReq, AggregationJobId, Base64Encode,
BatchId, BatchSelector, PartialBatchSelector, TaskId,
},
metrics::DaphneMetrics,
roles::DapReportInitializer,
testing::report_generator::ReportGenerator,
vdaf::VdafConfig,
DapAggregateShare, DapAggregateSpan, DapAggregationParam, DapMeasurement, DapQueryConfig,
Expand All @@ -42,7 +40,6 @@ use std::{
convert::TryFrom,
env,
num::NonZeroU32,
ops::Range,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, Instant, SystemTime},
Expand Down Expand Up @@ -489,7 +486,7 @@ impl Test {
let (agg_job_state, agg_job_init_req) = task_config
.test_produce_agg_job_req(
fake_leader_hpke_receiver_config,
self,
messages::Time::MIN..messages::Time::MAX,
task_id,
part_batch_sel,
&DapAggregationParam::Empty,
Expand Down Expand Up @@ -697,14 +694,6 @@ impl Test {
}
}

#[async_trait]
impl DapReportInitializer for Test {
fn valid_report_time_range(&self) -> Range<messages::Time> {
// Accept reports with any timestmap.
0..u64::MAX
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Now(u64);
pub fn now() -> Now {
Expand Down
5 changes: 1 addition & 4 deletions crates/daphne-server/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use daphne::{
metrics::DaphneMetrics,
roles::{
aggregator::{MergeAggShareError, TaskprovConfig},
DapAggregator, DapReportInitializer,
DapAggregator,
},
taskprov, DapAggregateShare, DapAggregateSpan, DapError, DapGlobalConfig, DapTaskConfig,
DapVersion,
Expand Down Expand Up @@ -344,10 +344,7 @@ impl DapAggregator for crate::App {
fn audit_log(&self) -> &dyn AuditLog {
&*self.audit_log
}
}

#[async_trait]
impl DapReportInitializer for crate::App {
fn valid_report_time_range(&self) -> Range<messages::Time> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand Down
20 changes: 9 additions & 11 deletions crates/daphne/src/protocol/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{
Transition, TransitionFailure, TransitionVar,
},
metrics::{DaphneMetrics, ReportStatus},
roles::DapReportInitializer,
vdaf::{
prio2::{prio2_prep_finish, prio2_prep_finish_from_shares, prio2_prep_init},
prio3::{prio3_prep_finish, prio3_prep_finish_from_shares, prio3_prep_init},
Expand Down Expand Up @@ -294,7 +293,7 @@ impl DapTaskConfig {
pub fn produce_agg_job_req<S>(
&self,
decrypter: impl HpkeDecrypter,
initializer: &impl DapReportInitializer,
valid_report_time_range: Range<messages::Time>,
task_id: &TaskId,
part_batch_sel: &PartialBatchSelector,
agg_param: &DapAggregationParam,
Expand All @@ -306,7 +305,7 @@ impl DapTaskConfig {
{
self.produce_agg_job_req_impl(
decrypter,
initializer,
valid_report_time_range,
task_id,
part_batch_sel,
agg_param,
Expand All @@ -320,7 +319,7 @@ impl DapTaskConfig {
fn produce_agg_job_req_impl<S>(
&self,
decrypter: impl HpkeDecrypter,
initializer: &impl DapReportInitializer,
valid_report_time_range: Range<messages::Time>,
task_id: &TaskId,
part_batch_sel: &PartialBatchSelector,
agg_param: &DapAggregationParam,
Expand Down Expand Up @@ -354,7 +353,7 @@ impl DapTaskConfig {

let initialized_report = InitializedReport::new(
&decrypter,
initializer.valid_report_time_range(),
valid_report_time_range.clone(),
true,
task_id,
self,
Expand Down Expand Up @@ -430,7 +429,7 @@ impl DapTaskConfig {
pub fn test_produce_agg_job_req<S>(
&self,
decrypter: impl HpkeDecrypter,
initializer: &impl DapReportInitializer,
valid_report_time_range: Range<messages::Time>,
task_id: &TaskId,
part_batch_sel: &PartialBatchSelector,
agg_param: &DapAggregationParam,
Expand All @@ -443,7 +442,7 @@ impl DapTaskConfig {
{
self.produce_agg_job_req_impl(
decrypter,
initializer,
valid_report_time_range,
task_id,
part_batch_sel,
agg_param,
Expand All @@ -456,17 +455,16 @@ impl DapTaskConfig {
/// Helper: Consume the `AggregationJobInitReq` sent by the Leader and return the initialized
/// reports.
#[tracing::instrument(skip_all)]
pub fn consume_agg_job_req<H, I>(
pub fn consume_agg_job_req<H>(
&self,
decrypter: &H,
initializer: &I,
valid_report_time_range: Range<messages::Time>,
task_id: &TaskId,
agg_job_init_req: AggregationJobInitReq,
replay_protection: ReplayProtection,
) -> Result<Vec<InitializedReport>, DapError>
where
H: HpkeDecrypter + Sync,
I: DapReportInitializer + Sync,
{
let agg_param =
DapAggregationParam::get_decoded_with_param(&self.vdaf, &agg_job_init_req.agg_param)
Expand All @@ -490,7 +488,7 @@ impl DapTaskConfig {
.map(|prep_init| {
InitializedReport::new(
decrypter,
initializer.valid_report_time_range(),
valid_report_time_range.clone(),
false,
task_id,
self,
Expand Down
1 change: 0 additions & 1 deletion crates/daphne/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ mod test {
TransitionVar,
},
protocol::aggregator::InitializedReport,
roles::DapReportInitializer,
test_versions,
testing::AggregationJobTest,
vdaf::{Prio3Config, VdafConfig},
Expand Down
13 changes: 4 additions & 9 deletions crates/daphne/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@ use crate::{
DapTaskConfig, DapVersion,
};

/// Report initializer. Used by a DAP Aggregator [`DapAggregator`] when initializing an aggregation
/// job.
#[async_trait]
pub trait DapReportInitializer {
/// Return the time range in which a report must appear in order to be considered valid.
fn valid_report_time_range(&self) -> Range<Time>;
}

#[derive(Debug)]
pub enum MergeAggShareError {
AlreadyCollected,
Expand All @@ -42,7 +34,7 @@ pub struct TaskprovConfig<'s> {

/// DAP Aggregator functionality.
#[async_trait]
pub trait DapAggregator: HpkeProvider + DapReportInitializer + Sized {
pub trait DapAggregator: HpkeProvider + Sized {
/// Look up the DAP global configuration.
async fn get_global_config(&self) -> Result<DapGlobalConfig, DapError>;

Expand Down Expand Up @@ -137,6 +129,9 @@ pub trait DapAggregator: HpkeProvider + DapReportInitializer + Sized {

/// Access the audit log.
fn audit_log(&self) -> &dyn AuditLog;

/// Return the time range in which a report must appear in order to be considered valid.
fn valid_report_time_range(&self) -> Range<Time>;
}

/// Handle request for the Aggregator's HPKE configuration.
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne/src/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn handle_agg_job_init_req<A: DapHelper + Sync>(
let version = req.version;
let initialized_reports = task_config.consume_agg_job_req(
&aggregator.get_receiver_configs(task_config.version).await?,
aggregator,
aggregator.valid_report_time_range(),
&task_id,
req.payload,
replay_protection,
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne/src/roles/leader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async fn run_agg_job<A: DapLeader>(
let agg_job_id = AggregationJobId(thread_rng().gen());
let (agg_job_state, agg_job_init_req) = task_config.produce_agg_job_req(
aggregator.get_receiver_configs(task_config.version).await?,
aggregator,
aggregator.valid_report_time_range(),
task_id,
part_batch_sel,
agg_param,
Expand Down
4 changes: 2 additions & 2 deletions crates/daphne/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
DapAbort, DapError, DapGlobalConfig, DapQueryConfig, DapRequestMeta, DapTaskConfig,
};

pub use aggregator::{DapAggregator, DapReportInitializer};
pub use aggregator::DapAggregator;
pub use helper::DapHelper;
pub use leader::DapLeader;

Expand Down Expand Up @@ -459,7 +459,7 @@ mod test {
let (leader_state, agg_job_init_req) = task_config
.produce_agg_job_req(
&*self.leader.hpke_receiver_config_list,
&*self.leader,
self.leader.valid_report_time_range(),
task_id,
&part_batch_sel,
&agg_param,
Expand Down
26 changes: 10 additions & 16 deletions crates/daphne/src/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
aggregator::{MergeAggShareError, TaskprovConfig},
helper,
leader::{in_memory_leader::InMemoryLeaderState, WorkItem},
DapAggregator, DapHelper, DapLeader, DapReportInitializer,
DapAggregator, DapHelper, DapLeader,
},
taskprov, DapAbort, DapAggregateResult, DapAggregateShare, DapAggregateSpan,
DapAggregationJobState, DapAggregationParam, DapBatchBucket, DapCollectionJob, DapError,
Expand Down Expand Up @@ -68,14 +68,11 @@ pub struct AggregationJobTest {
pub(crate) leader_metrics: DaphnePromMetrics,
}

#[async_trait]
impl DapReportInitializer for AggregationJobTest {
fn valid_report_time_range(&self) -> Range<Time> {
impl AggregationJobTest {
pub fn valid_report_time_range(&self) -> Range<Time> {
self.valid_report_range.clone()
}
}

impl AggregationJobTest {
/// Create an aggregation job test with the given VDAF config, HPKE KEM algorithm, DAP protocol
/// version. The KEM algorithm is used to generate an HPKE config for each party.
pub fn new(vdaf: &VdafConfig, kem_id: HpkeKemId, version: DapVersion) -> Self {
Expand Down Expand Up @@ -193,7 +190,7 @@ impl AggregationJobTest {
self.task_config
.test_produce_agg_job_req(
&self.leader_hpke_receiver_config,
self,
self.valid_report_time_range(),
&self.task_id,
&PartialBatchSelector::TimeInterval,
agg_param,
Expand All @@ -219,7 +216,7 @@ impl AggregationJobTest {
.task_config
.consume_agg_job_req(
&self.helper_hpke_receiver_config,
self,
self.valid_report_time_range(),
&self.task_id,
agg_job_init_req,
self.replay_protection,
Expand Down Expand Up @@ -654,14 +651,6 @@ impl HpkeProvider for InMemoryAggregator {
}
}

#[async_trait]
impl DapReportInitializer for InMemoryAggregator {
fn valid_report_time_range(&self) -> Range<messages::Time> {
// Accept reports with any timestmap.
0..u64::MAX
}
}

#[async_trait]
impl DapAggregator for InMemoryAggregator {
async fn get_global_config(&self) -> Result<DapGlobalConfig, DapError> {
Expand Down Expand Up @@ -866,6 +855,11 @@ impl DapAggregator for InMemoryAggregator {
fn audit_log(&self) -> &dyn AuditLog {
&self.audit_log
}

fn valid_report_time_range(&self) -> Range<messages::Time> {
// Accept reports with any timestmap.
messages::Time::MIN..messages::Time::MAX
}
}

#[async_trait]
Expand Down

0 comments on commit 4bb2929

Please sign in to comment.