fix: change tracking stream failed (#17072)
Fix change tracking failure on streams.
zhyass authored Dec 19, 2024
1 parent 327cd75 commit aa9a972
Showing 26 changed files with 216 additions and 104 deletions.
22 changes: 22 additions & 0 deletions src/query/ast/src/ast/query.rs
@@ -619,6 +619,28 @@ impl Display for WithOptions {
}
}

impl WithOptions {
/// Used to build the WITH clause of the rewritten change query.
pub fn to_change_query_with_clause(&self) -> String {
let mut result = String::from(" WITH (");
for (i, (k, v)) in self.options.iter().enumerate() {
if i > 0 {
result.push_str(", ");
}

if k == "consume" {
// Stream consumption is recorded in the QueryContext, so emit
// 'consume = false' here to avoid consuming the stream twice.
result.push_str("consume = false");
} else {
result.push_str(&format!("{k} = '{v}'"));
}
}
result.push(')');
result
}
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct ChangesInterval {
pub append_only: bool,
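For context, a minimal standalone sketch of what the new helper produces, using a plain `BTreeMap` in place of the real `WithOptions` type and invented option values; a user-supplied `consume = 'true'` is rewritten to `consume = false` so the generated change query does not consume the stream a second time:

```rust
use std::collections::BTreeMap;

// Standalone reproduction of the formatting logic above (not the databend
// type itself). Option keys and values here are invented for illustration.
fn to_change_query_with_clause(options: &BTreeMap<String, String>) -> String {
    let mut result = String::from(" WITH (");
    for (i, (k, v)) in options.iter().enumerate() {
        if i > 0 {
            result.push_str(", ");
        }
        if k == "consume" {
            // Consumption is tracked in the QueryContext, so pin it to false.
            result.push_str("consume = false");
        } else {
            result.push_str(&format!("{k} = '{v}'"));
        }
    }
    result.push(')');
    result
}

fn main() {
    let opts = BTreeMap::from([
        ("consume".to_string(), "true".to_string()),
        ("max_batch_size".to_string(), "100".to_string()),
    ]);
    assert_eq!(
        to_change_query_with_clause(&opts),
        " WITH (consume = false, max_batch_size = '100')"
    );
}
```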
8 changes: 8 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -376,6 +376,14 @@ pub trait TableContext: Send + Sync {
fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);

async fn drop_m_cte_temp_table(&self) -> Result<()>;

fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) {
unimplemented!()
}

fn get_consume_streams(&self, _query: bool) -> Result<Vec<Arc<dyn Table>>> {
unimplemented!()
}
}

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
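The new trait methods ship panicking default bodies, so only contexts that actually track streams have to override them, and every other `TableContext` implementor compiles unchanged. A minimal sketch of the pattern, with simplified signatures (the real methods also cover `get_consume_streams` and return databend's `Result`):

```rust
// Simplified sketch of the default-method pattern; names mirror the trait
// above but signatures are reduced for illustration.
trait TableContext {
    fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) {
        // Implementors that never bind streams inherit this default and are
        // simply expected never to call it.
        unimplemented!()
    }
}

struct QueryContext; // stand-in for the real session context

impl TableContext for QueryContext {
    fn add_streams_ref(&self, catalog: &str, database: &str, stream: &str, consume: bool) {
        println!("registered {catalog}.{database}.{stream} (consume = {consume})");
    }
}

fn main() {
    QueryContext.add_streams_ref("default", "db", "s", true);
}
```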
35 changes: 3 additions & 32 deletions src/query/service/src/interpreters/common/stream.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use chrono::Utc;
@@ -24,13 +23,10 @@ use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_meta_app::schema::UpdateTableMetaReq;
use databend_common_meta_types::MatchSeq;
use databend_common_sql::MetadataRef;
use databend_common_sql::TableEntry;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::StreamTable;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
@@ -42,9 +38,8 @@ use crate::sessions::QueryContext;

pub async fn dml_build_update_stream_req(
ctx: Arc<QueryContext>,
metadata: &MetadataRef,
) -> Result<Vec<UpdateStreamMetaReq>> {
let tables = get_stream_table(metadata, |t| t.table().engine() == STREAM_ENGINE)?;
let tables = ctx.get_consume_streams(false)?;
if tables.is_empty() {
return Ok(vec![]);
}
@@ -96,38 +91,14 @@
Ok(reqs)
}

fn get_stream_table<F>(metadata: &MetadataRef, pred: F) -> Result<Vec<Arc<dyn Table>>>
where F: Fn(&TableEntry) -> bool {
let r_lock = metadata.read();
let tables = r_lock.tables();
let mut streams = vec![];
let mut streams_ids = HashSet::new();
for t in tables {
if pred(t) {
let stream = t.table();

let stream_id = stream.get_table_info().ident.table_id;
if streams_ids.contains(&stream_id) {
continue;
}
streams_ids.insert(stream_id);

streams.push(stream);
}
}
Ok(streams)
}

pub struct StreamTableUpdates {
pub update_table_metas: Vec<(UpdateTableMetaReq, TableInfo)>,
}

pub async fn query_build_update_stream_req(
ctx: &Arc<QueryContext>,
metadata: &MetadataRef,
) -> Result<Option<StreamTableUpdates>> {
let streams = get_stream_table(metadata, |t| {
t.is_consume() && t.table().engine() == STREAM_ENGINE
})?;
let streams = ctx.get_consume_streams(true)?;
if streams.is_empty() {
return Ok(None);
}
@@ -75,7 +75,7 @@ impl CopyIntoLocationInterpreter {
false,
)?;

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

Ok((select_interpreter, update_stream_meta))
}
@@ -78,7 +78,7 @@ impl CopyIntoTableInterpreter {
v => unreachable!("Input plan must be Query, but it's {}", v),
};

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

let select_interpreter = SelectInterpreter::try_create(
self.ctx.clone(),
3 changes: 1 addition & 2 deletions src/query/service/src/interpreters/interpreter_insert.rs
@@ -167,8 +167,7 @@ impl Interpreter for InsertInterpreter {
.format_pretty()?;
info!("Insert select plan: \n{}", explain_plan);

let update_stream_meta =
dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

// here we remove the last exchange merge plan to trigger distribute insert
let insert_select_plan = match (select_plan, table.support_distributed_insert()) {
@@ -130,8 +130,8 @@ impl Interpreter for InsertMultiTableInterpreter {

impl InsertMultiTableInterpreter {
pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
let (mut root, metadata) = self.build_source_physical_plan().await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), &metadata).await?;
let (mut root, _) = self.build_source_physical_plan().await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
let source_schema = root.output_schema()?;
let branches = self.build_insert_into_branches().await?;
let serializable_tables = branches
3 changes: 1 addition & 2 deletions src/query/service/src/interpreters/interpreter_mutation.rs
@@ -190,8 +190,7 @@ impl MutationInterpreter {
table_snapshot: Option<Arc<TableSnapshot>>,
) -> Result<MutationBuildInfo> {
let table_info = fuse_table.get_table_info().clone();
let update_stream_meta =
dml_build_update_stream_req(self.ctx.clone(), &mutation.metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
let partitions = self
.mutation_source_partitions(mutation, fuse_table, table_snapshot.clone())
.await?;
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
@@ -448,7 +448,7 @@ impl ReplaceInterpreter {
v => unreachable!("Input plan must be Query, but it's {}", v),
};

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;

let select_interpreter = SelectInterpreter::try_create(
ctx.clone(),
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_select.rs
@@ -138,7 +138,7 @@ impl SelectInterpreter {
.await?;

// consume stream
let update_stream_metas = query_build_update_stream_req(&self.ctx, &self.metadata).await?;
let update_stream_metas = query_build_update_stream_req(&self.ctx).await?;

let catalog = self.ctx.get_default_catalog()?;
build_res
33 changes: 33 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -1511,6 +1511,39 @@ impl TableContext for QueryContext {
m_cte_temp_table.clear();
Ok(())
}

fn add_streams_ref(&self, catalog: &str, database: &str, stream: &str, consume: bool) {
let mut streams = self.shared.streams_refs.write();
let stream_key = (
catalog.to_string(),
database.to_string(),
stream.to_string(),
);
streams
.entry(stream_key)
.and_modify(|v| {
if consume {
*v = true;
}
})
.or_insert(consume);
}

fn get_consume_streams(&self, query: bool) -> Result<Vec<Arc<dyn Table>>> {
let streams_refs = self.shared.streams_refs.read();
let tables = self.shared.tables_refs.lock();
let mut streams_meta = Vec::with_capacity(streams_refs.len());
for (stream_key, consume) in streams_refs.iter() {
if query && !consume {
continue;
}
let stream = tables
.get(stream_key)
.ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
streams_meta.push(stream.clone());
}
Ok(streams_meta)
}
}

impl TrySpawn for QueryContext {
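Together the two methods form a per-query stream registry: the binder records every stream reference, the consume flag is OR-merged per `(catalog, database, stream)` key, and interpreters later collect either only consumed streams (`query = true`, the SELECT path) or all referenced streams (`query = false`, the DML path). A self-contained mini-model of those semantics, with stand-in types throughout:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

type StreamKey = (String, String, String); // (catalog, database, stream)

// Stand-in for the shared context state; not the databend types.
#[derive(Default)]
struct MiniContext {
    streams_refs: RwLock<HashMap<StreamKey, bool>>,
}

impl MiniContext {
    fn add_streams_ref(&self, catalog: &str, database: &str, stream: &str, consume: bool) {
        let mut refs = self.streams_refs.write().unwrap();
        refs.entry((catalog.into(), database.into(), stream.into()))
            // Once any reference consumes the stream, it stays consumed.
            .and_modify(|v| *v |= consume)
            .or_insert(consume);
    }

    fn get_consume_streams(&self, query: bool) -> Vec<StreamKey> {
        self.streams_refs
            .read()
            .unwrap()
            .iter()
            // Query path: consumed streams only. DML path: every stream.
            .filter(|&(_, &consume)| !query || consume)
            .map(|(key, _)| key.clone())
            .collect()
    }
}

fn main() {
    let ctx = MiniContext::default();
    ctx.add_streams_ref("default", "db", "s1", false); // plain read ...
    ctx.add_streams_ref("default", "db", "s1", true);  // ... then WITH CONSUME
    ctx.add_streams_ref("default", "db", "s2", false);
    assert_eq!(ctx.get_consume_streams(true).len(), 1);  // SELECT: s1 only
    assert_eq!(ctx.get_consume_streams(false).len(), 2); // DML: s1 and s2
}
```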
3 changes: 2 additions & 1 deletion src/query/service/src/sessions/query_ctx_shared.rs
@@ -97,6 +97,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) running_query_parameterized_hash: Arc<RwLock<Option<String>>>,
pub(in crate::sessions) aborting: Arc<AtomicBool>,
pub(in crate::sessions) tables_refs: Arc<Mutex<HashMap<DatabaseAndTable, Arc<dyn Table>>>>,
pub(in crate::sessions) streams_refs: Arc<RwLock<HashMap<DatabaseAndTable, bool>>>,
pub(in crate::sessions) affect: Arc<Mutex<Option<QueryAffect>>>,
pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
pub(in crate::sessions) data_operator: DataOperator,
@@ -168,6 +169,7 @@ impl QueryContextShared {
running_query_parameterized_hash: Arc::new(RwLock::new(None)),
aborting: Arc::new(AtomicBool::new(false)),
tables_refs: Arc::new(Mutex::new(HashMap::new())),
streams_refs: Default::default(),
affect: Arc::new(Mutex::new(None)),
executor: Arc::new(RwLock::new(Weak::new())),
stage_attachment: Arc::new(RwLock::new(None)),
@@ -337,7 +339,6 @@
max_batch_size: Option<u64>,
) -> Result<Arc<dyn Table>> {
// Always get same table metadata in the same query

let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string());

let already_in_cache = { self.tables_refs.lock().contains_key(&table_meta_key) };
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/bind_query/bind.rs
@@ -189,7 +189,7 @@ impl Binder {
let query_id = self.ctx.get_id();
let database = self.ctx.get_current_database();
let mut table_identifier = cte.alias.name.clone();
table_identifier.name = format!("{}_{}", table_identifier.name, query_id.replace("-", "_"));
table_identifier.name = format!("{}${}", table_identifier.name, query_id.replace("-", ""));
let table_name = normalize_identifier(&table_identifier, &self.name_resolution_ctx).name;
self.m_cte_table_name.insert(
normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name,
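The materialized-CTE temp table is now suffixed with `$` plus the dash-stripped query id instead of `_` plus an underscore-joined id. The distinctive separator is what lets the new `CreateTable` exemption in `binder.rs` (further below) recognize these internal per-query tables by name. A small sketch with an invented CTE name and query id:

```rust
// Sketch of the new naming scheme; the CTE name and query id are invented.
fn m_cte_temp_table_name(cte_name: &str, query_id: &str) -> String {
    format!("{}${}", cte_name, query_id.replace("-", ""))
}

fn main() {
    let name = m_cte_temp_table_name("orders_cte", "1f2e3d4c-aaaa-bbbb-cccc-000011112222");
    assert_eq!(name, "orders_cte$1f2e3d4caaaabbbbcccc000011112222");
}
```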
@@ -63,7 +63,7 @@ impl Binder {
check_with_opt_valid(with_options)?;
let consume = get_with_opt_consume(with_options)?;
let max_batch_size = get_with_opt_max_batch_size(with_options)?;
let with_opts_str = format!(" {with_options}");
let with_opts_str = with_options.to_change_query_with_clause();
(consume, max_batch_size, with_opts_str)
} else {
(false, None, String::new())
@@ -74,7 +74,7 @@
let cte_map = bind_context.cte_context.cte_map.clone();
if let Some(cte_info) = cte_map.get(&table_name) {
if cte_info.materialized {
cte_suffix_name = Some(self.ctx.get_id().replace("-", "_"));
cte_suffix_name = Some(self.ctx.get_id().replace("-", ""));
} else {
if self
.metadata
@@ -105,7 +105,7 @@
// Resolve table with catalog
let table_meta = {
let table_name = if let Some(cte_suffix_name) = cte_suffix_name.as_ref() {
format!("{}_{}", &table_name, cte_suffix_name)
format!("{}${}", &table_name, cte_suffix_name)
} else {
table_name.clone()
};
@@ -161,7 +161,6 @@
bind_context.view_info.is_some(),
bind_context.planning_agg_index,
false,
consume,
None,
);
let (s_expr, mut bind_context) = self.bind_base_table(
@@ -186,13 +185,20 @@
&with_opts_str,
))?;

if table_meta.is_stream() {
self.ctx
.add_streams_ref(&catalog, &database, &table_name, consume);
}
let mut new_bind_context = BindContext::with_parent(Box::new(bind_context.clone()));
let tokens = tokenize_sql(query.as_str())?;
let (stmt, _) = parse_sql(&tokens, self.dialect)?;
let Statement::Query(query) = &stmt else {
unreachable!()
};
let (s_expr, mut new_bind_context) = self.bind_query(&mut new_bind_context, query)?;
bind_context
.cte_context
.set_cte_context(new_bind_context.cte_context.clone());

let cols = table_meta
.schema()
@@ -240,7 +246,6 @@
false,
false,
false,
false,
None,
);
let (s_expr, mut new_bind_context) =
@@ -273,7 +278,6 @@
bind_context.view_info.is_some(),
bind_context.planning_agg_index,
false,
false,
cte_suffix_name,
);

@@ -146,7 +146,6 @@ impl Binder {
false,
false,
false,
false,
None,
);

@@ -209,7 +208,6 @@
false,
false,
false,
false,
None,
);

25 changes: 14 additions & 11 deletions src/query/sql/src/planner/binder/binder.rs
@@ -27,7 +27,6 @@ use databend_common_ast::parser::parse_sql;
use databend_common_ast::parser::tokenize_sql;
use databend_common_ast::parser::Dialect;
use databend_common_catalog::catalog::CatalogManager;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -48,6 +47,7 @@ use databend_common_metrics::storage::metrics_inc_copy_purge_files_counter;
use databend_common_storage::init_stage_operator;
use databend_storages_common_io::Files;
use databend_storages_common_session::TxnManagerRef;
use databend_storages_common_table_meta::table::is_stream_name;
use log::error;
use log::info;
use log::warn;
@@ -649,17 +649,20 @@ impl<'a> Binder {
}
};

match plan.kind() {
QueryKind::Query | QueryKind::Explain => {}
match &plan {
Plan::Explain { .. }
| Plan::ExplainAnalyze { .. }
| Plan::ExplainAst { .. }
| Plan::ExplainSyntax { .. }
| Plan::Query { .. } => {}
Plan::CreateTable(plan)
if is_stream_name(&plan.table, self.ctx.get_id().replace("-", "").as_str()) => {}
_ => {
let meta_data_guard = self.metadata.read();
let tables = meta_data_guard.tables();
for t in tables {
if t.is_consume() {
return Err(ErrorCode::SyntaxException(
"WITH CONSUME only allowed in query",
));
}
let consume_streams = self.ctx.get_consume_streams(true)?;
if !consume_streams.is_empty() {
return Err(ErrorCode::SyntaxException(
"WITH CONSUME only allowed in query",
));
}
}
}
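The guard no longer walks plan metadata looking for `is_consume` table entries; it asks the context for consumed streams directly, and it exempts the internal `CREATE TABLE` statements generated for this query's materialized CTEs, recognized through `is_stream_name` against the dash-stripped query id. As a hypothetical sketch of the kind of suffix check such a helper could perform (the real implementation lives in `databend_storages_common_table_meta` and may differ):

```rust
// Hypothetical stand-in for is_stream_name: treat a table name as one of this
// query's internal artifacts if it carries the `$<query_id_without_dashes>`
// suffix introduced by the CTE renaming earlier in this commit.
fn is_stream_name(table_name: &str, query_id_suffix: &str) -> bool {
    table_name
        .rsplit_once('$')
        .map_or(false, |(_, suffix)| suffix == query_id_suffix)
}

fn main() {
    let suffix = "1f2e3d4c-aaaa-bbbb-cccc-000011112222".replace("-", "");
    assert!(is_stream_name("orders_cte$1f2e3d4caaaabbbbcccc000011112222", &suffix));
    assert!(!is_stream_name("orders_cte", &suffix));
}
```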