Skip to content

Commit

Permalink
Merge branch 'main' into temp_table_m_cte
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 authored Nov 28, 2024
2 parents 8e6eecd + 5b4e61a commit abff3c2
Show file tree
Hide file tree
Showing 22 changed files with 156 additions and 102 deletions.
6 changes: 3 additions & 3 deletions .github/actions/publish_binary/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ runs:
# Reference: https://cli.github.com/manual/gh_release_upload
run: |
gh release upload ${{ inputs.version }} ${{ steps.name.outputs.name }}.tar.gz --clobber
if [ -f ${{ steps.name.outputs.name }}-gdb.tar.gz ]; then
gh release upload ${{ inputs.version }} ${{ steps.name.outputs.name }}-dbg.* --clobber
if [ -f ${{ steps.name.outputs.name }}-dbg.tar.gz ]; then
gh release upload ${{ inputs.version }} ${{ steps.name.outputs.name }}-dbg.tar.gz --clobber
fi
- name: Sync normal release to R2
shell: bash
if: inputs.category == 'default'
run: |
aws s3 cp ${{ steps.name.outputs.name }}.tar.gz s3://repo/databend/${{ inputs.version }}/${{ steps.name.outputs.name }}.tar.gz --no-progress
if [ -f ${{ steps.name.outputs.name }}-gdb.tar.gz ]; then
if [ -f ${{ steps.name.outputs.name }}-dbg.tar.gz ]; then
aws s3 cp ${{ steps.name.outputs.name }}-dbg.tar.gz s3://repo/databend/${{ inputs.version }}/${{ steps.name.outputs.name }}-dbg.tar.gz --no-progress
fi
gh api /repos/databendlabs/databend/tags > tags.json
Expand Down
4 changes: 2 additions & 2 deletions .github/actions/setup_bendsql/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ runs:
if bendsql --version; then
exit 0
fi
curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.18.3/bendsql-x86_64-unknown-linux-gnu.tar.gz
curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.23.2/bendsql-x86_64-unknown-linux-gnu.tar.gz
tar -xzf /tmp/bendsql.tar.gz -C /tmp
mv /tmp/bendsql /usr/local/bin/bendsql
bendsql --version
Expand All @@ -21,7 +21,7 @@ runs:
if bendsql --version; then
exit 0
fi
curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.18.3/bendsql-x86_64-apple-darwin.tar.gz
curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.23.2/bendsql-x86_64-apple-darwin.tar.gz
tar -xzf /tmp/bendsql.tar.gz -C /tmp
mv /tmp/bendsql /usr/local/bin/bendsql
bendsql --version
4 changes: 4 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ impl TableInfo {
&self.meta.options
}

pub fn options_mut(&mut self) -> &mut BTreeMap<String, String> {
&mut self.meta.options
}

