Skip to content

Commit

Permalink
fix(interactive): fix peers_contains bugs in count operator in subtas…
Browse files Browse the repository at this point in the history
…ks (#3390)

Fixes #3177 #3370
  • Loading branch information
lnfjpt authored Dec 1, 2023
1 parent d1d53f2 commit dbf2803
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@ impl<D: Data> Count<D> for Stream<D> {
if let Some(end) = batch.take_end() {
let mut session = output.new_session(&batch.tag)?;
trace_worker!("local count {} of {:?}", cnt, batch.tag);
session.give_last(cnt, end)?;
if end.tag.len() > 0 {
let mut new_end = end.clone();
let mut new_peers = end.peers().clone();
let owner_index = batch.tag.current_uncheck()
% crate::worker_id::get_current_worker().total_peers();
new_peers.add_source(owner_index);
new_end.update_peers(new_peers);
session.give_last(cnt, new_end)?;
} else {
session.give_last(cnt, end)?;
}
} else {
table.insert(batch.tag.clone(), cnt);
}
Expand All @@ -30,15 +40,36 @@ impl<D: Data> Count<D> for Stream<D> {
if let Some(cnt) = table.remove(&batch.tag) {
let mut session = output.new_session(&batch.tag)?;
trace_worker!("local count {} of {:?}", cnt, batch.tag);
session.give_last(cnt, end)?;
if end.tag.len() > 0 {
let mut new_end = end.clone();
let mut new_peers = end.peers().clone();
let owner_index = batch.tag.current_uncheck()
% crate::worker_id::get_current_worker().total_peers();
new_peers.add_source(owner_index);
new_end.update_peers(new_peers);
session.give_last(cnt, new_end)?;
} else {
session.give_last(cnt, end)?;
}
} else {
let worker = crate::worker_id::get_current_worker().index;
if end.contains_source(worker) {
let new_end = if end.tag.len() > 0 {
let mut new_end = end.clone();
let mut new_peers = end.peers().clone();
let owner_index = batch.tag.current_uncheck()
% crate::worker_id::get_current_worker().total_peers();
new_peers.add_source(owner_index);
new_end.update_peers(new_peers);
new_end
} else {
end
};
if new_end.contains_source(worker) {
let mut session = output.new_session(&batch.tag)?;
trace_worker!("local count {} of {:?}", 0, batch.tag);
session.give_last(0, end)?;
session.give_last(0, new_end)?;
} else {
output.notify_end(end)?;
output.notify_end(new_end)?;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//

use pegasus::api::{
Collect, CorrelatedSubTask, Count, Filter, Fold, FoldByKey, KeyBy, Limit, Map, Sink, SortBy,
Collect, CorrelatedSubTask, Count, Filter, Fold, FoldByKey, HasAny, KeyBy, Limit, Map, Sink, SortBy,
SortLimitBy,
};
use pegasus::JobConf;
Expand Down Expand Up @@ -101,6 +101,37 @@ fn count_test_03() {
assert_eq!(1, count);
}

#[test]
fn count_test_04() {
let mut conf = JobConf::new("count_test_03");
conf.set_workers(2);
let mut result = pegasus::run(conf, || {
move |input, output| {
input
.input_from(0..10u64)?
.apply(|sub| {
sub.repartition(move |id| Ok(*id % 2))
.flat_map(|i| Ok(0..i))?
.repartition(move |id| Ok(*id % 2))
.count()?
.into_stream()?
.map(|i| Ok(i))?
.any()
})?
.filter_map(|i| if i.1 { Ok(Some(i.0)) } else { Ok(None) })?
.sink_into(output)
}
})
.expect("submit job failure:");

let mut count = 0;
while let Some(Ok(d)) = result.next() {
count += d;
}

assert_eq!(90, count);
}

#[test]
fn collect_test() {
let mut conf = JobConf::new("collect_test");
Expand Down

0 comments on commit dbf2803

Please sign in to comment.