diff --git a/Cargo.lock b/Cargo.lock index ab8b845e77c27..0c000d59950b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8858,6 +8858,7 @@ dependencies = [ "prometheus", "rand", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_connector", "risingwave_dml", "risingwave_expr", @@ -9094,7 +9095,9 @@ version = "1.7.0-alpha" dependencies = [ "bytes", "educe 0.5.7", + "ethnum", "fixedbitset 0.5.0", + "jsonbb", "lru 0.7.6", "risingwave_common_proc_macro", "rust_decimal", @@ -9353,6 +9356,7 @@ dependencies = [ "regex", "reqwest", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_jni_core", "risingwave_pb", "risingwave_rpc_client", @@ -9516,6 +9520,7 @@ dependencies = [ "parse-display", "paste", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_expr_macro", "risingwave_pb", "risingwave_udf", @@ -9555,6 +9560,7 @@ dependencies = [ "openssl", "regex", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_expr", "risingwave_pb", "rust_decimal", @@ -9681,6 +9687,7 @@ dependencies = [ "parse-display", "prost 0.12.1", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_pb", "tracing", "workspace-hack", @@ -10300,6 +10307,7 @@ dependencies = [ "rand", "risingwave_backup", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_common_service", "risingwave_expr", "risingwave_hummock_sdk", @@ -10363,6 +10371,7 @@ dependencies = [ "prost 0.12.1", "rand", "risingwave_common", + "risingwave_common_estimate_size", "risingwave_connector", "risingwave_dml", "risingwave_expr", diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 280c98020fd97..f87db26ba84be 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -34,6 +34,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] } paste = "1" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_connector = { workspace = true } risingwave_dml = { workspace = true } risingwave_expr = { workspace = true } diff --git a/src/batch/src/executor/aggregation/distinct.rs b/src/batch/src/executor/aggregation/distinct.rs index 3ee73618963c6..6792ff7a80f69 100644 --- a/src/batch/src/executor/aggregation/distinct.rs +++ b/src/batch/src/executor/aggregation/distinct.rs @@ -17,9 +17,9 @@ use std::ops::Range; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{ AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction, }; diff --git a/src/batch/src/executor/aggregation/orderby.rs b/src/batch/src/executor/aggregation/orderby.rs index d02ac0a0476d9..e9cb90eb7fd79 100644 --- a/src/batch/src/executor/aggregation/orderby.rs +++ b/src/batch/src/executor/aggregation/orderby.rs @@ -16,12 +16,12 @@ use std::ops::Range; use anyhow::Context; use risingwave_common::array::{Op, RowRef, StreamChunk}; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::memcmp_encoding; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{ AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction, }; diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 6acd3de2f54f2..402d96934a1b1 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -18,11 +18,11 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::MemoryContext; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashAggNode; diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index ea8959849d017..83270dfd2a2c2 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -28,6 +28,7 @@ use risingwave_common::row::{repeat_n, RowExt}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression}; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -36,7 +37,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::risingwave_common::estimate_size::EstimateSize; use crate::risingwave_common::hash::NullBitmap; use crate::task::{BatchTaskContext, ShutdownToken}; diff --git a/src/batch/src/executor/join/lookup_join_base.rs b/src/batch/src/executor/join/lookup_join_base.rs index 8128e901ab7a5..72af8ef418449 100644 --- a/src/batch/src/executor/join/lookup_join_base.rs +++ b/src/batch/src/executor/join/lookup_join_base.rs @@ -26,6 +26,7 @@ use risingwave_common::types::{DataType, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::BoxedExpression; use crate::error::BatchError; @@ -34,7 +35,6 @@ use crate::executor::{ utils, BoxedDataChunkListStream, BoxedExecutor, BufferChunkExecutor, EquiJoinParams, HashJoinExecutor, JoinHashMap, JoinType, LookupExecutorBuilder, RowId, }; -use crate::risingwave_common::estimate_size::EstimateSize; use crate::task::ShutdownToken; /// Lookup Join Base. diff --git a/src/batch/src/executor/join/nested_loop_join.rs b/src/batch/src/executor/join/nested_loop_join.rs index 9ecd1fe1709e5..43b6448ad39f2 100644 --- a/src/batch/src/executor/join/nested_loop_join.rs +++ b/src/batch/src/executor/join/nested_loop_join.rs @@ -17,12 +17,12 @@ use risingwave_common::array::data_chunk_iter::RowRef; use risingwave_common::array::{Array, DataChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::memory::MemoryContext; use risingwave_common::row::{repeat_n, RowExt}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqDebug; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::{ build_from_prost as expr_build_from_prost, BoxedExpression, Expression, }; diff --git a/src/batch/src/executor/merge_sort_exchange.rs b/src/batch/src/executor/merge_sort_exchange.rs index c739d88e0c692..1562f2a8f4557 100644 --- a/src/batch/src/executor/merge_sort_exchange.rs +++ b/src/batch/src/executor/merge_sort_exchange.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use futures_async_stream::try_stream; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::memory::{MemMonitoredHeap, MemoryContext, MonitoredGlobalAlloc}; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::sort_util::{ColumnOrder, HeapElem}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::PbExchangeSource; diff --git a/src/batch/src/executor/top_n.rs b/src/batch/src/executor/top_n.rs index bcfe8a388c64a..c02b84ec5b7ca 100644 --- a/src/batch/src/executor/top_n.rs +++ b/src/batch/src/executor/top_n.rs @@ -19,12 +19,12 @@ use std::vec::Vec; use futures_async_stream::try_stream; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::memory::{MemMonitoredHeap, MemoryContext}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::memcmp_encoding::{encode_chunk, MemcmpEncoded}; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; diff --git a/src/common/estimate_size/Cargo.toml b/src/common/estimate_size/Cargo.toml index 2a3bfa50b1630..04bcf369cc588 100644 --- a/src/common/estimate_size/Cargo.toml +++ b/src/common/estimate_size/Cargo.toml @@ -17,11 +17,13 @@ normal = ["workspace-hack"] [dependencies] bytes = "1" educe = "0.5" +ethnum = { version = "1", features = ["serde"] } fixedbitset = "0.5" +jsonbb = "0.1.2" lru = { workspace = true } +risingwave_common_proc_macro = { workspace = true } rust_decimal = "1" serde_json = "1" -risingwave_common_proc_macro = { workspace = true } [lints] workspace = true diff --git a/src/common/estimate_size/src/lib.rs b/src/common/estimate_size/src/lib.rs index 16ae3d9153f83..32e41fcf87fb2 100644 --- a/src/common/estimate_size/src/lib.rs +++ b/src/common/estimate_size/src/lib.rs @@ -17,13 +17,13 @@ pub mod collections; +use std::cmp::Reverse; use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; use bytes::Bytes; use fixedbitset::FixedBitSet; pub use risingwave_common_proc_macro::EstimateSize; -use rust_decimal::Decimal as RustDecimal; /// The trait for estimating the actual memory usage of a struct. /// @@ -93,6 +93,18 @@ impl EstimateSize for serde_json::Value { } } +impl EstimateSize for jsonbb::Value { + fn estimated_heap_size(&self) -> usize { + self.capacity() + } +} + +impl EstimateSize for jsonbb::Builder { + fn estimated_heap_size(&self) -> usize { + self.capacity() + } +} + impl EstimateSize for (T1, T2) { fn estimated_heap_size(&self) -> usize { self.0.estimated_heap_size() + self.1.estimated_heap_size() @@ -127,13 +139,21 @@ impl EstimateSize for Box<[T]> { } } +impl EstimateSize for Reverse { + fn estimated_heap_size(&self) -> usize { + self.0.estimated_heap_size() + } +} + impl EstimateSize for [T; LEN] { fn estimated_heap_size(&self) -> usize { 0 } } -impl ZeroHeapSize for RustDecimal {} +impl ZeroHeapSize for rust_decimal::Decimal {} + +impl ZeroHeapSize for ethnum::I256 {} impl ZeroHeapSize for PhantomData {} diff --git a/src/common/src/array/bool_array.rs b/src/common/src/array/bool_array.rs index 8034ee788c862..ebee50c8e4b72 100644 --- a/src/common/src/array/bool_array.rs +++ b/src/common/src/array/bool_array.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::{ArrayType, PbArray}; use super::{Array, ArrayBuilder, DataType}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; #[derive(Debug, Clone, PartialEq, Eq)] pub struct BoolArray { diff --git a/src/common/src/array/bytes_array.rs b/src/common/src/array/bytes_array.rs index 5ad340c0917b0..1257730b63c96 100644 --- a/src/common/src/array/bytes_array.rs +++ b/src/common/src/array/bytes_array.rs @@ -15,13 +15,13 @@ use std::iter; use std::mem::size_of; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::Buffer; use risingwave_pb::data::{ArrayType, PbArray}; use super::{Array, ArrayBuilder, DataType}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; use crate::util::iter_util::ZipEqDebug; /// `BytesArray` is a collection of Rust `[u8]`s. diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index 22d5bb55711ce..849979abc5155 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -23,13 +23,13 @@ use either::Either; use itertools::Itertools; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::PbDataChunk; use super::{Array, ArrayImpl, ArrayRef, ArrayResult, StructArray}; use crate::array::data_chunk_iter::RowRef; use crate::array::ArrayBuilderImpl; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; use crate::field_generator::{FieldGeneratorImpl, VarcharProperty}; use crate::hash::HashCode; use crate::row::Row; diff --git a/src/common/src/array/jsonb_array.rs b/src/common/src/array/jsonb_array.rs index 4e0bcaafd35da..4a0df2f55835d 100644 --- a/src/common/src/array/jsonb_array.rs +++ b/src/common/src/array/jsonb_array.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::{PbArray, PbArrayType}; use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; use crate::types::{DataType, JsonbRef, JsonbVal, Scalar}; #[derive(Debug, Clone, EstimateSize)] @@ -176,15 +176,3 @@ impl FromIterator for JsonbArray { iter.into_iter().map(Some).collect() } } - -impl EstimateSize for jsonbb::Value { - fn estimated_heap_size(&self) -> usize { - self.capacity() - } -} - -impl EstimateSize for jsonbb::Builder { - fn estimated_heap_size(&self) -> usize { - self.capacity() - } -} diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index 8f0235f8145d1..dae9a4a94bc93 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -20,6 +20,7 @@ use std::mem::size_of; use bytes::{Buf, BufMut}; use itertools::Itertools; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::{ListArrayData, PbArray, PbArrayType}; use serde::{Deserialize, Serializer}; use thiserror_ext::AsReport; @@ -29,7 +30,6 @@ use super::{ PrimitiveArrayItemType, RowRef, Utf8Array, }; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::EstimateSize; use crate::row::Row; use crate::types::{ hash_datum, DataType, Datum, DatumRef, DefaultOrd, Scalar, ScalarImpl, ScalarRefImpl, diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index fd8d408d5f813..ae8d1a9e19e38 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -62,6 +62,7 @@ pub use jsonb_array::{JsonbArray, JsonbArrayBuilder}; pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue}; use paste::paste; pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::PbArray; pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt}; pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue}; @@ -70,7 +71,6 @@ pub use utf8_array::*; pub use self::error::ArrayError; pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder}; use crate::buffer::Bitmap; -use crate::estimate_size::EstimateSize; use crate::types::*; use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_array_variants}; pub type ArrayResult = Result; diff --git a/src/common/src/array/num256_array.rs b/src/common/src/array/num256_array.rs index 9e9c56fcef0b5..48ee99ac21db2 100644 --- a/src/common/src/array/num256_array.rs +++ b/src/common/src/array/num256_array.rs @@ -15,13 +15,13 @@ use std::io::{Cursor, Read}; use ethnum::I256; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::Buffer; use risingwave_pb::data::PbArray; use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::{EstimateSize, ZeroHeapSize}; use crate::types::{DataType, Int256, Int256Ref, Scalar}; #[derive(Debug, Clone, EstimateSize)] @@ -36,8 +36,6 @@ pub struct Int256Array { data: Box<[I256]>, } -impl ZeroHeapSize for I256 {} - #[rustfmt::skip] macro_rules! impl_array_for_num256 { ( diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 844774f9f69e8..9840ba68135c5 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -16,13 +16,13 @@ use std::fmt::Debug; use std::io::Write; use std::mem::size_of; +use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize}; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::Buffer; use risingwave_pb::data::{ArrayType, PbArray}; use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult}; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::estimate_size::{EstimateSize, ZeroHeapSize}; use crate::for_all_native_types; use crate::types::*; diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index 0fd5563d4a790..157e98a429544 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -22,6 +22,7 @@ use either::Either; use itertools::Itertools; use rand::prelude::SmallRng; use rand::{Rng, SeedableRng}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::{PbOp, PbStreamChunk}; use super::stream_chunk_builder::StreamChunkBuilder; @@ -29,7 +30,6 @@ use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef}; use crate::array::DataChunk; use crate::buffer::{Bitmap, BitmapBuilder}; use crate::catalog::Schema; -use crate::estimate_size::EstimateSize; use crate::field_generator::VarcharProperty; use crate::row::Row; use crate::types::{DataType, DefaultOrdered, ToText}; diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs index 6309c68347486..e5a1687bbcfeb 100644 --- a/src/common/src/array/struct_array.rs +++ b/src/common/src/array/struct_array.rs @@ -20,13 +20,13 @@ use std::sync::Arc; use bytes::{Buf, BufMut}; use either::Either; use itertools::Itertools; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::{PbArray, PbArrayType, StructArrayData}; use super::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayResult, DataChunk}; use crate::array::ArrayRef; use crate::buffer::{Bitmap, BitmapBuilder}; use crate::error::BoxedError; -use crate::estimate_size::EstimateSize; use crate::types::{ hash_datum, DataType, Datum, DatumRef, DefaultOrd, Scalar, ScalarImpl, StructType, ToDatumRef, ToText, diff --git a/src/common/src/array/utf8_array.rs b/src/common/src/array/utf8_array.rs index 744bf4c192aa5..eddcc50bb8ec2 100644 --- a/src/common/src/array/utf8_array.rs +++ b/src/common/src/array/utf8_array.rs @@ -14,12 +14,12 @@ use std::fmt::{Display, Write}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::{ArrayType, PbArray}; use super::bytes_array::{BytesWriter, PartialBytesWriter}; use super::{Array, ArrayBuilder, BytesArray, BytesArrayBuilder, DataType}; use crate::buffer::Bitmap; -use crate::estimate_size::EstimateSize; /// `Utf8Array` is a collection of Rust Utf8 `str`s. It's a wrapper of `BytesArray`. #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/src/common/src/buffer/bitmap.rs b/src/common/src/buffer/bitmap.rs index 1dc9bfdd64965..272cae6404f1e 100644 --- a/src/common/src/buffer/bitmap.rs +++ b/src/common/src/buffer/bitmap.rs @@ -39,11 +39,10 @@ use std::iter::{self, TrustedLen}; use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, RangeInclusive}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::PbBuffer; -use crate::estimate_size::EstimateSize; - #[derive(Default, Debug, Clone, EstimateSize)] pub struct BitmapBuilder { len: usize, diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 4e7072a4abdf4..c136710edb23f 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -30,11 +30,11 @@ use std::marker::PhantomData; use bytes::{Buf, BufMut}; use chrono::{Datelike, Timelike}; use fixedbitset::FixedBitSet; +use risingwave_common_estimate_size::EstimateSize; use smallbitset::Set64; use static_assertions::const_assert_eq; use crate::array::{ListValue, StructValue}; -use crate::estimate_size::EstimateSize; use crate::types::{ DataType, Date, Decimal, Int256, Int256Ref, JsonbVal, Scalar, ScalarRef, ScalarRefImpl, Serial, Time, Timestamp, Timestamptz, F32, F64, diff --git a/src/common/src/hash/key_v2.rs b/src/common/src/hash/key_v2.rs index 6d64106d556d2..fa3512f2dc0cf 100644 --- a/src/common/src/hash/key_v2.rs +++ b/src/common/src/hash/key_v2.rs @@ -20,11 +20,11 @@ use bytes::BufMut; use educe::Educe; use either::{for_both, Either}; use itertools::Itertools; +use risingwave_common_estimate_size::EstimateSize; use tinyvec::ArrayVec; use super::{HeapNullBitmap, NullBitmap, XxHash64HashCode}; use crate::array::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayResult, DataChunk}; -use crate::estimate_size::EstimateSize; use crate::hash::{HashKeyDe, HashKeySer}; use crate::row::OwnedRow; use crate::types::{DataType, Datum, ScalarImpl}; diff --git a/src/common/src/memory/monitored_heap.rs b/src/common/src/memory/monitored_heap.rs index cc0e123e8ed2d..58df24817a69b 100644 --- a/src/common/src/memory/monitored_heap.rs +++ b/src/common/src/memory/monitored_heap.rs @@ -15,7 +15,8 @@ use std::collections::BinaryHeap; use std::mem::size_of; -use crate::estimate_size::EstimateSize; +use risingwave_common_estimate_size::EstimateSize; + use crate::memory::{MemoryContext, MonitoredGlobalAlloc}; pub struct MemMonitoredHeap { diff --git a/src/common/src/row/compacted_row.rs b/src/common/src/row/compacted_row.rs index 82814d71b6d11..9dba1e46c3696 100644 --- a/src/common/src/row/compacted_row.rs +++ b/src/common/src/row/compacted_row.rs @@ -13,9 +13,9 @@ // limitations under the License. use bytes::Bytes; +use risingwave_common_estimate_size::EstimateSize; use super::{OwnedRow, Row, RowDeserializer}; -use crate::estimate_size::EstimateSize; use crate::types::DataType; use crate::util::value_encoding; diff --git a/src/common/src/row/owned_row.rs b/src/common/src/row/owned_row.rs index 282e25dac0913..8ea3803538d84 100644 --- a/src/common/src/row/owned_row.rs +++ b/src/common/src/row/owned_row.rs @@ -14,8 +14,9 @@ use std::mem; +use risingwave_common_estimate_size::EstimateSize; + use super::Row; -use crate::estimate_size::EstimateSize; use crate::types::{ DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp, ToDatumRef, }; diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs index c609017d06e3f..9199dd41b6e5b 100644 --- a/src/common/src/types/datetime.rs +++ b/src/common/src/types/datetime.rs @@ -23,13 +23,13 @@ use std::str::FromStr; use bytes::{Bytes, BytesMut}; use chrono::{Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday}; use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; +use risingwave_common_estimate_size::ZeroHeapSize; use thiserror::Error; use super::to_binary::ToBinary; use super::to_text::ToText; use super::{CheckedAdd, DataType, Interval}; use crate::array::{ArrayError, ArrayResult}; -use crate::estimate_size::ZeroHeapSize; /// The same as `NaiveDate::from_ymd(1970, 1, 1).num_days_from_ce()`. /// Minus this magic number to store the number of days since 1970-01-01. diff --git a/src/common/src/types/decimal.rs b/src/common/src/types/decimal.rs index 5a72fb95f36e8..b6031a1665540 100644 --- a/src/common/src/types/decimal.rs +++ b/src/common/src/types/decimal.rs @@ -21,6 +21,7 @@ use num_traits::{ CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero, }; use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; +use risingwave_common_estimate_size::ZeroHeapSize; use rust_decimal::prelude::FromStr; use rust_decimal::{Decimal as RustDecimal, Error, MathematicalOps as _, RoundingStrategy}; @@ -28,7 +29,6 @@ use super::to_binary::ToBinary; use super::to_text::ToText; use super::DataType; use crate::array::ArrayResult; -use crate::estimate_size::ZeroHeapSize; use crate::types::ordered_float::OrderedFloat; use crate::types::Decimal::Normalized; @@ -795,9 +795,9 @@ impl From for Decimal { #[cfg(test)] mod tests { use itertools::Itertools as _; + use risingwave_common_estimate_size::EstimateSize; use super::*; - use crate::estimate_size::EstimateSize; use crate::util::iter_util::ZipEqFast; fn check(lhs: f32, rhs: f32) -> bool { diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs index 0160ff7955591..4436181059d78 100644 --- a/src/common/src/types/interval.rs +++ b/src/common/src/types/interval.rs @@ -26,12 +26,12 @@ use chrono::Timelike; use num_traits::{CheckedAdd, CheckedNeg, CheckedSub, Zero}; use postgres_types::{to_sql_checked, FromSql}; use regex::Regex; +use risingwave_common_estimate_size::ZeroHeapSize; use risingwave_pb::data::PbInterval; use rust_decimal::prelude::Decimal; use super::to_binary::ToBinary; use super::*; -use crate::estimate_size::ZeroHeapSize; /// Every interval can be represented by a `Interval`. /// diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs index 025f95b2173b4..6b48893ac9ede 100644 --- a/src/common/src/types/jsonb.rs +++ b/src/common/src/types/jsonb.rs @@ -17,8 +17,8 @@ use std::hash::Hash; use bytes::Buf; use jsonbb::{Value, ValueRef}; +use risingwave_common_estimate_size::EstimateSize; -use crate::estimate_size::EstimateSize; use crate::types::{Scalar, ScalarRef}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/src/common/src/types/num256.rs b/src/common/src/types/num256.rs index 2527e5c8be7cb..43c97fd8b155a 100644 --- a/src/common/src/types/num256.rs +++ b/src/common/src/types/num256.rs @@ -25,12 +25,12 @@ use ethnum::{i256, u256, AsI256}; use num_traits::{ CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero, }; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::ArrayType; use serde::{Deserialize, Serialize}; use to_text::ToText; use crate::array::ArrayResult; -use crate::estimate_size::EstimateSize; use crate::types::to_binary::ToBinary; use crate::types::{to_text, Buf, DataType, Scalar, ScalarRef, F64}; diff --git a/src/common/src/types/ordered.rs b/src/common/src/types/ordered.rs index af4b91bea3ff5..77a0588e06672 100644 --- a/src/common/src/types/ordered.rs +++ b/src/common/src/types/ordered.rs @@ -14,11 +14,12 @@ //! `ScalarImpl` and `Datum` wrappers that implement `PartialOrd` and `Ord` with default order type. -use std::cmp::{Ord, Ordering, Reverse}; +use std::cmp::{Ord, Ordering}; use std::ops::Deref; +use risingwave_common_estimate_size::EstimateSize; + use crate::dispatch_scalar_ref_variants; -use crate::estimate_size::EstimateSize; use crate::types::{Datum, DatumRef, ScalarImpl, ScalarRefImpl}; use crate::util::sort_util::{cmp_datum, partial_cmp_datum, OrderType}; @@ -104,12 +105,6 @@ impl EstimateSize for DefaultOrdered { } } -impl EstimateSize for Reverse { - fn estimated_heap_size(&self) -> usize { - self.0.estimated_heap_size() - } -} - impl DefaultOrdered { pub fn new(inner: T) -> Self { Self(inner) diff --git a/src/common/src/types/ordered_float.rs b/src/common/src/types/ordered_float.rs index f08b417d9848c..9e18caece87f0 100644 --- a/src/common/src/types/ordered_float.rs +++ b/src/common/src/types/ordered_float.rs @@ -1012,13 +1012,13 @@ mod impl_into_ordered { } pub use impl_into_ordered::IntoOrdered; +use risingwave_common_estimate_size::ZeroHeapSize; use serde::Serialize; -use crate::estimate_size::ZeroHeapSize; - #[cfg(test)] mod tests { - use crate::estimate_size::EstimateSize; + use risingwave_common_estimate_size::EstimateSize; + use crate::types::ordered_float::OrderedFloat; use crate::types::IntoOrdered; diff --git a/src/common/src/types/sentinel.rs b/src/common/src/types/sentinel.rs index 22eb91d4d476b..18177d9cd279e 100644 --- a/src/common/src/types/sentinel.rs +++ b/src/common/src/types/sentinel.rs @@ -13,8 +13,7 @@ // limitations under the License. use enum_as_inner::EnumAsInner; - -use crate::estimate_size::EstimateSize; +use risingwave_common_estimate_size::EstimateSize; /// [`Sentinelled`] wraps type `T` to provide smallest (smaller than any normal `T` value) and largest /// (larger than ant normal `T` value) sentinel value for `T`. diff --git a/src/common/src/types/serial.rs b/src/common/src/types/serial.rs index 5c84c95fa0f7a..5f4ba237ee300 100644 --- a/src/common/src/types/serial.rs +++ b/src/common/src/types/serial.rs @@ -17,9 +17,9 @@ use std::hash::Hash; use bytes::BytesMut; use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; +use risingwave_common_estimate_size::ZeroHeapSize; use serde::{Serialize, Serializer}; -use crate::estimate_size::ZeroHeapSize; use crate::util::row_id::RowId; // Serial is an alias for i64 diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index f14d5d2edee6e..29369d36e9757 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -20,13 +20,13 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Datelike, TimeZone, Utc}; use chrono_tz::Tz; use postgres_types::{accepts, to_sql_checked, IsNull, ToSql, Type}; +use risingwave_common_estimate_size::ZeroHeapSize; use serde::{Deserialize, Serialize}; use super::to_binary::ToBinary; use super::to_text::ToText; use super::DataType; use crate::array::ArrayResult; -use crate::estimate_size::ZeroHeapSize; /// Timestamp with timezone. #[derive( diff --git a/src/common/src/util/memcmp_encoding.rs b/src/common/src/util/memcmp_encoding.rs index ffcfe5d42a18b..5709d42d226f8 100644 --- a/src/common/src/util/memcmp_encoding.rs +++ b/src/common/src/util/memcmp_encoding.rs @@ -16,11 +16,11 @@ use std::ops::Deref; use bytes::{Buf, BufMut}; use itertools::Itertools; +use risingwave_common_estimate_size::EstimateSize; use serde::{Deserialize, Serialize}; use super::iter_util::{ZipEqDebug, ZipEqFast}; use crate::array::{ArrayImpl, DataChunk}; -use crate::estimate_size::EstimateSize; use crate::row::{OwnedRow, Row}; use crate::types::{ DataType, Date, Datum, Int256, ScalarImpl, Serial, Time, Timestamp, Timestamptz, ToDatumRef, diff --git a/src/common/src/util/sort_util.rs b/src/common/src/util/sort_util.rs index 6758c36b82238..015222136d382 100644 --- a/src/common/src/util/sort_util.rs +++ b/src/common/src/util/sort_util.rs @@ -17,13 +17,13 @@ use std::fmt; use std::sync::Arc; use parse_display::Display; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; use super::iter_util::ZipEqDebug; use crate::array::{Array, DataChunk}; use crate::catalog::{FieldDisplay, Schema}; use crate::dispatch_array_variants; -use crate::estimate_size::EstimateSize; use crate::row::Row; use crate::types::{DefaultOrdered, ToDatumRef}; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index dc5d71807260d..8faea40d5be25 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -106,20 +106,17 @@ rdkafka = { workspace = true, features = [ "gssapi", "zstd", ] } -redis = { version = "0.25", features = [ - "aio", - "tokio-comp", - "async-std-comp", -] } +redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] } regex = "1.4" reqwest = { version = "0.11", features = ["json"] } risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } rumqttc = { version = "0.22.0", features = ["url"] } rust_decimal = "1" -rustls-native-certs = "0.6" +rustls-native-certs = "0.6" rustls-pemfile = "1" rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive", "rc"] } diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index dbed90d99ae26..7b402cbc13680 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -22,9 +22,9 @@ use nexmark::event::EventType; use nexmark::EventGenerator; use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common_estimate_size::EstimateSize; use tokio::time::Instant; use crate::error::ConnectorResult; diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 03ed84ac6a216..14eb26ee0d285 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -49,6 +49,7 @@ openssl = { version = "0.10", features = ["vendored"] } parse-display = "0.9" paste = "1" risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_expr_macro = { path = "../macro" } risingwave_pb = { workspace = true } risingwave_udf = { workspace = true } diff --git a/src/expr/core/src/aggregate/mod.rs b/src/expr/core/src/aggregate/mod.rs index 54e0ba582054e..4e8666e8c9dee 100644 --- a/src/expr/core/src/aggregate/mod.rs +++ b/src/expr/core/src/aggregate/mod.rs @@ -18,8 +18,8 @@ use std::ops::Range; use downcast_rs::{impl_downcast, Downcast}; use itertools::Itertools; use risingwave_common::array::StreamChunk; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::types::{DataType, Datum}; +use risingwave_common_estimate_size::EstimateSize; use crate::sig::FuncBuilder; use crate::{ExprError, Result}; diff --git a/src/expr/core/src/window_function/state/aggregate.rs b/src/expr/core/src/window_function/state/aggregate.rs index 08689bff6da72..45d3729702932 100644 --- a/src/expr/core/src/window_function/state/aggregate.rs +++ b/src/expr/core/src/window_function/state/aggregate.rs @@ -16,10 +16,10 @@ use std::collections::BTreeSet; use futures_util::FutureExt; use risingwave_common::array::{DataChunk, Op, StreamChunk}; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::{bail, must_match}; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; use smallvec::SmallVec; use super::buffer::{RangeWindow, RowsWindow, WindowBuffer, WindowImpl}; diff --git a/src/expr/core/src/window_function/state/mod.rs b/src/expr/core/src/window_function/state/mod.rs index 69162f33e5f5b..6a86667bfcafc 100644 --- a/src/expr/core/src/window_function/state/mod.rs +++ b/src/expr/core/src/window_function/state/mod.rs @@ -15,10 +15,10 @@ use std::collections::BTreeSet; use itertools::Itertools; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::OwnedRow; use risingwave_common::types::{Datum, DefaultOrdered}; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; +use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; use super::{WindowFuncCall, WindowFuncKind}; diff --git a/src/expr/core/src/window_function/state/rank.rs b/src/expr/core/src/window_function/state/rank.rs index 3f1ad380e689a..238af18699315 100644 --- a/src/expr/core/src/window_function/state/rank.rs +++ b/src/expr/core/src/window_function/state/rank.rs @@ -14,10 +14,10 @@ use std::marker::PhantomData; -use risingwave_common::estimate_size::collections::EstimatedVecDeque; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::types::Datum; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; +use risingwave_common_estimate_size::collections::EstimatedVecDeque; +use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; use self::private::RankFuncCount; diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index e7b765820dfdd..1e9266dba5b41 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -39,6 +39,7 @@ num-traits = "0.2" openssl = { version = "0.10", features = ["vendored"] } regex = "1" risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_expr = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } diff --git a/src/expr/impl/src/aggregate/approx_count_distinct/append_only.rs b/src/expr/impl/src/aggregate/approx_count_distinct/append_only.rs index 487ffd3d336c4..90b41cc8284bb 100644 --- a/src/expr/impl/src/aggregate/approx_count_distinct/append_only.rs +++ b/src/expr/impl/src/aggregate/approx_count_distinct/append_only.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::bail; -use risingwave_common::estimate_size::EstimateSize; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::Result; use super::Bucket; diff --git a/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs b/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs index e6c98c4c050d1..bfd1548963d56 100644 --- a/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs +++ b/src/expr/impl/src/aggregate/approx_count_distinct/mod.rs @@ -19,9 +19,9 @@ use std::ops::Range; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::Row; use risingwave_common::types::*; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggStateDyn, AggregateFunction, AggregateState}; use risingwave_expr::{build_aggregate, ExprError, Result}; diff --git a/src/expr/impl/src/aggregate/array_agg.rs b/src/expr/impl/src/aggregate/array_agg.rs index f68d99345affc..f656ea9b3e614 100644 --- a/src/expr/impl/src/aggregate/array_agg.rs +++ b/src/expr/impl/src/aggregate/array_agg.rs @@ -13,8 +13,8 @@ // limitations under the License. use risingwave_common::array::ArrayBuilderImpl; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::types::{Datum, ListValue, ScalarRefImpl}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate; use risingwave_expr::aggregate::AggStateDyn; use risingwave_expr::expr::Context; diff --git a/src/expr/impl/src/aggregate/jsonb_agg.rs b/src/expr/impl/src/aggregate/jsonb_agg.rs index 2d12f6bcfc2aa..daa541f71881f 100644 --- a/src/expr/impl/src/aggregate/jsonb_agg.rs +++ b/src/expr/impl/src/aggregate/jsonb_agg.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::types::{Datum, JsonbVal}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::AggStateDyn; use risingwave_expr::expr::Context; use risingwave_expr::{aggregate, ExprError, Result}; diff --git a/src/expr/impl/src/aggregate/mode.rs b/src/expr/impl/src/aggregate/mode.rs index b685255f15f0b..e5ebdf5a485ad 100644 --- a/src/expr/impl/src/aggregate/mode.rs +++ b/src/expr/impl/src/aggregate/mode.rs @@ -15,9 +15,9 @@ use std::ops::Range; use risingwave_common::array::*; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::Row; use risingwave_common::types::*; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{ AggCall, AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction, }; diff --git a/src/expr/impl/src/aggregate/percentile_cont.rs b/src/expr/impl/src/aggregate/percentile_cont.rs index 769636088fa3a..728495edb1122 100644 --- a/src/expr/impl/src/aggregate/percentile_cont.rs +++ b/src/expr/impl/src/aggregate/percentile_cont.rs @@ -15,9 +15,9 @@ use std::ops::Range; use risingwave_common::array::*; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::Row; use risingwave_common::types::*; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggStateDyn, AggregateFunction, AggregateState}; use risingwave_expr::{build_aggregate, Result}; diff --git a/src/expr/impl/src/aggregate/percentile_disc.rs b/src/expr/impl/src/aggregate/percentile_disc.rs index 399853acf45f8..8b0114be1b8d5 100644 --- a/src/expr/impl/src/aggregate/percentile_disc.rs +++ b/src/expr/impl/src/aggregate/percentile_disc.rs @@ -15,9 +15,9 @@ use std::ops::Range; use risingwave_common::array::*; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::Row; use risingwave_common::types::*; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{ AggCall, AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction, }; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 1da6470cf38e7..684a0e2159751 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -878,7 +878,7 @@ impl FunctionAttr { use risingwave_common::types::*; use risingwave_common::bail; use risingwave_common::buffer::Bitmap; - use risingwave_common::estimate_size::EstimateSize; + use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::expr::Context; use risingwave_expr::Result; diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 331e7b098eadb..ffad192f999c1 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -43,6 +43,7 @@ prost = { workspace = true } rand = "0.8" risingwave_backup = { workspace = true } risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_common_service = { workspace = true } risingwave_expr = { workspace = true } risingwave_hummock_sdk = { workspace = true } diff --git a/src/storage/hummock_sdk/Cargo.toml b/src/storage/hummock_sdk/Cargo.toml index 557aa4c8c8a54..09f4ddfafbeaa 100644 --- a/src/storage/hummock_sdk/Cargo.toml +++ b/src/storage/hummock_sdk/Cargo.toml @@ -21,6 +21,7 @@ itertools = "0.12" parse-display = "0.9" prost = { workspace = true } risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_pb = { workspace = true } tracing = "0.1" @@ -29,5 +30,3 @@ workspace-hack = { path = "../../workspace-hack" } [lints] workspace = true - - diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index f0f00be411585..4264dd161d3a1 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -22,8 +22,8 @@ use std::ptr; use bytes::{Buf, BufMut, Bytes, BytesMut}; use risingwave_common::catalog::TableId; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::VirtualNode; +use risingwave_common_estimate_size::EstimateSize; use crate::{EpochWithGap, HummockEpoch}; diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index ed496618f8a33..65fe36a21ffc6 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -22,8 +22,8 @@ use std::sync::Arc; use bytes::Bytes; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark}; use tracing::{debug, warn}; diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 311e51d2b8e9d..48000ec702310 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -26,8 +26,8 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::VnodeBitmapExt; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::WatermarkDirection; use thiserror::Error; diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 9d66de9191d7e..5348dedfcde61 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -49,6 +49,7 @@ prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true } rand = "0.8" risingwave_common = { workspace = true } +risingwave_common_estimate_size = { workspace = true } risingwave_connector = { workspace = true } risingwave_dml = { workspace = true } risingwave_expr = { workspace = true } diff --git a/src/stream/src/cache/managed_lru.rs b/src/stream/src/cache/managed_lru.rs index 9773f3fb51bf0..e393fa53703ba 100644 --- a/src/stream/src/cache/managed_lru.rs +++ b/src/stream/src/cache/managed_lru.rs @@ -21,9 +21,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use lru::{DefaultHasher, LruCache}; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_common::util::epoch::Epoch; +use risingwave_common_estimate_size::EstimateSize; use crate::common::metrics::MetricsInfo; diff --git a/src/stream/src/common/cache/state_cache/mod.rs b/src/stream/src/common/cache/state_cache/mod.rs index 25876e60ffc0c..ae3b1ad3069e4 100644 --- a/src/stream/src/common/cache/state_cache/mod.rs +++ b/src/stream/src/common/cache/state_cache/mod.rs @@ -14,7 +14,7 @@ pub use ordered::*; use risingwave_common::array::Op; -use risingwave_common::estimate_size::EstimateSize; +use risingwave_common_estimate_size::EstimateSize; pub use top_n::*; mod ordered; diff --git a/src/stream/src/common/cache/state_cache/ordered.rs b/src/stream/src/common/cache/state_cache/ordered.rs index 3e0d53c94fb10..535dbcb74e9a0 100644 --- a/src/stream/src/common/cache/state_cache/ordered.rs +++ b/src/stream/src/common/cache/state_cache/ordered.rs @@ -15,7 +15,7 @@ use std::collections::BTreeMap; use risingwave_common::array::Op; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; use super::{StateCache, StateCacheFiller}; diff --git a/src/stream/src/common/cache/state_cache/top_n.rs b/src/stream/src/common/cache/state_cache/top_n.rs index 373a3018f6e36..4d2423f6c1e10 100644 --- a/src/stream/src/common/cache/state_cache/top_n.rs +++ b/src/stream/src/common/cache/state_cache/top_n.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::array::Op; -use risingwave_common::estimate_size::EstimateSize; +use risingwave_common_estimate_size::EstimateSize; use super::{StateCache, StateCacheFiller}; use crate::common::cache::TopNCache; diff --git a/src/stream/src/common/cache/top_n_cache.rs b/src/stream/src/common/cache/top_n_cache.rs index 854afd2fe8a31..e6e65bacea5fe 100644 --- a/src/stream/src/common/cache/top_n_cache.rs +++ b/src/stream/src/common/cache/top_n_cache.rs @@ -14,7 +14,7 @@ use std::collections::BTreeMap; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; /// Inner top-N cache structure for [`super::TopNStateCache`]. #[derive(Clone)] diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 731d8e42126df..96702cb5bf7e3 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -19,9 +19,9 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::util::epoch::EpochPair; +use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; diff --git a/src/stream/src/common/table/state_table_cache.rs b/src/stream/src/common/table/state_table_cache.rs index 8c8014548a1e1..585b51eae0606 100644 --- a/src/stream/src/common/table/state_table_cache.rs +++ b/src/stream/src/common/table/state_table_cache.rs @@ -13,9 +13,9 @@ // limitations under the License. use risingwave_common::array::Op; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DefaultOrdered, ScalarRefImpl}; +use risingwave_common_estimate_size::EstimateSize; use crate::common::cache::{StateCache, TopNStateCache}; diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index 81756ac373ec0..480c1d7d50663 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -20,10 +20,10 @@ use risingwave_common::array::stream_record::{Record, RecordType}; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::must_match; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; use risingwave_storage::StateStore; diff --git a/src/stream/src/executor/aggregation/agg_state.rs b/src/stream/src/executor/aggregation/agg_state.rs index fe9aef794242b..bfcea1cf048be 100644 --- a/src/stream/src/executor/aggregation/agg_state.rs +++ b/src/stream/src/executor/aggregation/agg_state.rs @@ -15,10 +15,10 @@ use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::must_match; use risingwave_common::types::Datum; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; use risingwave_storage::StateStore; diff --git a/src/stream/src/executor/aggregation/agg_state_cache.rs b/src/stream/src/executor/aggregation/agg_state_cache.rs index 05eeab22147c7..71cd9a45e3f1b 100644 --- a/src/stream/src/executor/aggregation/agg_state_cache.rs +++ b/src/stream/src/executor/aggregation/agg_state_cache.rs @@ -15,12 +15,12 @@ //! Object-safe version of [`StateCache`] for aggregation. use risingwave_common::array::StreamChunk; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::Row; use risingwave_common::types::{DataType, Datum, ToOwnedDatum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_common::util::row_serde::OrderedRowSerde; +use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; use crate::common::cache::{StateCache, StateCacheFiller}; diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index a7eff24bcb3a1..55a3da1db493a 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -19,11 +19,11 @@ use futures_async_stream::for_await; use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, RowExt}; use risingwave_common::types::Datum; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{AggCall, AggKind, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; use risingwave_storage::store::PrefetchOptions; diff --git a/src/stream/src/executor/dedup/cache.rs b/src/stream/src/executor/dedup/cache.rs index fd9edf734e99a..5a9d876c356e3 100644 --- a/src/stream/src/executor/dedup/cache.rs +++ b/src/stream/src/executor/dedup/cache.rs @@ -14,7 +14,7 @@ use std::hash::Hash; -use risingwave_common::estimate_size::EstimateSize; +use risingwave_common_estimate_size::EstimateSize; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 261100f2de29c..685b2e65e0831 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -22,12 +22,12 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::collections::EstimatedHashMap; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::types::ScalarImpl; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common_estimate_size::collections::EstimatedHashMap; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; use risingwave_storage::StateStore; diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index 123bd6e42e45e..aed43a7e63711 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -22,7 +22,6 @@ use futures::StreamExt; use futures_async_stream::for_await; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use risingwave_common::buffer::Bitmap; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row::{OwnedRow, Row, RowExt}; @@ -30,6 +29,7 @@ use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; +use risingwave_common_estimate_size::EstimateSize; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; @@ -627,7 +627,7 @@ impl JoinHashMap { } } -use risingwave_common::estimate_size::KvSize; +use risingwave_common_estimate_size::KvSize; use thiserror::Error; use super::*; diff --git a/src/stream/src/executor/join/row.rs b/src/stream/src/executor/join/row.rs index 9ab133fc314ba..6dfafc2b91df1 100644 --- a/src/stream/src/executor/join/row.rs +++ b/src/stream/src/executor/join/row.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common_estimate_size::EstimateSize; use crate::executor::StreamExecutorResult; diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs index e9580bec4e2aa..aa3507c87e738 100644 --- a/src/stream/src/executor/lookup/cache.rs +++ b/src/stream/src/executor/lookup/cache.rs @@ -15,8 +15,9 @@ use std::collections::HashSet; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::estimate_size::{EstimateSize, KvSize, VecWithKvSize}; use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common_estimate_size::collections::VecWithKvSize; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; diff --git a/src/stream/src/executor/lookup/impl_.rs b/src/stream/src/executor/lookup/impl_.rs index db2511e19e7bd..14347e9cf1e13 100644 --- a/src/stream/src/executor/lookup/impl_.rs +++ b/src/stream/src/executor/lookup/impl_.rs @@ -17,11 +17,11 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::RowRef; use risingwave_common::catalog::{ColumnDesc, Schema}; -use risingwave_common::estimate_size::VecWithKvSize; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_common_estimate_size::collections::VecWithKvSize; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; diff --git a/src/stream/src/executor/managed_state/join/join_entry_state.rs b/src/stream/src/executor/managed_state/join/join_entry_state.rs index 80221247b8a81..b0e225ebe8ee3 100644 --- a/src/stream/src/executor/managed_state/join/join_entry_state.rs +++ b/src/stream/src/executor/managed_state/join/join_entry_state.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::estimate_size::KvSize; +use risingwave_common_estimate_size::KvSize; use thiserror::Error; use super::*; diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index a1ca7bcdd7891..dced3899c404b 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -26,7 +26,6 @@ use futures_async_stream::for_await; pub(super) use join_entry_state::JoinEntryState; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use risingwave_common::buffer::Bitmap; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row; @@ -35,6 +34,7 @@ use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; +use risingwave_common_estimate_size::EstimateSize; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 32f1fafe9b8db..16fe77cb64ebc 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -21,14 +21,14 @@ use itertools::Itertools; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{ArrayRef, Op, StreamChunk}; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::collections::EstimatedVecDeque; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded}; use risingwave_common::util::sort_util::OrderType; use risingwave_common::{must_match, row}; +use risingwave_common_estimate_size::collections::EstimatedVecDeque; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::window_function::{ create_window_state, StateEvictHint, StateKey, WindowFuncCall, WindowStates, }; diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index a759e9e7334e3..db431a17dafe4 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -23,10 +23,10 @@ use delta_btree_map::{Change, DeltaBTreeMap}; use educe::Educe; use futures_async_stream::for_await; use risingwave_common::array::stream_record::Record; -use risingwave_common::estimate_size::collections::EstimatedBTreeMap; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy; use risingwave_common::types::{Datum, Sentinelled}; +use risingwave_common_estimate_size::collections::EstimatedBTreeMap; use risingwave_expr::window_function::{ RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall, }; diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index b4147ec5b1cbd..1d374189cb5e0 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -26,11 +26,11 @@ use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; use risingwave_expr::expr::NonStrictExpression; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; use risingwave_storage::store::PrefetchOptions; diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index 8d9893cc1e978..333173053a861 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -18,9 +18,9 @@ use std::future::Future; use itertools::Itertools; use risingwave_common::array::{Op, RowRef}; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{CompactedRow, Row, RowDeserializer, RowExt}; use risingwave_common::types::DataType; +use risingwave_common_estimate_size::EstimateSize; use risingwave_storage::StateStore; use super::topn_cache_state::TopNCacheState; diff --git a/src/stream/src/executor/top_n/topn_cache_state.rs b/src/stream/src/executor/top_n/topn_cache_state.rs index 34f3719db44be..fa28369f2b0d6 100644 --- a/src/stream/src/executor/top_n/topn_cache_state.rs +++ b/src/stream/src/executor/top_n/topn_cache_state.rs @@ -17,8 +17,8 @@ use std::alloc::Global; use std::collections::btree_map::{BTreeMap, ExtractIf, OccupiedEntry, Range}; use std::ops::RangeBounds; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::row::CompactedRow; +use risingwave_common_estimate_size::{EstimateSize, KvSize}; /// `CacheKey` is composed of `(order_by, remaining columns of pk)`. pub type CacheKey = (Vec, Vec);