From 80e0b65696d759e8808e5314aa0341450dcf9b37 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 4 Sep 2024 17:06:35 +0800 Subject: [PATCH] compute tests Signed-off-by: Bugen Zhao --- src/stream/src/executor/dispatch.rs | 17 ++++++++++------- src/stream/src/executor/row_id_gen.rs | 5 ++++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 4a43ff618ebf7..1fb8ecb247cde 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1103,8 +1103,8 @@ mod tests { } async fn test_hash_dispatcher_complex_inner() { - // This test only works when VirtualNode::COUNT is 256. - static_assertions::const_assert_eq!(VirtualNode::COUNT, 256); + // This test only works when vnode count is 256. + assert_eq!(VirtualNode::COUNT_FOR_TEST, 256); let num_outputs = 2; // actor id ranges from 1 to 2 let key_indices = &[0, 2]; @@ -1119,9 +1119,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1360,6 +1360,9 @@ mod tests { #[tokio::test] async fn test_hash_dispatcher() { + // This test only works when vnode count is 256. + assert_eq!(VirtualNode::COUNT_FOR_TEST, 256); + let num_outputs = 5; // actor id ranges from 1 to 5 let cardinality = 10; let dimension = 4; @@ -1375,9 +1378,9 @@ mod tests { }) .collect::>(); let mut hash_mapping = (1..num_outputs + 1) - .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) + .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs]) .collect_vec(); - hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); + hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32); let mut hash_dispatcher = HashDataDispatcher::new( outputs, key_indices.to_vec(), @@ -1411,7 +1414,7 @@ mod tests { hasher.update(&bytes); } let output_idx = - hash_mapping[hasher.finish() as usize % VirtualNode::COUNT] as usize - 1; + hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1; for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) { builder.append(Some(*val)); } diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 1fcb85c26f88e..5465a1b54ec2e 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -134,13 +134,16 @@ mod tests { #[tokio::test] async fn test_row_id_gen_executor() { + // This test only works when vnode count is 256. + assert_eq!(VirtualNode::COUNT_FOR_TEST, 256); + let schema = Schema::new(vec![ Field::unnamed(DataType::Serial), Field::unnamed(DataType::Int64), ]); let pk_indices = vec![0]; let row_id_index = 0; - let row_id_generator = Bitmap::ones(VirtualNode::COUNT); + let row_id_generator = Bitmap::ones(VirtualNode::COUNT_FOR_TEST); let (mut tx, upstream) = MockSource::channel(); let upstream = upstream.into_executor(schema.clone(), pk_indices.clone());