From a898dcc8f42001b5a4f4dcd965cd0bde8e67b45a Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:19:36 +0800 Subject: [PATCH] feat(frontend): generate query epoch by committed epoch of involved tables (#18592) --- .../tests/testdata/input/share.yaml | 1 + .../tests/testdata/output/share.yaml | 1 + src/frontend/src/handler/declare_cursor.rs | 17 ++- src/frontend/src/handler/query.rs | 49 ++++++-- src/frontend/src/optimizer/mod.rs | 3 +- .../src/optimizer/plan_visitor/mod.rs | 2 + .../read_storage_table_visitor.rs | 57 +++++++++ .../src/scheduler/distributed/query.rs | 38 +++--- .../scheduler/distributed/query_manager.rs | 6 +- src/frontend/src/scheduler/local.rs | 31 +++-- src/frontend/src/scheduler/snapshot.rs | 108 +++++++++++++----- src/frontend/src/session.rs | 9 +- src/frontend/src/session/cursor_manager.rs | 25 ++-- src/frontend/src/session/transaction.rs | 4 +- src/frontend/src/test_utils.rs | 10 +- .../hummock_sdk/src/frontend_version.rs | 12 +- src/storage/hummock_sdk/src/version.rs | 11 +- 17 files changed, 270 insertions(+), 114 deletions(-) create mode 100644 src/frontend/src/optimizer/plan_visitor/read_storage_table_visitor.rs diff --git a/src/frontend/planner_test/tests/testdata/input/share.yaml b/src/frontend/planner_test/tests/testdata/input/share.yaml index efadb03e2d43..91217f52d385 100644 --- a/src/frontend/planner_test/tests/testdata/input/share.yaml +++ b/src/frontend/planner_test/tests/testdata/input/share.yaml @@ -14,6 +14,7 @@ nexmark.split.num = '4', nexmark.min.event.gap.in.ns = '1000' ); + create table table_for_fixed_now_timestamp; expected_outputs: [] - id: self_join before: diff --git a/src/frontend/planner_test/tests/testdata/output/share.yaml b/src/frontend/planner_test/tests/testdata/output/share.yaml index 2cf3aee9fe04..b9a7fe05b668 100644 --- a/src/frontend/planner_test/tests/testdata/output/share.yaml +++ b/src/frontend/planner_test/tests/testdata/output/share.yaml @@ -15,6 +15,7 @@ nexmark.split.num = '4', nexmark.min.event.gap.in.ns = '1000' ); + create table table_for_fixed_now_timestamp; - id: self_join before: - create_sources diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 3a1f33c8eab7..8c521be2adac 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -157,6 +157,7 @@ pub async fn create_chunk_stream_for_cursor( plan_fragmenter, query_mode, schema, + read_storage_tables, .. 
} = plan_fragmenter_result; @@ -169,10 +170,22 @@ pub async fn create_chunk_stream_for_cursor( match query_mode { QueryMode::Auto => unreachable!(), QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some( - local_execute(session.clone(), query, can_timeout_cancel).await?, + local_execute( + session.clone(), + query, + can_timeout_cancel, + &read_storage_tables, + ) + .await?, )), QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some( - distribute_execute(session.clone(), query, can_timeout_cancel).await?, + distribute_execute( + session.clone(), + query, + can_timeout_cancel, + read_storage_tables, + ) + .await?, )), }, schema.fields.clone(), diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 480bb1c7f656..a8201d3c40cc 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -38,8 +38,8 @@ use crate::handler::util::{to_pg_field, DataChunkToRowSetAdapter}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::Explain; use crate::optimizer::{ - ExecutionModeDecider, OptimizerContext, OptimizerContextRef, RelationCollectorVisitor, - SysTableVisitor, + ExecutionModeDecider, OptimizerContext, OptimizerContextRef, ReadStorageTableVisitor, + RelationCollectorVisitor, SysTableVisitor, }; use crate::planner::Planner; use crate::scheduler::plan_fragmenter::Query; @@ -206,6 +206,7 @@ pub struct BatchQueryPlanResult { // subset of the final one. i.e. the final one may contain more implicit dependencies on // indices. pub(crate) dependent_relations: Vec, + pub(crate) read_storage_tables: HashSet, } fn gen_batch_query_plan( @@ -230,6 +231,8 @@ fn gen_batch_query_plan( let dependent_relations = RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.clone()); + let read_storage_tables = ReadStorageTableVisitor::collect(batch_plan.clone()); + let must_local = must_run_in_local_mode(batch_plan.clone()); let query_mode = match (must_dist, must_local) { @@ -260,6 +263,7 @@ fn gen_batch_query_plan( schema, stmt_type, dependent_relations: dependent_relations.into_iter().collect_vec(), + read_storage_tables, }) } @@ -311,7 +315,7 @@ pub struct BatchPlanFragmenterResult { pub(crate) query_mode: QueryMode, pub(crate) schema: Schema, pub(crate) stmt_type: StatementType, - pub(crate) _dependent_relations: Vec, + pub(crate) read_storage_tables: HashSet, } pub fn gen_batch_plan_fragmenter( @@ -323,7 +327,8 @@ pub fn gen_batch_plan_fragmenter( query_mode, schema, stmt_type, - dependent_relations, + read_storage_tables, + .. } = plan_result; tracing::trace!( @@ -347,7 +352,7 @@ pub fn gen_batch_plan_fragmenter( query_mode, schema, stmt_type, - _dependent_relations: dependent_relations, + read_storage_tables, }) } @@ -361,7 +366,7 @@ pub async fn create_stream( query_mode, schema, stmt_type, - .. + read_storage_tables, } = plan_fragmenter_result; let mut can_timeout_cancel = true; @@ -392,7 +397,13 @@ pub async fn create_stream( let row_stream = match query_mode { QueryMode::Auto => unreachable!(), QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute(session.clone(), query, can_timeout_cancel).await?, + local_execute( + session.clone(), + query, + can_timeout_cancel, + &read_storage_tables, + ) + .await?, column_types, formats, session.clone(), @@ -400,7 +411,13 @@ pub async fn create_stream( // Local mode do not support cancel tasks. 
QueryMode::Distributed => { PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute(session.clone(), query, can_timeout_cancel).await?, + distribute_execute( + session.clone(), + query, + can_timeout_cancel, + read_storage_tables, + ) + .await?, column_types, formats, session.clone(), @@ -480,6 +497,7 @@ pub async fn distribute_execute( session: Arc, query: Query, can_timeout_cancel: bool, + read_storage_tables: HashSet, ) -> Result { let timeout = if cfg!(madsim) { None @@ -493,7 +511,7 @@ pub async fn distribute_execute( let query_manager = session.env().query_manager().clone(); query_manager - .schedule(execution_context, query) + .schedule(execution_context, query, read_storage_tables) .await .map_err(|err| err.into()) } @@ -502,6 +520,7 @@ pub async fn local_execute( session: Arc, query: Query, can_timeout_cancel: bool, + read_storage_tables: &HashSet, ) -> Result { let timeout = if cfg!(madsim) { None @@ -512,12 +531,18 @@ pub async fn local_execute( }; let front_env = session.env(); - // TODO: if there's no table scan, we don't need to acquire snapshot. let snapshot = session.pinned_snapshot(); // TODO: Passing sql here - let execution = - LocalQueryExecution::new(query, front_env.clone(), "", snapshot, session, timeout); + let execution = LocalQueryExecution::new( + query, + front_env.clone(), + "", + snapshot.support_barrier_read(), + snapshot.batch_query_epoch(read_storage_tables)?, + session, + timeout, + ); Ok(execution.stream_rows()) } diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index af9d589fe3df..00c9dd0cf01b 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -29,7 +29,8 @@ pub use plan_rewriter::PlanRewriter; mod plan_visitor; pub use plan_visitor::{ - ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor, + ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor, + SysTableVisitor, }; use risingwave_sqlparser::ast::OnConflict; diff --git a/src/frontend/src/optimizer/plan_visitor/mod.rs b/src/frontend/src/optimizer/plan_visitor/mod.rs index 63a0484cfdfd..632d0bcf8bd9 100644 --- a/src/frontend/src/optimizer/plan_visitor/mod.rs +++ b/src/frontend/src/optimizer/plan_visitor/mod.rs @@ -40,7 +40,9 @@ pub use cardinality_visitor::*; mod jsonb_stream_key_checker; pub use jsonb_stream_key_checker::*; mod distributed_dml_visitor; +mod read_storage_table_visitor; pub use distributed_dml_visitor::*; +pub use read_storage_table_visitor::*; use crate::for_all_plan_nodes; use crate::optimizer::plan_node::*; diff --git a/src/frontend/src/optimizer/plan_visitor/read_storage_table_visitor.rs b/src/frontend/src/optimizer/plan_visitor/read_storage_table_visitor.rs new file mode 100644 index 000000000000..811958930520 --- /dev/null +++ b/src/frontend/src/optimizer/plan_visitor/read_storage_table_visitor.rs @@ -0,0 +1,57 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
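[Editor's note] The new plan visitor added in the file below collects the IDs of every storage table a batch plan actually reads (batch sequential scans, log scans, and the inner table of lookup joins). As an aside, here is a minimal self-contained sketch of the same traversal idea; the toy `PlanNode` enum and plain `u32` table IDs are illustrative stand-ins, not RisingWave's real `PlanRef`/`PlanVisitor` machinery:

```rust
use std::collections::HashSet;

// Hypothetical, simplified plan tree. The real visitor walks RisingWave's
// batch plan nodes, but the collection pattern is the same.
enum PlanNode {
    SeqScan { table_id: u32 },
    LookupJoin { probe: Box<PlanNode>, build_table_id: u32 },
    Project { input: Box<PlanNode> },
    Values, // reads no storage table
}

/// Walk the plan and collect every storage table it reads.
fn collect_read_tables(plan: &PlanNode, out: &mut HashSet<u32>) {
    match plan {
        PlanNode::SeqScan { table_id } => {
            out.insert(*table_id);
        }
        PlanNode::LookupJoin { probe, build_table_id } => {
            out.insert(*build_table_id);
            collect_read_tables(probe, out);
        }
        PlanNode::Project { input } => collect_read_tables(input, out),
        PlanNode::Values => {}
    }
}

fn main() {
    let plan = PlanNode::Project {
        input: Box::new(PlanNode::LookupJoin {
            probe: Box::new(PlanNode::SeqScan { table_id: 1 }),
            build_table_id: 2,
        }),
    };
    let mut tables = HashSet::new();
    collect_read_tables(&plan, &mut tables);
    assert_eq!(tables, HashSet::from([1, 2])); // tables 1 and 2 are read
}
```

In the actual patch this is done through the existing `PlanVisitor` infrastructure (`ReadStorageTableVisitor` below), and the resulting set is threaded into `local_execute`/`distribute_execute` so the scheduler can pick the query epoch from exactly the tables involved.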
+ +use std::collections::HashSet; + +use risingwave_common::catalog::TableId; + +use super::{DefaultBehavior, DefaultValue}; +use crate::optimizer::plan_node::{BatchLogSeqScan, BatchLookupJoin}; +use crate::optimizer::plan_visitor::PlanVisitor; +use crate::PlanRef; + +#[derive(Debug, Clone, Default)] +pub struct ReadStorageTableVisitor { + tables: HashSet, +} + +impl ReadStorageTableVisitor { + pub fn collect(plan: PlanRef) -> HashSet { + let mut visitor = Self::default(); + visitor.visit(plan); + visitor.tables + } +} + +impl PlanVisitor for ReadStorageTableVisitor { + type Result = (); + + type DefaultBehavior = impl DefaultBehavior; + + fn default_behavior() -> Self::DefaultBehavior { + DefaultValue + } + + fn visit_batch_seq_scan(&mut self, plan: &crate::optimizer::plan_node::BatchSeqScan) { + self.tables.insert(plan.core().table_desc.table_id); + } + + fn visit_batch_log_seq_scan(&mut self, plan: &BatchLogSeqScan) -> Self::Result { + self.tables.insert(plan.core().table_desc.table_id); + } + + fn visit_batch_lookup_join(&mut self, plan: &BatchLookupJoin) -> Self::Result { + self.tables.insert(plan.right_table_desc().table_id); + } +} diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index b991d86eca2b..a285039aed0f 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -25,7 +25,7 @@ use pgwire::pg_server::SessionId; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_pb::batch_plan::{TaskId as PbTaskId, TaskOutputId as PbTaskOutputId}; -use risingwave_pb::common::HostAddress; +use risingwave_pb::common::{BatchQueryEpoch, HostAddress}; use risingwave_rpc_client::ComputeClientPoolRef; use thiserror_ext::AsReport; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -40,7 +40,7 @@ use crate::scheduler::distributed::stage::StageEvent::ScheduledRoot; use crate::scheduler::distributed::StageEvent::Scheduled; use crate::scheduler::distributed::StageExecution; use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID}; -use crate::scheduler::{ExecutionContextRef, ReadSnapshot, SchedulerError, SchedulerResult}; +use crate::scheduler::{ExecutionContextRef, SchedulerError, SchedulerResult}; /// Message sent to a `QueryRunner` to control its execution. #[derive(Debug)] @@ -124,7 +124,7 @@ impl QueryExecution { self: Arc, context: ExecutionContextRef, worker_node_manager: WorkerNodeSelector, - pinned_snapshot: ReadSnapshot, + batch_query_epoch: BatchQueryEpoch, compute_client_pool: ComputeClientPoolRef, catalog_reader: CatalogReader, query_execution_info: QueryExecutionInfoRef, @@ -137,7 +137,7 @@ impl QueryExecution { // reference of `pinned_snapshot`. Its ownership will be moved into `QueryRunner` so that it // can control when to release the snapshot. let stage_executions = self.gen_stage_executions( - &pinned_snapshot, + batch_query_epoch, context.clone(), worker_node_manager, compute_client_pool.clone(), @@ -182,13 +182,13 @@ impl QueryExecution { let span = tracing::info_span!( "distributed_execute", query_id = self.query.query_id.id, - epoch = ?pinned_snapshot.batch_query_epoch(), + epoch = ?batch_query_epoch, ); tracing::trace!("Starting query: {:?}", self.query.query_id); // Not trace the error here, it will be processed in scheduler. 
- tokio::spawn(async move { runner.run(pinned_snapshot).instrument(span).await }); + tokio::spawn(async move { runner.run().instrument(span).await }); let root_stage = root_stage_receiver .await @@ -225,7 +225,7 @@ impl QueryExecution { fn gen_stage_executions( &self, - pinned_snapshot: &ReadSnapshot, + epoch: BatchQueryEpoch, context: ExecutionContextRef, worker_node_manager: WorkerNodeSelector, compute_client_pool: ComputeClientPoolRef, @@ -244,7 +244,7 @@ impl QueryExecution { .collect::>>(); let stage_exec = Arc::new(StageExecution::new( - pinned_snapshot.batch_query_epoch(), + epoch, self.query.stage_graph.stages[&stage_id].clone(), worker_node_manager.clone(), self.shutdown_tx.clone(), @@ -296,7 +296,7 @@ impl Debug for QueryRunner { } impl QueryRunner { - async fn run(mut self, pinned_snapshot: ReadSnapshot) { + async fn run(mut self) { self.query_metrics.running_query_num.inc(); // Start leaf stages. let leaf_stages = self.query.leaf_stages(); @@ -310,8 +310,6 @@ impl QueryRunner { } let mut stages_with_table_scan = self.query.stages_with_table_scan(); let has_lookup_join_stage = self.query.has_lookup_join_stage(); - // To convince the compiler that `pinned_snapshot` will only be dropped once. - let mut pinned_snapshot_to_drop = Some(pinned_snapshot); let mut finished_stage_cnt = 0usize; while let Some(msg_inner) = self.msg_receiver.recv().await { @@ -331,7 +329,6 @@ impl QueryRunner { // thus they all successfully pinned a HummockVersion. // So we can now unpin their epoch. tracing::trace!("Query {:?} has scheduled all of its stages that have table scan (iterator creation).", self.query.query_id); - pinned_snapshot_to_drop.take(); } // For root stage, we execute in frontend local. We will pass the root fragment @@ -465,7 +462,7 @@ impl QueryRunner { #[cfg(test)] pub(crate) mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use fixedbitset::FixedBitSet; @@ -495,11 +492,9 @@ pub(crate) mod tests { use crate::scheduler::distributed::QueryExecution; use crate::scheduler::plan_fragmenter::{BatchPlanFragmenter, Query}; use crate::scheduler::{ - DistributedQueryMetrics, ExecutionContext, HummockSnapshotManager, QueryExecutionInfo, - ReadSnapshot, + DistributedQueryMetrics, ExecutionContext, QueryExecutionInfo, ReadSnapshot, }; use crate::session::SessionImpl; - use crate::test_utils::MockFrontendMetaClient; use crate::utils::Condition; use crate::TableCatalog; @@ -508,14 +503,10 @@ pub(crate) mod tests { let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![])); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); let compute_client_pool = Arc::new(ComputeClientPool::for_test()); - let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(Arc::new( - MockFrontendMetaClient {}, - ))); let catalog_reader = CatalogReader::new(Arc::new(parking_lot::RwLock::new(Catalog::default()))); let query = create_query().await; let query_id = query.query_id().clone(); - let pinned_snapshot = hummock_snapshot_manager.acquire(); let query_execution = Arc::new(QueryExecution::new(query, (0, 0), None)); let query_execution_info = Arc::new(RwLock::new(QueryExecutionInfo::new_from_map( HashMap::from([(query_id, query_execution.clone())]), @@ -525,10 +516,9 @@ pub(crate) mod tests { .start( ExecutionContext::new(SessionImpl::mock().into(), None).into(), worker_node_selector, - ReadSnapshot::FrontendPinned { - snapshot: pinned_snapshot, - is_barrier_read: true - }, + 
ReadSnapshot::ReadUncommitted + .batch_query_epoch(&HashSet::from_iter([0.into()])) + .unwrap(), compute_client_pool, catalog_reader, query_execution_info, diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index 2d977cfb675e..a448f0e5b5b9 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::{Arc, RwLock}; @@ -33,6 +33,7 @@ use tokio::sync::OwnedSemaphorePermit; use super::stats::DistributedQueryMetrics; use super::QueryExecution; use crate::catalog::catalog_service::CatalogReader; +use crate::catalog::TableId; use crate::scheduler::plan_fragmenter::{Query, QueryId}; use crate::scheduler::{ExecutionContextRef, SchedulerResult}; @@ -190,6 +191,7 @@ impl QueryManager { &self, context: ExecutionContextRef, query: Query, + read_storage_tables: HashSet, ) -> SchedulerResult { if let Some(query_limit) = self.disrtibuted_query_limit && self.query_metrics.running_query_num.get() as u64 == query_limit @@ -223,7 +225,7 @@ impl QueryManager { .start( context.clone(), worker_node_manager_reader, - pinned_snapshot, + pinned_snapshot.batch_query_epoch(&read_storage_tables)?, self.compute_client_pool.clone(), self.catalog_reader.clone(), self.query_execution_info.clone(), diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index fcd15368bb5f..b11d462b151c 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -41,7 +41,7 @@ use risingwave_pb::batch_plan::{ ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PbPlanNode, TaskOutputId, }; -use risingwave_pb::common::WorkerNode; +use risingwave_pb::common::{BatchQueryEpoch, WorkerNode}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::debug; @@ -52,7 +52,7 @@ use crate::error::RwError; use crate::optimizer::plan_node::PlanNodeType; use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use crate::scheduler::task_context::FrontendBatchTaskContext; -use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult}; +use crate::scheduler::{SchedulerError, SchedulerResult}; use crate::session::{FrontendEnv, SessionImpl}; // TODO(error-handling): use a concrete error type. @@ -61,9 +61,7 @@ pub struct LocalQueryExecution { sql: String, query: Query, front_env: FrontendEnv, - // The snapshot will be released when LocalQueryExecution is dropped. 
- // TODO - snapshot: ReadSnapshot, + batch_query_epoch: BatchQueryEpoch, session: Arc, worker_node_manager: WorkerNodeSelector, timeout: Option, @@ -74,21 +72,20 @@ impl LocalQueryExecution { query: Query, front_env: FrontendEnv, sql: S, - snapshot: ReadSnapshot, + support_barrier_read: bool, + batch_query_epoch: BatchQueryEpoch, session: Arc, timeout: Option, ) -> Self { let sql = sql.into(); - let worker_node_manager = WorkerNodeSelector::new( - front_env.worker_node_manager_ref(), - snapshot.support_barrier_read(), - ); + let worker_node_manager = + WorkerNodeSelector::new(front_env.worker_node_manager_ref(), support_barrier_read); Self { sql, query, front_env, - snapshot, + batch_query_epoch, session, worker_node_manager, timeout, @@ -118,7 +115,7 @@ impl LocalQueryExecution { &plan_node, &task_id, context, - self.snapshot.batch_query_epoch(), + self.batch_query_epoch, self.shutdown_rx().clone(), ); let executor = executor.build().await?; @@ -133,7 +130,7 @@ impl LocalQueryExecution { let span = tracing::info_span!( "local_execute", query_id = self.query.query_id.id, - epoch = ?self.snapshot.batch_query_epoch(), + epoch = ?self.batch_query_epoch, ); Box::pin(self.run_inner().instrument(span)) } @@ -339,7 +336,7 @@ impl LocalQueryExecution { }; let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), - epoch: Some(self.snapshot.batch_query_epoch()), + epoch: Some(self.batch_query_epoch), tracing_context: tracing_context.clone(), }; let exchange_source = ExchangeSource { @@ -383,7 +380,7 @@ impl LocalQueryExecution { }; let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), - epoch: Some(self.snapshot.batch_query_epoch()), + epoch: Some(self.batch_query_epoch), tracing_context: tracing_context.clone(), }; // NOTE: select a random work node here. @@ -422,7 +419,7 @@ impl LocalQueryExecution { }; let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), - epoch: Some(self.snapshot.batch_query_epoch()), + epoch: Some(self.batch_query_epoch), tracing_context: tracing_context.clone(), }; // NOTE: select a random work node here. @@ -458,7 +455,7 @@ impl LocalQueryExecution { let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), - epoch: Some(self.snapshot.batch_query_epoch()), + epoch: Some(self.batch_query_epoch), tracing_context, }; diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 8f979bf2bfdf..49667b2aa143 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -12,20 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. 
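[Editor's note] The snapshot changes that follow drop the `is_barrier_read` flag from `ReadSnapshot::FrontendPinned` in favor of a dedicated `ReadSnapshot::ReadUncommitted` variant, so the kind of read epoch is decided purely by the variant. A rough standalone sketch of that shape, with simplified stand-ins for `BatchQueryEpoch` and the pinned version rather than the real frontend types:

```rust
// Simplified stand-in for the protobuf `BatchQueryEpoch`.
#[derive(Debug, PartialEq)]
enum QueryEpoch {
    Committed(u64), // consistent read at a committed epoch
    Current(u64),   // barrier read: latest, possibly uncommitted data
    Backup(u64),    // user-specified epoch, e.g. meta snapshot backup
}

enum ReadSnapshot {
    FrontendPinned { committed_epoch: u64 },
    ReadUncommitted,
    Other(u64),
}

impl ReadSnapshot {
    fn batch_query_epoch(&self) -> QueryEpoch {
        match self {
            // In the real change the committed epoch is derived from the set of
            // tables the query reads; here it is just a field for brevity.
            ReadSnapshot::FrontendPinned { committed_epoch } => {
                QueryEpoch::Committed(*committed_epoch)
            }
            ReadSnapshot::ReadUncommitted => QueryEpoch::Current(u64::MAX),
            ReadSnapshot::Other(epoch) => QueryEpoch::Backup(*epoch),
        }
    }

    fn support_barrier_read(&self) -> bool {
        matches!(self, ReadSnapshot::ReadUncommitted)
    }
}

fn main() {
    assert!(ReadSnapshot::ReadUncommitted.support_barrier_read());
    assert_eq!(
        ReadSnapshot::FrontendPinned { committed_epoch: 42 }.batch_query_epoch(),
        QueryEpoch::Committed(42)
    );
}
```

With the flag gone, `support_barrier_read` becomes a simple `matches!` on the variant, which is exactly what the diff below does.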
-use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use risingwave_common::util::epoch::Epoch; +use anyhow::anyhow; +use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::{ FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID, }; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; -use risingwave_pb::hummock::HummockVersionDeltas; +use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta}; use tokio::sync::watch; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; +use crate::scheduler::SchedulerError; /// The storage snapshot to read from in a query, which can be freely cloned. #[derive(Clone)] @@ -33,10 +36,10 @@ pub enum ReadSnapshot { /// A frontend-pinned snapshot. FrontendPinned { snapshot: PinnedSnapshotRef, - // It's embedded here because we always use it together with snapshot. - is_barrier_read: bool, }, + ReadUncommitted, + /// Other arbitrary epoch, e.g. user specified. /// Availability and consistency of underlying data should be guaranteed accordingly. /// Currently it's only used for querying meta snapshot backup. @@ -45,21 +48,34 @@ pub enum ReadSnapshot { impl ReadSnapshot { /// Get the [`BatchQueryEpoch`] for this snapshot. - pub fn batch_query_epoch(&self) -> BatchQueryEpoch { - match self { - ReadSnapshot::FrontendPinned { - snapshot, - is_barrier_read, - } => snapshot.batch_query_epoch(*is_barrier_read), + pub fn batch_query_epoch( + &self, + read_storage_tables: &HashSet, + ) -> Result { + Ok(match self { + ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch { + epoch: Some(batch_query_epoch::Epoch::Committed( + snapshot.batch_query_epoch(read_storage_tables)?.0, + )), + }, + ReadSnapshot::ReadUncommitted => BatchQueryEpoch { + epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)), + }, ReadSnapshot::Other(e) => BatchQueryEpoch { epoch: Some(batch_query_epoch::Epoch::Backup(e.0)), }, - } + }) } pub fn inline_now_proc_time(&self) -> InlineNowProcTime { let epoch = match self { - ReadSnapshot::FrontendPinned { snapshot, .. } => Epoch(snapshot.committed_epoch()), + ReadSnapshot::FrontendPinned { snapshot } => snapshot + .value + .state_table_info + .max_table_committed_epoch() + .map(Epoch) + .unwrap_or_else(Epoch::now), + ReadSnapshot::ReadUncommitted => Epoch::now(), ReadSnapshot::Other(epoch) => *epoch, }; InlineNowProcTime::new(epoch) @@ -67,13 +83,7 @@ impl ReadSnapshot { /// Returns true if this snapshot is a barrier read. 
pub fn support_barrier_read(&self) -> bool { - match self { - ReadSnapshot::FrontendPinned { - snapshot: _, - is_barrier_read, - } => *is_barrier_read, - ReadSnapshot::Other(_) => false, - } + matches!(self, ReadSnapshot::ReadUncommitted) } } @@ -93,17 +103,38 @@ impl std::fmt::Debug for PinnedSnapshot { pub type PinnedSnapshotRef = Arc; impl PinnedSnapshot { - fn batch_query_epoch(&self, is_barrier_read: bool) -> BatchQueryEpoch { - let epoch = if is_barrier_read { - batch_query_epoch::Epoch::Current(u64::MAX) - } else { - batch_query_epoch::Epoch::Committed(self.value.max_committed_epoch) - }; - BatchQueryEpoch { epoch: Some(epoch) } + fn batch_query_epoch( + &self, + read_storage_tables: &HashSet, + ) -> Result { + // use the min committed epoch of tables involved in the scan + let epoch = read_storage_tables + .iter() + .map(|table_id| { + self.value + .state_table_info + .info() + .get(table_id) + .map(|info| Epoch(info.committed_epoch)) + .ok_or_else(|| anyhow!("table id {table_id} may have been dropped")) + }) + .try_fold(None, |prev_min_committed_epoch, committed_epoch| { + committed_epoch.map(|committed_epoch| { + if let Some(prev_min_committed_epoch) = prev_min_committed_epoch + && prev_min_committed_epoch <= committed_epoch + { + Some(prev_min_committed_epoch) + } else { + Some(committed_epoch) + } + }) + })? + .unwrap_or_else(Epoch::now); + Ok(epoch) } - pub fn committed_epoch(&self) -> u64 { - self.value.max_committed_epoch + pub fn version(&self) -> &FrontendHummockVersion { + &self.value } } @@ -111,7 +142,6 @@ impl PinnedSnapshot { fn invalid_snapshot() -> FrontendHummockVersion { FrontendHummockVersion { id: INVALID_VERSION_ID, - max_committed_epoch: 0, state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()), table_change_log: Default::default(), } @@ -169,6 +199,24 @@ impl HummockSnapshotManager { }) } + pub fn add_table_for_test(&self, table_id: TableId) { + self.update_inner(|version| { + let mut version = version.clone(); + version.id = version.id.next(); + version.state_table_info.apply_delta( + &HashMap::from_iter([( + table_id, + StateTableInfoDelta { + committed_epoch: INVALID_EPOCH, + compaction_group_id: 0, + }, + )]), + &HashSet::new(), + ); + Some(version) + }); + } + fn update_inner( &self, get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option, diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index ff83c905688a..bb4e6928d364 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -186,14 +186,17 @@ impl FrontendEnv { use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter}; let catalog = Arc::new(RwLock::new(Catalog::default())); - let catalog_writer = Arc::new(MockCatalogWriter::new(catalog.clone())); + let meta_client = Arc::new(MockFrontendMetaClient {}); + let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone())); + let catalog_writer = Arc::new(MockCatalogWriter::new( + catalog.clone(), + hummock_snapshot_manager.clone(), + )); let catalog_reader = CatalogReader::new(catalog); let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default())); let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone())); let user_info_reader = UserInfoReader::new(user_info_manager); let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![])); - let meta_client = Arc::new(MockFrontendMetaClient {}); - let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone())); let 
system_params_manager = Arc::new(LocalSystemParamsManager::for_test()); let compute_client_pool = Arc::new(ComputeClientPool::for_test()); let query_manager = QueryManager::new( diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index c4e0ab709621..aa798a236b5c 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -14,11 +14,12 @@ use core::mem; use core::time::Duration; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::rc::Rc; use std::sync::Arc; use std::time::Instant; +use anyhow::anyhow; use bytes::Bytes; use fixedbitset::FixedBitSet; use futures::StreamExt; @@ -48,7 +49,7 @@ use crate::monitor::{CursorMetrics, PeriodicCursorMetrics}; use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::PlanRoot; -use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot}; +use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog}; pub enum CursorDataChunkStream { @@ -256,14 +257,17 @@ impl SubscriptionCursor { // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? let (chunk_stream, fields, init_query_timer) = Self::initiate_query(None, &dependent_table_id, handle_args.clone()).await?; - let pinned_epoch = match handle_args.session.get_pinned_snapshot().ok_or_else(|| { - ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_string()) - })? { - ReadSnapshot::FrontendPinned { snapshot, .. } => snapshot.committed_epoch(), - ReadSnapshot::Other(_) => { - return Err(ErrorCode::InternalError("Fetch Cursor can't start from specified query epoch. May run `set query_epoch = 0;`".to_string()).into()); - } - }; + let pinned_epoch = handle_args + .session + .env + .hummock_snapshot_manager + .acquire() + .version() + .state_table_info + .info() + .get(&dependent_table_id) + .ok_or_else(|| anyhow!("dependent_table_id {dependent_table_id} not exists"))? + .committed_epoch; let start_timestamp = pinned_epoch; ( @@ -647,6 +651,7 @@ impl SubscriptionCursor { schema, stmt_type: StatementType::SELECT, dependent_relations: table_catalog.dependent_relations.clone(), + read_storage_tables: HashSet::from_iter([table_catalog.id]), }) } diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index d14df7f4900a..085a66c01af1 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -225,15 +225,15 @@ impl SessionImpl { if let Some(query_epoch) = query_epoch { ReadSnapshot::Other(query_epoch) + } else if self.is_barrier_read() { + ReadSnapshot::ReadUncommitted } else { // Acquire hummock snapshot for execution. 
- let is_barrier_read = self.is_barrier_read(); let hummock_snapshot_manager = self.env().hummock_snapshot_manager(); let pinned_snapshot = hummock_snapshot_manager.acquire(); ReadSnapshot::FrontendPinned { snapshot: pinned_snapshot, - is_barrier_read, } } }) diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index a089f0043591..731c7a486df8 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -72,6 +72,7 @@ use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId}; use crate::error::{ErrorCode, Result}; use crate::handler::RwPgResponse; use crate::meta_client::FrontendMetaClient; +use crate::scheduler::HummockSnapshotManagerRef; use crate::session::{AuthContext, FrontendEnv, SessionImpl}; use crate::user::user_manager::UserInfoManager; use crate::user::user_service::UserInfoWriter; @@ -234,6 +235,7 @@ pub struct MockCatalogWriter { id: AtomicU32, table_id_to_schema_id: RwLock>, schema_id_to_database_id: RwLock>, + hummock_snapshot_manager: HummockSnapshotManagerRef, } #[async_trait::async_trait] @@ -280,6 +282,8 @@ impl CatalogWriter for MockCatalogWriter { table.stream_job_status = PbStreamJobStatus::Created as _; self.catalog.write().create_table(&table); self.add_table_or_source_id(table.id, table.schema_id, table.database_id); + self.hummock_snapshot_manager + .add_table_for_test(TableId::new(table.id)); Ok(()) } @@ -660,7 +664,10 @@ impl CatalogWriter for MockCatalogWriter { } impl MockCatalogWriter { - pub fn new(catalog: Arc>) -> Self { + pub fn new( + catalog: Arc>, + hummock_snapshot_manager: HummockSnapshotManagerRef, + ) -> Self { catalog.write().create_database(&PbDatabase { id: 0, name: DEFAULT_DATABASE_NAME.to_string(), @@ -693,6 +700,7 @@ impl MockCatalogWriter { id: AtomicU32::new(3), table_id_to_schema_id: Default::default(), schema_id_to_database_id: RwLock::new(map), + hummock_snapshot_manager, } } diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 2086832d0771..254e151f7ec0 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -15,6 +15,7 @@ use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta; use risingwave_pb::hummock::{ PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbTableChangeLog, @@ -28,7 +29,6 @@ use crate::{HummockVersionId, INVALID_VERSION_ID}; #[derive(Clone, Debug)] pub struct FrontendHummockVersion { pub id: HummockVersionId, - pub max_committed_epoch: u64, pub state_table_info: HummockVersionStateTableInfo, pub table_change_log: HashMap>, } @@ -37,7 +37,6 @@ impl FrontendHummockVersion { pub fn from_version(version: &HummockVersion) -> Self { Self { id: version.id, - max_committed_epoch: version.max_committed_epoch, state_table_info: version.state_table_info.clone(), table_change_log: version .table_change_log @@ -66,7 +65,7 @@ impl FrontendHummockVersion { PbHummockVersion { id: self.id.0, levels: Default::default(), - max_committed_epoch: self.max_committed_epoch, + max_committed_epoch: INVALID_EPOCH, table_watermarks: Default::default(), table_change_logs: self .table_change_log @@ -95,7 +94,6 @@ impl FrontendHummockVersion { pub fn from_protobuf(value: PbHummockVersion) -> Self { Self { id: HummockVersionId(value.id), - max_committed_epoch: value.max_committed_epoch, state_table_info: 
HummockVersionStateTableInfo::from_protobuf(&value.state_table_info), table_change_log: value .table_change_logs @@ -125,7 +123,6 @@ impl FrontendHummockVersion { assert_eq!(self.id, delta.prev_id); } self.id = delta.id; - self.max_committed_epoch = delta.max_committed_epoch; let (changed_table_info, _) = self .state_table_info .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id); @@ -142,7 +139,6 @@ impl FrontendHummockVersion { pub struct FrontendHummockVersionDelta { pub prev_id: HummockVersionId, pub id: HummockVersionId, - pub max_committed_epoch: u64, pub removed_table_id: HashSet, pub state_table_info_delta: HashMap, pub change_log_delta: HashMap>, @@ -153,7 +149,6 @@ impl FrontendHummockVersionDelta { Self { prev_id: delta.prev_id, id: delta.id, - max_committed_epoch: delta.max_committed_epoch, removed_table_id: delta.removed_table_ids.clone(), state_table_info_delta: delta.state_table_info_delta.clone(), change_log_delta: delta @@ -183,7 +178,7 @@ impl FrontendHummockVersionDelta { id: self.id.to_u64(), prev_id: self.prev_id.to_u64(), group_deltas: Default::default(), - max_committed_epoch: self.max_committed_epoch, + max_committed_epoch: INVALID_EPOCH, trivial_move: false, new_table_watermarks: Default::default(), removed_table_ids: self @@ -220,7 +215,6 @@ impl FrontendHummockVersionDelta { Self { prev_id: HummockVersionId::new(delta.prev_id), id: HummockVersionId::new(delta.id), - max_committed_epoch: delta.max_committed_epoch, removed_table_id: delta .removed_table_ids .iter() diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 0a10f7b46e40..7be1cbdb834c 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -36,7 +36,9 @@ use crate::compaction_group::StaticCompactionGroupId; use crate::level::LevelsCommon; use crate::sstable_info::SstableInfo; use crate::table_watermark::TableWatermarks; -use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID}; +use crate::{ + CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID, +}; #[derive(Debug, Clone, PartialEq)] pub struct HummockVersionStateTableInfo { @@ -204,6 +206,13 @@ impl HummockVersionStateTableInfo { pub fn compaction_group_member_tables(&self) -> &HashMap> { &self.compaction_group_member_tables } + + pub fn max_table_committed_epoch(&self) -> Option { + self.state_table_info + .values() + .map(|info| info.committed_epoch) + .max() + } } #[derive(Debug, Clone, PartialEq)]
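[Editor's note] For reference, the core of this patch — `PinnedSnapshot::batch_query_epoch` choosing the minimum committed epoch across the tables a query reads — reduces to the following self-contained sketch. Table IDs and epochs are plain integers, the error type is a `String`, and `now_epoch` stands in for the `Epoch::now()` fallback used when the query reads no storage table:

```rust
use std::collections::{HashMap, HashSet};

/// Pick the read epoch for a batch query: the minimum committed epoch of all
/// storage tables the query touches. A table missing from the pinned version
/// is treated as dropped and surfaces as an error; an empty table set (e.g. a
/// query with no table scan) falls back to the current time-derived epoch.
fn batch_query_epoch(
    committed: &HashMap<u32, u64>, // table_id -> committed epoch in the pinned version
    read_storage_tables: &HashSet<u32>,
    now_epoch: u64, // stand-in for Epoch::now()
) -> Result<u64, String> {
    read_storage_tables
        .iter()
        .map(|table_id| {
            committed
                .get(table_id)
                .copied()
                .ok_or_else(|| format!("table id {table_id} may have been dropped"))
        })
        .try_fold(None, |min_so_far: Option<u64>, epoch| {
            epoch.map(|e| Some(min_so_far.map_or(e, |m| m.min(e))))
        })
        .map(|min| min.unwrap_or(now_epoch))
}

fn main() {
    let committed = HashMap::from([(1, 100), (2, 90)]);

    // Both tables are read: the query is pinned to the older epoch, 90.
    assert_eq!(batch_query_epoch(&committed, &HashSet::from([1, 2]), 999), Ok(90));

    // No storage table is read: fall back to the "now" epoch.
    assert_eq!(batch_query_epoch(&committed, &HashSet::new(), 999), Ok(999));

    // An unknown table id is an error.
    assert!(batch_query_epoch(&committed, &HashSet::from([3]), 999).is_err());
}
```

Taking the minimum keeps the read consistent: every involved table has committed at least that epoch, so the scan never observes a table that has not yet caught up, which is why the patch no longer needs the version-wide `max_committed_epoch` removed from `FrontendHummockVersion` above.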