
Commit

chore: partially revert "feat: tweak table data life-cycle related sql stmts (#13015)" (#13223)

* Revert "feat: tweak table data life-cycle related sql stmts (#13015)"

This reverts commit e14dc7c.

* remove `truncate ... purge`

* clean up

* update doc of ddl-truncate-table
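
In practical terms, a minimal sketch of the cleanup pattern this revert restores, taken from the benchmark scripts changed below (the table name `hits` comes from the clickbench script):

```sql
-- pattern removed by this commit: drop the schema, then vacuum the dropped data separately
drop table hits;
VACUUM DROP TABLE retain 0 hours;

-- pattern restored by this commit: drop the table together with its data in one statement
drop table hits all;
```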
dantengsky authored Oct 12, 2023
1 parent 0dee1ba commit 055c4c6
Showing 80 changed files with 206 additions and 114 deletions.
3 changes: 1 addition & 2 deletions benchmark/clickbench/hits/clear.sql
@@ -1,2 +1 @@
drop table hits;
VACUUM DROP TABLE retain 0 hours;
drop table hits all;
17 changes: 8 additions & 9 deletions benchmark/clickbench/tpch/clear.sql
@@ -1,9 +1,8 @@
drop table customer;
drop table lineitem;
drop table nation;
drop table orders;
drop table partsupp;
drop table part;
drop table region;
drop table supplier;
VACUUM DROP TABLE retain 0 hours;
drop table customer all;
drop table lineitem all;
drop table nation all;
drop table orders all;
drop table partsupp all;
drop table part all;
drop table region all;
drop table supplier all;
4 changes: 1 addition & 3 deletions benchmark/tpcds/load_data.sh
@@ -36,11 +36,9 @@ tables=(
# Clear Data
for t in ${tables[@]}
do
echo "DROP TABLE IF EXISTS $t" | $MYSQL_CLIENT_CONNECT
echo "DROP TABLE IF EXISTS $t ALL" | $MYSQL_CLIENT_CONNECT
done

echo "VACUUM DROP TABLE retain 0 hours" | $MYSQL_CLIENT_CONNECT

# Create Tables;
cat "$CURDIR"/tpcds.sql | $MYSQL_CLIENT_CONNECT

3 changes: 3 additions & 0 deletions docs/doc/14-sql-commands/00-ddl/20-table/20-ddl-drop-table.md
@@ -18,6 +18,9 @@ DROP TABLE [IF EXISTS] [db.]name
:::caution

`DROP TABLE` only removes the table schema from the meta service; it does not remove the underlying data from storage.
If you want to delete the table together with its data, use:

`DROP TABLE <table_name> ALL;`

:::

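
A hedged usage sketch of the behavior described in the caution above (the table name `t` is illustrative and not part of this diff):

```sql
CREATE TABLE t (a INT);
INSERT INTO t VALUES (1), (2);

-- removes only the table schema from the meta service; the underlying data stays in storage
DROP TABLE t;

-- alternatively, remove the schema and the underlying data together
-- DROP TABLE t ALL;
```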
@@ -50,4 +50,4 @@ FROM
test_truncate

0 row in 0.017 sec. Processed 0 rows, 0B (0 rows/s, 0B/s)
```
```
6 changes: 1 addition & 5 deletions scripts/benchmark/query/load/hits.sh
@@ -7,11 +7,7 @@ select version();
SQL

cat <<SQL | bendsql
DROP TABLE IF EXISTS hits;
SQL

cat <<SQL | bendsql
VACUUM DROP TABLE retain 0 hours;
DROP TABLE IF EXISTS hits ALL;
SQL

cat <<SQL | bendsql
5 changes: 4 additions & 1 deletion src/query/ast/src/ast/statements/table.rs
@@ -256,6 +256,7 @@ pub struct DropTableStmt {
pub catalog: Option<Identifier>,
pub database: Option<Identifier>,
pub table: Identifier,
pub all: bool,
}

impl Display for DropTableStmt {
@@ -271,6 +272,9 @@ impl Display for DropTableStmt {
.chain(&self.database)
.chain(Some(&self.table)),
)?;
if self.all {
write!(f, " ALL")?;
}

Ok(())
}
@@ -476,7 +480,6 @@ impl Display for TruncateTableStmt {
.chain(&self.database)
.chain(Some(&self.table)),
)?;

Ok(())
}
}
5 changes: 3 additions & 2 deletions src/query/ast/src/parser/statement.rs
@@ -578,14 +578,15 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
);
let drop_table = map(
rule! {
DROP ~ TABLE ~ ( IF ~ ^EXISTS )? ~ #dot_separated_idents_1_to_3
DROP ~ TABLE ~ ( IF ~ ^EXISTS )? ~ #dot_separated_idents_1_to_3 ~ ALL?
},
|(_, _, opt_if_exists, (catalog, database, table))| {
|(_, _, opt_if_exists, (catalog, database, table), opt_all)| {
Statement::DropTable(DropTableStmt {
if_exists: opt_if_exists.is_some(),
catalog,
database,
table,
all: opt_all.is_some(),
})
},
);
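
The grammar change above makes a trailing `ALL` optional on `DROP TABLE`. A quick sketch of statements the updated rule accepts (identifiers are illustrative):

```sql
DROP TABLE t;
DROP TABLE IF EXISTS db.t;
DROP TABLE IF EXISTS my_catalog.db.t ALL;
```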
4 changes: 4 additions & 0 deletions src/query/ast/tests/it/testdata/statement.txt
@@ -1513,6 +1513,7 @@ DropTable(
11..12,
),
},
all: false,
},
)

@@ -1544,6 +1545,7 @@ DropTable(
23..26,
),
},
all: false,
},
)

@@ -2404,6 +2406,7 @@ DropTable(
11..17,
),
},
all: false,
},
)

@@ -2425,6 +2428,7 @@ DropTable(
21..27,
),
},
all: false,
},
)

4 changes: 2 additions & 2 deletions src/query/ee/tests/it/aggregating_index/index_scan.rs
@@ -1110,8 +1110,8 @@ async fn test_fuzz_impl(format: &str, spill: bool) -> Result<()> {
}

// Clear data
execute_sql(fixture.ctx(), "DROP TABLE rt").await?;
execute_sql(fixture.ctx(), "DROP TABLE t").await?;
execute_sql(fixture.ctx(), "DROP TABLE rt ALL").await?;
execute_sql(fixture.ctx(), "DROP TABLE t ALL").await?;
}
}
Ok(())
18 changes: 18 additions & 0 deletions src/query/service/src/interpreters/interpreter_table_drop.rs
@@ -14,10 +14,12 @@

use std::sync::Arc;

use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::DropTableByIdReq;
use common_sql::plans::DropTablePlan;
use common_storages_fuse::FuseTable;
use common_storages_share::save_share_spec;
use common_storages_view::view_table::VIEW_ENGINE;

@@ -77,6 +79,22 @@ impl Interpreter for DropTableInterpreter {
})
.await?;

// if `plan.all`, truncate, then purge the historical data
if self.plan.all {
// the above `catalog.drop_table` operation changed the table meta version,
// thus if we do not refresh the table instance, `truncate` will fail
let latest = tbl.as_ref().refresh(self.ctx.as_ref()).await?;
let maybe_fuse_table = FuseTable::try_from_table(latest.as_ref());
// if the target table is of type FuseTable, purge its historical data
// otherwise, plain truncate
if let Ok(fuse_table) = maybe_fuse_table {
let purge = true;
fuse_table.do_truncate(self.ctx.clone(), purge).await?
} else {
latest.truncate(self.ctx.clone()).await?
}
}

if let Some((spec_vec, share_table_info)) = resp.spec_vec {
save_share_spec(
&self.ctx.get_tenant(),
6 changes: 3 additions & 3 deletions src/query/service/tests/it/storages/fuse/operations/gc.rs
@@ -114,9 +114,9 @@ async fn test_fuse_purge_normal_orphan_snapshot() -> Result<()> {
"do_gc: there should be 1 snapshot, 0 segment/block",
expected_num_of_snapshot,
0, // 0 snapshot statistic
1, // 1 segments
1, // 1 blocks
1, // 1 index
1, // 0 segments
1, // 0 blocks
1, // 0 index
Some(()),
None,
)
30 changes: 30 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/purge_drop.rs
@@ -15,6 +15,7 @@
use common_base::base::tokio;
use common_exception::Result;
use databend_query::test_kits::table_test_fixture::append_sample_data;
use databend_query::test_kits::table_test_fixture::check_data_dir;
use databend_query::test_kits::table_test_fixture::execute_command;
use databend_query::test_kits::table_test_fixture::TestFixture;

@@ -33,3 +34,32 @@ async fn test_fuse_snapshot_truncate_in_drop_stmt() -> Result<()> {
execute_command(ctx.clone(), qry.as_str()).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_snapshot_truncate_in_drop_all_stmt() -> Result<()> {
let fixture = TestFixture::new().await;
let db = fixture.default_db_name();
let tbl = fixture.default_table_name();
let ctx = fixture.ctx();
fixture.create_default_table().await?;

// ingests some test data
append_sample_data(1, &fixture).await?;
// let's Drop
let qry = format!("drop table {}.{} all", db, tbl);
execute_command(ctx.clone(), qry.as_str()).await?;

check_data_dir(
&fixture,
"drop table: there should be 1 snapshot, 0 segment/block",
1, // 1 snapshot
0, // 0 snapshot statistic
0, // 0 segments
0, // 0 blocks
0, // 0 index
None,
None,
)
.await?;
Ok(())
}
@@ -145,7 +145,7 @@ async fn test_fuse_table_truncate_appending_concurrently() -> common_exception::
append_data(s1_table_to_be_truncated.clone()).await?;
let s2_table_to_appended = fixture.latest_default_table().await?;

// 4. perform `truncate purge` operation on s1
// 4. perform `truncate` operation on s1
let r = s1_table_to_be_truncated.truncate(ctx.clone()).await;
// version mismatched, and `truncate purge` should result in error (but nothing should have been removed)
assert!(r.is_err());
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/binder/ddl/table.rs
@@ -666,6 +666,7 @@ impl Binder {
catalog,
database,
table,
all,
} = stmt;

let tenant = self.ctx.get_tenant();
@@ -678,6 +679,7 @@
catalog,
database,
table,
all: *all,
})))
}

1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/ddl/table.rs
@@ -84,6 +84,7 @@ pub struct DropTablePlan {
pub database: String,
/// The table name
pub table: String,
pub all: bool,
}

impl DropTablePlan {
3 changes: 2 additions & 1 deletion src/query/storages/fuse/src/fuse_table.rs
@@ -554,7 +554,8 @@ impl Table for FuseTable {
#[minitrace::trace(name = "fuse_table_truncate")]
#[async_backtrace::framed]
async fn truncate(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
self.do_truncate(ctx).await
let purge = false;
self.do_truncate(ctx, purge).await
}

#[minitrace::trace(name = "fuse_table_optimize")]
@@ -336,7 +336,7 @@ impl BlockReader {
Suppose the name of table is T
~~~
create table tmp_t as select * from T;
drop table T;
drop table T all;
alter table tmp_t rename to T;
~~~
Please note that the history of table T WILL BE LOST.
6 changes: 4 additions & 2 deletions src/query/storages/fuse/src/operations/delete.rs
@@ -99,7 +99,8 @@ impl FuseTable {
};
ctx.get_write_progress().incr(&progress_values);
// deleting the whole table... just a truncate
return self.do_truncate(ctx.clone()).await.map(|_| None);
let purge = false;
return self.do_truncate(ctx.clone(), purge).await.map(|_| None);
}
Some(filters) => filters,
};
@@ -121,7 +122,8 @@
ctx.get_write_progress().incr(&progress_values);

// deleting the whole table... just a truncate
return self.do_truncate(ctx.clone()).await.map(|_| None);
let purge = false;
return self.do_truncate(ctx.clone(), purge).await.map(|_| None);
}
}
Ok(Some(snapshot.clone()))
20 changes: 18 additions & 2 deletions src/query/storages/fuse/src/operations/truncate.rs
@@ -30,7 +30,7 @@ use crate::FuseTable;
impl FuseTable {
#[inline]
#[async_backtrace::framed]
pub async fn do_truncate(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
pub async fn do_truncate(&self, ctx: Arc<dyn TableContext>, purge: bool) -> Result<()> {
if let Some(prev_snapshot) = self.read_table_snapshot().await? {
// 1. prepare new snapshot
let prev_id = prev_snapshot.snapshot_id;
@@ -82,7 +82,6 @@ impl FuseTable {
})
.await?;

// best effort to remove the table's copied files.
catalog
.truncate_table(&self.table_info, TruncateTableReq {
table_id,
Expand All @@ -97,6 +96,23 @@ impl FuseTable {
new_snapshot_loc,
)
.await;

// best effort to remove historical data. if it fails, let `vacuum` do the job.
// TODO: consider remove the `purge` option from `truncate`
// - it is not a safe operation, there is NO retention interval protection here
// - it is incompatible with time travel features
if purge {
let snapshot_files = self.list_snapshot_files().await?;
let keep_last_snapshot = false;
let ret = self
.do_purge(&ctx, snapshot_files, None, keep_last_snapshot, false)
.await;
if let Err(e) = ret {
return Err(e);
} else {
return Ok(());
}
}
}

Ok(())
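
The comments in `do_truncate` above leave historical-data removal to vacuum when `purge` is not requested. A hedged SQL sketch of that division of labor (the `VACUUM TABLE` form is an assumption about Databend's vacuum commands and is not part of this diff; the retention interval is illustrative):

```sql
-- truncate keeps the table's history, so time travel remains possible
TRUNCATE TABLE t;

-- historical data is removed separately by vacuum, which can honor a retention interval
-- (assumed syntax, not part of this commit)
VACUUM TABLE t RETAIN 48 HOURS;
```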
1 change: 1 addition & 0 deletions src/tests/sqlsmith/src/sql_gen/ddl.rs
@@ -66,6 +66,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> {
catalog: None,
database: None,
table: Identifier::from_name(table_name.clone()),
all: false,
};
let create_table = CreateTableStmt {
if_not_exists: true,
@@ -12,7 +12,7 @@ select count(*) > 0 from system.query_log
1

statement ok
drop table if exists tbl_01_0002
drop table if exists tbl_01_0002 all

statement ok
create table tbl_01_0002(a int)
@@ -1,5 +1,5 @@
statement ok
drop table if exists tbl_01_0007
drop table if exists tbl_01_0007 all

statement ok
create table tbl_01_0007(a int not null) cluster by(a)
@@ -75,7 +75,7 @@ statement ok
DROP table t

statement ok
drop table if exists t_datetime
drop table if exists t_datetime all

statement ok
CREATE TABLE t_datetime(created_at Date, created_time DateTime, count Int32)