pub fn catalog(&self) -> &str {
&self.catalog_info.name_ident.catalog_name
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/fail_safe/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Amender {
Ok(None) => (),
Err(e) => {
if e.code() == ErrorCode::STORAGE_NOT_FOUND {
let snapshot_location = table.snapshot_loc().await?.unwrap();
let snapshot_location = table.snapshot_loc().unwrap();
self.recover_object(&snapshot_location).await?;
let snapshot = table.read_table_snapshot().await?;
let schema = table.schema();
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/storages/fuse/operations/vacuum_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn get_snapshot_referenced_files(
ctx: &Arc<dyn TableContext>,
) -> Result<Option<SnapshotReferencedFiles>> {
// 1. Read the root snapshot.
let root_snapshot_location_op = fuse_table.snapshot_loc().await?;
let root_snapshot_location_op = fuse_table.snapshot_loc();
if root_snapshot_location_op.is_none() {
return Ok(None);
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/common/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn dml_build_update_stream_req(
let table_version = inner_fuse.get_table_info().ident.seq;
let mut options = stream.options().clone();
options.insert(OPT_KEY_TABLE_VER.to_string(), table_version.to_string());
if let Some(snapshot_loc) = inner_fuse.snapshot_loc().await? {
if let Some(snapshot_loc) = inner_fuse.snapshot_loc() {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
}

Expand Down Expand Up @@ -147,7 +147,7 @@ pub async fn query_build_update_stream_req(
let table_version = inner_fuse.get_table_info().ident.seq;
let mut options = stream.options().clone();
options.insert(OPT_KEY_TABLE_VER.to_string(), table_version.to_string());
if let Some(snapshot_loc) = inner_fuse.snapshot_loc().await? {
if let Some(snapshot_loc) = inner_fuse.snapshot_loc() {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
}
let mut new_table_meta = stream_info.meta.clone();
Expand Down
39 changes: 20 additions & 19 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,15 @@ async fn compact_table(
.await?;
let settings = ctx.get_settings();

let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
let do_compact = compaction_limits.block_limit.is_some() || !do_recluster;

// evict the table from cache
ctx.evict_table_from_cache(
&compact_target.catalog,
&compact_target.database,
&compact_target.table,
)?;

if do_compact {
{
// do compact.
let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
catalog: compact_target.catalog.clone(),
database: compact_target.database.clone(),
Expand Down Expand Up @@ -191,21 +189,24 @@ async fn compact_table(
}
}

if do_recluster {
let recluster = RelOperator::Recluster(Recluster {
catalog: compact_target.catalog,
database: compact_target.database,
table: compact_target.table,
filters: None,
limit: Some(settings.get_auto_compaction_segments_limit()? as usize),
});
let s_expr = SExpr::create_leaf(Arc::new(recluster));
let recluster_interpreter =
ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
// Recluster will be done in `ReclusterTableInterpreter::execute2` directly,
// we do not need to use `PipelineCompleteExecutor` to execute it.
let build_res = recluster_interpreter.execute2().await?;
assert!(build_res.main_pipeline.is_empty());
{
// do recluster.
if !table.cluster_keys(ctx.clone()).is_empty() {
let recluster = RelOperator::Recluster(Recluster {
catalog: compact_target.catalog,
database: compact_target.database,
table: compact_target.table,
filters: None,
limit: Some(settings.get_auto_compaction_segments_limit()? as usize),
});
let s_expr = SExpr::create_leaf(Arc::new(recluster));
let recluster_interpreter =
ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
// Recluster will be done in `ReclusterTableInterpreter::execute2` directly,
// we do not need to use `PipelineCompleteExecutor` to execute it.
let build_res = recluster_interpreter.execute2().await?;
assert!(build_res.main_pipeline.is_empty());
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub async fn check_data_dir(
if check_last_snapshot.is_some() {
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let snapshot_loc = fuse_table.snapshot_loc().await?;
let snapshot_loc = fuse_table.snapshot_loc();
let snapshot_loc = snapshot_loc.unwrap();
assert!(last_snapshot_loc.contains(&snapshot_loc));
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ async fn test_last_snapshot_hint() -> Result<()> {
// check last snapshot hit file
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let last_snapshot_location = fuse_table.snapshot_loc().await?.unwrap();
let last_snapshot_location = fuse_table.snapshot_loc().unwrap();
let operator = fuse_table.get_operator();
let location = fuse_table
.meta_location_generator()
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/operations/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ async fn test_fuse_purge_older_version() -> Result<()> {
let fuse_table = FuseTable::try_from_table(latest_table.as_ref())?;
let snapshot_files = fuse_table.list_snapshot_files().await?;
let time_point = now - Duration::hours(12);
let snapshot_loc = fuse_table.snapshot_loc().await?.unwrap();
let snapshot_loc = fuse_table.snapshot_loc().unwrap();
let table = fuse_table
.navigate_to_time_point(snapshot_loc, time_point, ctx.clone().get_abort_checker())
.await?;
Expand Down
11 changes: 3 additions & 8 deletions src/query/service/tests/it/storages/fuse/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async fn test_fuse_navigate() -> Result<()> {
let table = fixture.latest_default_table().await?;
let first_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// take a nap
Expand All @@ -73,15 +72,14 @@ async fn test_fuse_navigate() -> Result<()> {
let table = fixture.latest_default_table().await?;
let second_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();
assert_ne!(second_snapshot, first_snapshot);

// 2. grab the history
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
let loc = fuse_table.snapshot_loc().await?.unwrap();
let loc = fuse_table.snapshot_loc().unwrap();
assert_eq!(second_snapshot, loc);
let version = TableMetaLocationGenerator::snapshot_version(loc.as_str());
let snapshots: Vec<_> = reader
Expand Down Expand Up @@ -111,7 +109,7 @@ async fn test_fuse_navigate() -> Result<()> {
.await?;

// check we got the snapshot of the first insertion
assert_eq!(first_snapshot, tbl.snapshot_loc().await?.unwrap());
assert_eq!(first_snapshot, tbl.snapshot_loc().unwrap());

// 4. navigate beyond the first snapshot
let (first_insertion, _ver) = &snapshots[1];
Expand Down Expand Up @@ -170,7 +168,6 @@ async fn test_navigate_for_purge() -> Result<()> {
let table = fixture.latest_default_table().await?;
let _first_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// take a nap
Expand All @@ -187,7 +184,6 @@ async fn test_navigate_for_purge() -> Result<()> {
let table = fixture.latest_default_table().await?;
let second_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// take a nap
Expand All @@ -203,14 +199,13 @@ async fn test_navigate_for_purge() -> Result<()> {
let table = fixture.latest_default_table().await?;
let third_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// 2. grab the history
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
let loc = fuse_table.snapshot_loc().await?.unwrap();
let loc = fuse_table.snapshot_loc().unwrap();
assert_eq!(third_snapshot, loc);
let version = TableMetaLocationGenerator::snapshot_version(loc.as_str());
let snapshots: Vec<_> = reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub const OPT_KEY_DATABASE_ID: &str = "database_id";
pub const OPT_KEY_STORAGE_PREFIX: &str = "storage_prefix";
pub const OPT_KEY_TEMP_PREFIX: &str = "temp_prefix";
pub const OPT_KEY_SNAPSHOT_LOCATION: &str = "snapshot_location";
pub const OPT_KEY_SNAPSHOT_LOCATION_FIXED_FLAG: &str = "snapshot_location_fixed";
pub const OPT_KEY_STORAGE_FORMAT: &str = "storage_format";
pub const OPT_KEY_TABLE_COMPRESSION: &str = "compression";
pub const OPT_KEY_COMMENT: &str = "comment";
Expand Down
Loading

0 comments on commit abff3c2

Please sign in to comment.