Skip to content

Commit

Permalink
compute tests
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 4, 2024
1 parent c088df5 commit db30e8a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
17 changes: 10 additions & 7 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1103,8 +1103,8 @@ mod tests {
}

async fn test_hash_dispatcher_complex_inner() {
// This test only works when VirtualNode::COUNT is 256.
static_assertions::const_assert_eq!(VirtualNode::COUNT, 256);
// This test only works when vnode count is 256.
assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

let num_outputs = 2; // actor id ranges from 1 to 2
let key_indices = &[0, 2];
Expand All @@ -1119,9 +1119,9 @@ mod tests {
})
.collect::<Vec<_>>();
let mut hash_mapping = (1..num_outputs + 1)
.flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs])
.flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
.collect_vec();
hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32);
hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
let mut hash_dispatcher = HashDataDispatcher::new(
outputs,
key_indices.to_vec(),
Expand Down Expand Up @@ -1360,6 +1360,9 @@ mod tests {

#[tokio::test]
async fn test_hash_dispatcher() {
// This test only works when vnode count is 256.
assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

let num_outputs = 5; // actor id ranges from 1 to 5
let cardinality = 10;
let dimension = 4;
Expand All @@ -1375,9 +1378,9 @@ mod tests {
})
.collect::<Vec<_>>();
let mut hash_mapping = (1..num_outputs + 1)
.flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs])
.flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
.collect_vec();
hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32);
hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
let mut hash_dispatcher = HashDataDispatcher::new(
outputs,
key_indices.to_vec(),
Expand Down Expand Up @@ -1411,7 +1414,7 @@ mod tests {
hasher.update(&bytes);
}
let output_idx =
hash_mapping[hasher.finish() as usize % VirtualNode::COUNT] as usize - 1;
hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
builder.append(Some(*val));
}
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/row_id_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,16 @@ mod tests {

#[tokio::test]
async fn test_row_id_gen_executor() {
// This test only works when vnode count is 256.
assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

let schema = Schema::new(vec![
Field::unnamed(DataType::Serial),
Field::unnamed(DataType::Int64),
]);
let pk_indices = vec![0];
let row_id_index = 0;
let row_id_generator = Bitmap::ones(VirtualNode::COUNT);
let row_id_generator = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);
let (mut tx, upstream) = MockSource::channel();
let upstream = upstream.into_executor(schema.clone(), pk_indices.clone());

Expand Down

0 comments on commit db30e8a

Please sign in to comment.