Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test cloud distributed merge into #13486

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
1f13597
add settings
JackTan25 Oct 9, 2023
879ee6e
right join for merge into first
JackTan25 Oct 10, 2023
92538e1
add distribution optimization for merge into join
JackTan25 Oct 13, 2023
19a1999
split merge into plan
JackTan25 Oct 16, 2023
60f79f7
fix update identify error
JackTan25 Oct 16, 2023
662af10
finish distributed basic codes
JackTan25 Oct 16, 2023
76e1352
fix typo
JackTan25 Oct 16, 2023
e4f7450
uniform row_kind and mutation_log
JackTan25 Oct 17, 2023
cd92873
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 17, 2023
93a1cdf
fix MixRowKindAndLog serialize and deserialize
JackTan25 Oct 18, 2023
0a070b1
add tests
JackTan25 Oct 18, 2023
35ff1f5
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 18, 2023
f258ac0
fix check
JackTan25 Oct 18, 2023
fe15639
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 18, 2023
f292341
fix check
JackTan25 Oct 18, 2023
137b52f
fix check
JackTan25 Oct 18, 2023
f39bfd1
fix test
JackTan25 Oct 18, 2023
9b114f4
fix test
JackTan25 Oct 18, 2023
d177d15
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 18, 2023
7bcf760
fix
JackTan25 Oct 18, 2023
636825f
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 18, 2023
6bb03e6
remove memory size limit
JackTan25 Oct 19, 2023
df8acbf
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 19, 2023
b262af2
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 19, 2023
c08cf13
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 21, 2023
1c64166
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 23, 2023
ceae57f
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 23, 2023
0d0eba9
optimize merge source and add row_number processor
JackTan25 Oct 23, 2023
74c1ec1
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 23, 2023
ab2469d
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 23, 2023
db00e4e
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 24, 2023
e95a280
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 24, 2023
f1f2f94
fix delete bug
JackTan25 Oct 24, 2023
bd3be35
add row number plan
JackTan25 Oct 24, 2023
a2fec57
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 24, 2023
b60479e
fix row number
JackTan25 Oct 25, 2023
c79a04b
refactor merge into pipeline
JackTan25 Oct 25, 2023
551d854
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 25, 2023
126da66
split row_number and log, try to get hash table source data
JackTan25 Oct 26, 2023
4112417
finish distributed codes, need to get data from hashtable
JackTan25 Oct 26, 2023
2450df7
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 26, 2023
622a13a
finish not matched append data
JackTan25 Oct 27, 2023
8a56035
fix conflict
JackTan25 Oct 27, 2023
a15df12
fix filter
JackTan25 Oct 27, 2023
18d25d0
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 27, 2023
672218e
fix filter
JackTan25 Oct 27, 2023
03ac2a8
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 27, 2023
2ba7b7d
fix distributed bugs,many bugs, need to support insert
JackTan25 Oct 28, 2023
9eebe16
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 28, 2023
4e168db
fix bugs
JackTan25 Oct 29, 2023
73e6210
fix check and clean codes
JackTan25 Oct 29, 2023
6ffdee9
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 29, 2023
1930589
fix check
JackTan25 Oct 29, 2023
f9200b4
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 29, 2023
2236178
add more tests
JackTan25 Oct 29, 2023
8f1739d
fix flaky
JackTan25 Oct 29, 2023
68aa9be
fix test result
JackTan25 Oct 29, 2023
ad0be11
remove memory limit
JackTan25 Oct 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ pub async fn init_services(conf: &InnerConfig) -> Result<()> {

async fn precheck_services(conf: &InnerConfig) -> Result<()> {
if conf.query.max_memory_limit_enabled {
let size = conf.query.max_server_memory_usage as i64;
info!("Set memory limit: {}", size);
GLOBAL_MEM_STAT.set_limit(size);
// let size = conf.query.max_server_memory_usage as i64;
// info!("Set memory limit: {}", size);
// GLOBAL_MEM_STAT.set_limit(size);
}

let tenant = conf.query.tenant_id.clone();
Expand Down
7 changes: 1 addition & 6 deletions src/query/ast/src/ast/statements/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,13 @@ use crate::ast::TableReference;

#[derive(Debug, Clone, PartialEq)]
pub struct MergeUpdateExpr {
pub catalog: Option<Identifier>,
pub table: Option<Identifier>,
pub name: Identifier,
pub expr: Expr,
}

impl Display for MergeUpdateExpr {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
if self.catalog.is_some() {
write!(f, "{}.", self.catalog.clone().unwrap())?;
}

if self.table.is_some() {
write!(f, "{}.", self.table.clone().unwrap())?;
}
Expand Down Expand Up @@ -103,7 +98,7 @@ impl Display for MergeIntoStmt {
write!(f, "MERGE INTO ")?;
write_dot_separated_list(
f,
self.catalog
self.database
.iter()
.chain(&self.database)
.chain(Some(&self.table_ident)),
Expand Down
9 changes: 2 additions & 7 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2850,12 +2850,7 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {

pub fn merge_update_expr(i: Input) -> IResult<MergeUpdateExpr> {
map(
rule! { ( #dot_separated_idents_1_to_3 ~ "=" ~ ^#expr ) },
|((catalog, table, name), _, expr)| MergeUpdateExpr {
catalog,
table,
name,
expr,
},
rule! { ( #dot_separated_idents_1_to_2 ~ "=" ~ ^#expr ) },
|((table, name), _, expr)| MergeUpdateExpr { table, name, expr },
)(i)
}
1 change: 1 addition & 0 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub const SEGMENT_NAME_COLUMN_ID: u32 = u32::MAX - 2;
pub const SNAPSHOT_NAME_COLUMN_ID: u32 = u32::MAX - 3;

pub const ROW_ID_COL_NAME: &str = "_row_id";
pub const ROW_NUMBER_COL_NAME: &str = "_row_number";
pub const SNAPSHOT_NAME_COL_NAME: &str = "_snapshot_name";
pub const SEGMENT_NAME_COL_NAME: &str = "_segment_name";
pub const BLOCK_NAME_COL_NAME: &str = "_block_name";
Expand Down
6 changes: 6 additions & 0 deletions src/query/pipeline/core/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,10 @@ impl TransformPipeBuilder {
}
self.items = items
}

/// Appends every element of `items` to this builder's pipe items.
///
/// Uses `Vec::extend` instead of a per-element `push` loop: `extend`
/// reserves capacity once from the iterator's size hint, avoiding
/// repeated reallocation, and reads as a single intent-revealing call.
pub fn add_items(&mut self, items: Vec<PipeItem>) {
    self.items.extend(items)
}
}
103 changes: 101 additions & 2 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::PipelineBuilder;
use crate::pipelines::PipelineBuilderData;
use crate::schedulers::QueryFragmentActions;
use crate::schedulers::QueryFragmentsActions;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -210,6 +211,28 @@ impl DataExchangeManager {
}
}

// Create the local (coordinator-side) pipelines for the query fragments in
// `packet`. Counterpart to `init_query_fragments_plan`, but routes through
// `prepare_pipeline_local` so builder state can be threaded across fragments.
#[minitrace::trace]
pub fn init_query_fragments_plan_local(
    &self,
    ctx: &Arc<QueryContext>,
    packet: &QueryFragmentsPlanPacket,
) -> Result<()> {
    // Hold the coordinator-map lock for the whole call; the unsafe deref
    // below yields a mutable reference that is only sound while this guard
    // is alive (same pattern as the other accessors on this manager).
    let queries_coordinator_guard = self.queries_coordinator.lock();
    // NOTE(review): mutable access through the guard's inner cell —
    // presumably serialized by the mutex above; confirm against the other
    // `queries_coordinator` call sites.
    let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };

    // TODO: When the query is not executed for a long time after submission, we need to remove it
    match queries_coordinator.get_mut(&packet.query_id) {
        // The query must have been registered on this node beforehand.
        None => Err(ErrorCode::Internal(format!(
            "Query {} not found in cluster.",
            packet.query_id
        ))),
        Some(query_coordinator) => {
            query_coordinator.prepare_pipeline_local(ctx, packet.enable_profiling, packet)
        }
    }
}

#[minitrace::trace]
pub fn handle_statistics_exchange(
&self,
Expand Down Expand Up @@ -297,7 +320,7 @@ impl DataExchangeManager {
.await?;

// Submit tasks to localhost
self.init_query_fragments_plan(&ctx, &local_query_fragments_plan_packet)?;
self.init_query_fragments_plan_local(&ctx, &local_query_fragments_plan_packet)?;

// Get local pipeline of local task
let build_res = self.get_root_pipeline(ctx, enable_profiling, root_actions)?;
Expand Down Expand Up @@ -590,6 +613,45 @@ impl QueryCoordinator {
Ok(())
}

/// Prepares the pipelines of every fragment in `packet` on the local node.
///
/// Unlike the plain `prepare_pipeline` path, the fragments are built in
/// packet order and a `PipelineBuilderData` (join state + probe schema) is
/// threaded from each fragment's builder into the next, so a later fragment
/// can reuse state produced by an earlier one.
pub fn prepare_pipeline_local(
    &mut self,
    ctx: &Arc<QueryContext>,
    enable_profiling: bool,
    packet: &QueryFragmentsPlanPacket,
) -> Result<()> {
    // Record the query-level info for this coordinator.
    self.info = Some(QueryInfo {
        query_ctx: ctx.clone(),
        query_id: packet.query_id.clone(),
        current_executor: packet.executor.clone(),
        query_executor: None,
    });

    // First pass: register a FragmentCoordinator for every fragment before
    // any pipeline is built.
    for fragment in &packet.fragments {
        self.fragments_coordinator.insert(
            fragment.fragment_id.to_owned(),
            FragmentCoordinator::create(fragment),
        );
    }

    // Builder state handed across fragments; starts empty.
    let mut input_builder_data = PipelineBuilderData {
        input_join_state: None,
        input_probe_schema: None,
    };

    // Second pass: build each fragment's pipeline, feeding the data
    // returned by one build into the next.
    for fragment in &packet.fragments {
        let fragment_id = fragment.fragment_id;
        if let Some(coordinator) = self.fragments_coordinator.get_mut(&fragment_id) {
            input_builder_data = coordinator.prepare_pipeline_local(
                ctx.clone(),
                enable_profiling,
                input_builder_data,
            )?;
        }
    }

    // The final builder data is intentionally discarded: nothing follows
    // the last fragment.
    Ok(())
}

pub fn subscribe_fragment(
&mut self,
ctx: &Arc<QueryContext>,
Expand Down Expand Up @@ -821,16 +883,53 @@ impl FragmentCoordinator {
self.initialized = true;

let pipeline_ctx = QueryContext::create_from(ctx);

let pipeline_builder = PipelineBuilder::create(
pipeline_ctx.get_function_context()?,
pipeline_ctx.get_settings(),
pipeline_ctx,
enable_profiling,
SharedProcessorProfiles::default(),
);
self.pipeline_build_res = Some(pipeline_builder.finalize(&self.physical_plan)?);
let res = pipeline_builder.finalize(&self.physical_plan)?;

self.pipeline_build_res = Some(res);
}

Ok(())
}

/// Builds this fragment's pipeline (at most once), seeding the builder with
/// the join state / probe schema handed over from the previously built
/// fragment, and returns the builder data to pass to the next fragment.
pub fn prepare_pipeline_local(
    &mut self,
    ctx: Arc<QueryContext>,
    enable_profiling: bool,
    input_builder_data: PipelineBuilderData,
) -> Result<PipelineBuilderData> {
    // Guard clause: an already-initialized fragment contributes no state
    // to the next one.
    if self.initialized {
        return Ok(PipelineBuilderData {
            input_join_state: None,
            input_probe_schema: None,
        });
    }
    self.initialized = true;

    let fragment_ctx = QueryContext::create_from(ctx);

    let mut builder = PipelineBuilder::create(
        fragment_ctx.get_function_context()?,
        fragment_ctx.get_settings(),
        fragment_ctx,
        enable_profiling,
        SharedProcessorProfiles::default(),
    );

    // Seed the builder with the state carried over from the prior fragment.
    builder.join_state = input_builder_data.input_join_state;
    builder.probe_data_fields = input_builder_data.input_probe_schema;

    let (build_res, next_builder_data) = builder.finalize_local(&self.physical_plan)?;
    self.pipeline_build_res = Some(build_res);
    Ok(next_builder_data)
}
}
Loading
Loading