Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_fix_correct_commit_ssts
  • Loading branch information
Li0k committed Sep 5, 2024
2 parents 22cb537 + 3b98b71 commit be0b9ad
Show file tree
Hide file tree
Showing 23 changed files with 206 additions and 381 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use std::mem::swap;

use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand All @@ -30,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{TableDistribution, TableIter};
use risingwave_storage::table::TableIter;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::Result;
Expand Down Expand Up @@ -194,7 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.collect();

// Lookup Join always contains distribution key, so we don't need vnode bitmap
let vnodes = Some(TableDistribution::all_vnodes());
// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
9 changes: 4 additions & 5 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::marker::PhantomData;

use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
Expand Down Expand Up @@ -408,12 +408,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
})
.collect();

// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
Some(TableDistribution::all_vnodes()),
table_desc,
),
table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
vnode_mapping,
outer_side_key_types,
inner_side_schema,
Expand Down
6 changes: 4 additions & 2 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
Expand Down Expand Up @@ -106,7 +107,8 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
};

let chunk_size = source.context.get_config().developer.chunk_size as u32;
Expand Down
5 changes: 3 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand All @@ -32,7 +33,6 @@ use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::TableDistribution;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -210,7 +210,8 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Some(TableDistribution::all_vnodes()),
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
};

let scan_ranges = {
Expand Down
14 changes: 14 additions & 0 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::ops::RangeInclusive;

use crate::bitmap::Bitmap;
use crate::hash::table_distribution::SINGLETON_VNODE;
use crate::hash::VirtualNode;

/// An extension trait for `Bitmap` to support virtual node operations.
Expand All @@ -36,4 +37,17 @@ impl Bitmap {
self.high_ranges()
.map(|r| (VirtualNode::from_index(*r.start())..=VirtualNode::from_index(*r.end())))
}

/// Returns whether only the [`SINGLETON_VNODE`] is set in the bitmap.
///
/// Note that this method returning `true` does not imply that the bitmap was created by
/// [`VnodeBitmapExt::singleton`], or that the bitmap has length 1.
pub fn is_singleton(&self) -> bool {
self.count_ones() == 1 && self.iter_vnodes().next().unwrap() == SINGLETON_VNODE
}

/// Creates a bitmap with length 1 and the single bit set.
pub fn singleton() -> Self {
Self::ones(1)
}
}
5 changes: 5 additions & 0 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl VirtualNode {
}
}

impl VirtualNode {
pub const COUNT_FOR_TEST: usize = Self::COUNT;
pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
}

impl VirtualNode {
// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
// chunk. When only one column is provided and its type is `Serial`, we consider the column to
Expand Down
Loading

0 comments on commit be0b9ad

Please sign in to comment.