feat(cluster): support compression for query flight data (#13934)
* feat(cluster): support compression for query flight data
zhang2014 authored Dec 7, 2023
1 parent ae4da22 commit 341ebe8
Showing 13 changed files with 95 additions and 45 deletions.
9 changes: 7 additions & 2 deletions src/query/service/src/api/rpc/exchange/exchange_injector.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use common_pipeline_core::Pipeline;
+use common_settings::FlightCompression;
 
 use crate::api::rpc::exchange::exchange_params::MergeExchangeParams;
 use crate::api::rpc::exchange::serde::exchange_deserializer::TransformExchangeDeserializer;
@@ -42,12 +43,14 @@ pub trait ExchangeInjector: Send + Sync + 'static {
     fn apply_merge_serializer(
         &self,
         params: &MergeExchangeParams,
+        compression: Option<FlightCompression>,
         pipeline: &mut Pipeline,
     ) -> Result<()>;
 
     fn apply_shuffle_serializer(
         &self,
         params: &ShuffleExchangeParams,
+        compression: Option<FlightCompression>,
         pipeline: &mut Pipeline,
     ) -> Result<()>;
 
@@ -98,20 +101,22 @@ impl ExchangeInjector for DefaultExchangeInjector {
     fn apply_merge_serializer(
         &self,
         params: &MergeExchangeParams,
+        compression: Option<FlightCompression>,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         pipeline.add_transform(|input, output| {
-            TransformExchangeSerializer::create(input, output, params)
+            TransformExchangeSerializer::create(input, output, params, compression)
         })
     }
 
     fn apply_shuffle_serializer(
         &self,
         params: &ShuffleExchangeParams,
+        compression: Option<FlightCompression>,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         pipeline.add_transform(|input, output| {
-            TransformScatterExchangeSerializer::create(input, output, params)
+            TransformScatterExchangeSerializer::create(input, output, compression, params)
         })
     }
 
34 changes: 0 additions & 34 deletions src/query/service/src/api/rpc/exchange/exchange_params.rs
@@ -14,11 +14,8 @@
 
 use std::sync::Arc;
 
-use common_arrow::arrow::io::ipc::write::default_ipc_fields;
 use common_arrow::arrow::io::ipc::write::WriteOptions;
 use common_arrow::arrow::io::ipc::IpcField;
-use common_exception::ErrorCode;
-use common_exception::Result;
 use common_expression::DataSchemaRef;
 
 use crate::api::rpc::flight_scatter::FlightScatter;
@@ -58,37 +55,6 @@ pub enum ExchangeParams {
     ShuffleExchange(ShuffleExchangeParams),
 }
 
-impl MergeExchangeParams {
-    pub fn create_serialize_params(&self) -> Result<SerializeParams> {
-        let arrow_schema = self.schema.to_arrow();
-        let ipc_fields = default_ipc_fields(&arrow_schema.fields);
-        Ok(SerializeParams {
-            ipc_fields,
-            local_executor_pos: 0,
-            options: WriteOptions { compression: None },
-        })
-    }
-}
-
-impl ShuffleExchangeParams {
-    pub fn create_serialize_params(&self) -> Result<SerializeParams> {
-        let arrow_schema = self.schema.to_arrow();
-        let ipc_fields = default_ipc_fields(&arrow_schema.fields);
-
-        for (index, executor) in self.destination_ids.iter().enumerate() {
-            if executor == &self.executor_id {
-                return Ok(SerializeParams {
-                    ipc_fields,
-                    local_executor_pos: index,
-                    options: WriteOptions { compression: None },
-                });
-            }
-        }
-
-        Err(ErrorCode::Internal("Not found local executor."))
-    }
-}
-
 impl ExchangeParams {
     pub fn get_schema(&self) -> DataSchemaRef {
         match self {
6 changes: 4 additions & 2 deletions src/query/service/src/api/rpc/exchange/exchange_sink.rs
@@ -58,7 +58,9 @@ impl ExchangeSink {
         let exchange_injector = &params.exchange_injector;
 
         if !params.ignore_exchange {
-            exchange_injector.apply_merge_serializer(params, pipeline)?;
+            let settings = ctx.get_settings();
+            let compression = settings.get_query_flight_compression()?;
+            exchange_injector.apply_merge_serializer(params, compression, pipeline)?;
         }
 
         if !params.ignore_exchange && exchange_injector.exchange_sorting().is_some() {
@@ -88,7 +90,7 @@
                 Ok(())
             }
             ExchangeParams::ShuffleExchange(params) => {
-                exchange_shuffle(params, pipeline)?;
+                exchange_shuffle(ctx, params, pipeline)?;
 
                 // exchange writer sink
                 let len = pipeline.output_len();
@@ -44,7 +44,7 @@ impl ExchangeTransform {
                 via_exchange_source(ctx.clone(), params, injector, pipeline)
             }
             ExchangeParams::ShuffleExchange(params) => {
-                exchange_shuffle(params, pipeline)?;
+                exchange_shuffle(ctx, params, pipeline)?;
 
                 // exchange writer sink and resize and exchange reader
                 let len = params.destination_ids.len();
@@ -18,6 +18,7 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
+use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::BlockMetaInfo;
@@ -39,6 +40,7 @@ use crate::api::rpc::exchange::exchange_sorting::ExchangeSorting;
 use crate::api::rpc::exchange::exchange_sorting::TransformExchangeSorting;
 use crate::api::rpc::exchange::exchange_transform_scatter::ScatterTransform;
 use crate::api::rpc::exchange::serde::exchange_serializer::ExchangeSerializeMeta;
+use crate::sessions::QueryContext;
 
 pub struct ExchangeShuffleMeta {
     pub blocks: Vec<DataBlock>,
@@ -394,7 +396,11 @@ impl ExchangeShuffleTransform {
 }
 
 // Scatter the data block and push it to the corresponding output port
-pub fn exchange_shuffle(params: &ShuffleExchangeParams, pipeline: &mut Pipeline) -> Result<()> {
+pub fn exchange_shuffle(
+    ctx: &Arc<QueryContext>,
+    params: &ShuffleExchangeParams,
+    pipeline: &mut Pipeline,
+) -> Result<()> {
     // append scatter transform
     pipeline.add_transform(|input, output| {
         Ok(ScatterTransform::create(
@@ -405,7 +411,10 @@
     })?;
 
     let exchange_injector = &params.exchange_injector;
-    exchange_injector.apply_shuffle_serializer(params, pipeline)?;
+
+    let settings = ctx.get_settings();
+    let compression = settings.get_query_flight_compression()?;
+    exchange_injector.apply_shuffle_serializer(params, compression, pipeline)?;
 
     let output_len = pipeline.output_len();
     if let Some(exchange_sorting) = &exchange_injector.exchange_sorting() {
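Note: `exchange_shuffle` now takes the query context only so that this serializer step can read `query_flight_compression` from the session settings; both call sites above (`ExchangeSink` and `ExchangeTransform`) were updated to pass `ctx` through.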
@@ -20,6 +20,7 @@ use common_arrow::arrow::chunk::Chunk;
 use common_arrow::arrow::io::flight::default_ipc_fields;
 use common_arrow::arrow::io::flight::serialize_batch;
 use common_arrow::arrow::io::flight::WriteOptions;
+use common_arrow::arrow::io::ipc::write::Compression;
 use common_arrow::arrow::io::ipc::IpcField;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -35,6 +36,7 @@ use common_pipeline_transforms::processors::BlockMetaTransformer;
 use common_pipeline_transforms::processors::Transform;
 use common_pipeline_transforms::processors::Transformer;
 use common_pipeline_transforms::processors::UnknownMode;
+use common_settings::FlightCompression;
 use serde::Deserializer;
 use serde::Serializer;
 
@@ -99,15 +101,24 @@ impl TransformExchangeSerializer {
         input: Arc<InputPort>,
         output: Arc<OutputPort>,
         params: &MergeExchangeParams,
+        compression: Option<FlightCompression>,
     ) -> Result<ProcessorPtr> {
         let arrow_schema = params.schema.to_arrow();
         let ipc_fields = default_ipc_fields(&arrow_schema.fields);
+        let compression = match compression {
+            None => None,
+            Some(compression) => match compression {
+                FlightCompression::Lz4 => Some(Compression::LZ4),
+                FlightCompression::Zstd => Some(Compression::ZSTD),
+            },
+        };
+
         Ok(ProcessorPtr::create(Transformer::create(
             input,
             output,
             TransformExchangeSerializer {
                 ipc_fields,
-                options: WriteOptions { compression: None },
+                options: WriteOptions { compression },
             },
         )))
     }
@@ -131,17 +142,26 @@
     pub fn create(
         input: Arc<InputPort>,
         output: Arc<OutputPort>,
+        compression: Option<FlightCompression>,
        params: &ShuffleExchangeParams,
     ) -> Result<ProcessorPtr> {
         let local_id = &params.executor_id;
         let arrow_schema = params.schema.to_arrow();
         let ipc_fields = default_ipc_fields(&arrow_schema.fields);
+        let compression = match compression {
+            None => None,
+            Some(compression) => match compression {
+                FlightCompression::Lz4 => Some(Compression::LZ4),
+                FlightCompression::Zstd => Some(Compression::ZSTD),
+            },
+        };
+
         Ok(ProcessorPtr::create(BlockMetaTransformer::create(
             input,
             output,
             TransformScatterExchangeSerializer {
                 ipc_fields,
-                options: WriteOptions { compression: None },
+                options: WriteOptions { compression },
                 local_pos: params
                     .destination_ids
                     .iter()
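The `FlightCompression` → `Compression` match appears twice in this file and twice more in the aggregate/group-by serializers below. A self-contained sketch of how it could be factored into one helper; the two enums are stand-ins for the real `common_settings::FlightCompression` and arrow IPC `Compression` types so the snippet compiles on its own:

```rust
// Stand-in for common_settings::FlightCompression (not the real definition).
#[derive(Clone, Copy, Debug, PartialEq)]
enum FlightCompression {
    Lz4,
    Zstd,
}

// Stand-in for common_arrow::arrow::io::ipc::write::Compression.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Compression {
    LZ4,
    ZSTD,
}

// The mapping each serializer performs inline, factored into one place.
fn to_ipc_compression(compression: Option<FlightCompression>) -> Option<Compression> {
    compression.map(|c| match c {
        FlightCompression::Lz4 => Compression::LZ4,
        FlightCompression::Zstd => Compression::ZSTD,
    })
}

fn main() {
    assert_eq!(
        to_ipc_compression(Some(FlightCompression::Zstd)),
        Some(Compression::ZSTD)
    );
    assert_eq!(to_ipc_compression(None), None);
}
```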
@@ -28,6 +28,7 @@ use common_hashtable::HashtableLike;
 use common_pipeline_core::processors::ProcessorPtr;
 use common_pipeline_core::query_spill_prefix;
 use common_pipeline_core::Pipeline;
+use common_settings::FlightCompression;
 use common_storage::DataOperator;
 use strength_reduce::StrengthReducedU64;
 
@@ -241,6 +242,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
     fn apply_merge_serializer(
         &self,
         _: &MergeExchangeParams,
+        _compression: Option<FlightCompression>,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         let method = &self.method;
@@ -289,6 +291,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
     fn apply_shuffle_serializer(
         &self,
         shuffle_params: &ShuffleExchangeParams,
+        compression: Option<FlightCompression>,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
         let method = &self.method;
@@ -316,6 +319,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
                         location_prefix.clone(),
                         schema.clone(),
                         local_pos,
+                        compression,
                     ),
                     false => TransformExchangeAggregateSerializer::create(
                         self.ctx.clone(),
@@ -325,6 +329,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
                         operator.clone(),
                         location_prefix.clone(),
                         params.clone(),
+                        compression,
                         schema.clone(),
                         local_pos,
                     ),
@@ -17,6 +17,7 @@ use std::time::Instant;
 
 use common_arrow::arrow::io::flight::default_ipc_fields;
 use common_arrow::arrow::io::flight::WriteOptions;
+use common_arrow::arrow::io::ipc::write::Compression;
 use common_arrow::arrow::io::ipc::IpcField;
 use common_base::base::GlobalUniqName;
 use common_base::base::ProgressValues;
@@ -39,6 +40,7 @@ use common_pipeline_core::processors::OutputPort;
 use common_pipeline_core::processors::Processor;
 use common_pipeline_transforms::processors::BlockMetaTransform;
 use common_pipeline_transforms::processors::BlockMetaTransformer;
+use common_settings::FlightCompression;
 use futures_util::future::BoxFuture;
 use log::info;
 use opendal::Operator;
@@ -81,11 +83,19 @@ impl<Method: HashMethodBounds> TransformExchangeAggregateSerializer<Method> {
         operator: Operator,
         location_prefix: String,
         params: Arc<AggregatorParams>,
+        compression: Option<FlightCompression>,
         schema: DataSchemaRef,
         local_pos: usize,
     ) -> Box<dyn Processor> {
         let arrow_schema = schema.to_arrow();
         let ipc_fields = default_ipc_fields(&arrow_schema.fields);
+        let compression = match compression {
+            None => None,
+            Some(compression) => match compression {
+                FlightCompression::Lz4 => Some(Compression::LZ4),
+                FlightCompression::Zstd => Some(Compression::ZSTD),
+            },
+        };
 
         BlockMetaTransformer::create(input, output, TransformExchangeAggregateSerializer::<
             Method,
@@ -97,7 +107,7 @@ impl<Method: HashMethodBounds> TransformExchangeAggregateSerializer<Method> {
             location_prefix,
             local_pos,
             ipc_fields,
-            options: WriteOptions { compression: None },
+            options: WriteOptions { compression },
         })
     }
 }
@@ -19,6 +19,7 @@ use std::time::Instant;
 
 use common_arrow::arrow::io::flight::default_ipc_fields;
 use common_arrow::arrow::io::flight::WriteOptions;
+use common_arrow::arrow::io::ipc::write::Compression;
 use common_arrow::arrow::io::ipc::IpcField;
 use common_base::base::GlobalUniqName;
 use common_base::base::ProgressValues;
@@ -45,6 +46,7 @@ use common_pipeline_core::processors::Processor;
 use common_pipeline_transforms::processors::BlockMetaTransform;
 use common_pipeline_transforms::processors::BlockMetaTransformer;
 use common_pipeline_transforms::processors::UnknownMode;
+use common_settings::FlightCompression;
 use futures_util::future::BoxFuture;
 use log::info;
 use opendal::Operator;
@@ -84,9 +86,17 @@ impl<Method: HashMethodBounds> TransformExchangeGroupBySerializer<Method> {
         location_prefix: String,
         schema: DataSchemaRef,
         local_pos: usize,
+        compression: Option<FlightCompression>,
     ) -> Box<dyn Processor> {
         let arrow_schema = schema.to_arrow();
         let ipc_fields = default_ipc_fields(&arrow_schema.fields);
+        let compression = match compression {
+            None => None,
+            Some(compression) => match compression {
+                FlightCompression::Lz4 => Some(Compression::LZ4),
+                FlightCompression::Zstd => Some(Compression::ZSTD),
+            },
+        };
 
         BlockMetaTransformer::create(
             input,
@@ -98,7 +108,7 @@ impl<Method: HashMethodBounds> TransformExchangeGroupBySerializer<Method> {
             local_pos,
             ipc_fields,
             location_prefix,
-            options: WriteOptions { compression: None },
+            options: WriteOptions { compression },
         },
     )
 }
@@ -55,6 +55,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
 | 'parquet_max_block_size' | '8192' | '8192' | 'SESSION' | 'Max block size for parquet reader' | 'UInt64' |
 | 'parquet_uncompressed_buffer_size' | '2097152' | '2097152' | 'SESSION' | 'Sets the byte size of the buffer used for reading Parquet files.' | 'UInt64' |
 | 'prefer_broadcast_join' | '1' | '1' | 'SESSION' | 'Enables broadcast join.' | 'UInt64' |
+| 'query_flight_compression' | 'LZ4' | 'LZ4' | 'SESSION' | 'flight compression method' | 'String' |
 | 'query_result_cache_allow_inconsistent' | '0' | '0' | 'SESSION' | 'Determines whether Databend will return cached query results that are inconsistent with the underlying data.' | 'UInt64' |
 | 'query_result_cache_max_bytes' | '1048576' | '1048576' | 'SESSION' | 'Sets the maximum byte size of cache for a single query result.' | 'UInt64' |
 | 'query_result_cache_ttl_secs' | '300' | '300' | 'SESSION' | 'Sets the time-to-live (TTL) in seconds for cached query results. Once the TTL for a cached result has expired, the result is considered stale and will not be used for new queries.' | 'UInt64' |
1 change: 1 addition & 0 deletions src/query/settings/src/lib.rs
@@ -22,3 +22,4 @@ pub use settings::ScopeLevel;
 pub use settings::Settings;
 pub use settings_default::ReplaceIntoShuffleStrategy;
 pub use settings_default::SettingMode;
+pub use settings_getter_setter::FlightCompression;
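Only 12 of the 13 changed files are shown on this page; the `FlightCompression` enum and its getter live in `settings_getter_setter.rs` (re-exported in the hunk above), which the page did not load. A hedged sketch of what that getter plausibly does, given the `"None" | "LZ4" | "ZSTD"` values registered in `settings_default.rs` below; the free-function name and parsing details are illustrative, not the committed code:

```rust
// Illustrative stand-in: the real enum is defined in settings_getter_setter.rs,
// which this diff view truncates.
#[derive(Clone, Copy, Debug)]
pub enum FlightCompression {
    Lz4,
    Zstd,
}

// Assumed shape of the logic behind Settings::get_query_flight_compression()
// as called from ExchangeSink and exchange_shuffle above: parse the string
// setting, with "None" meaning "no compression".
pub fn parse_flight_compression(value: &str) -> Result<Option<FlightCompression>, String> {
    match value.to_ascii_uppercase().as_str() {
        "NONE" => Ok(None),
        "LZ4" => Ok(Some(FlightCompression::Lz4)),
        "ZSTD" => Ok(Some(FlightCompression::Zstd)),
        other => Err(format!("invalid query_flight_compression: {other}")),
    }
}

fn main() {
    assert!(matches!(
        parse_flight_compression("lz4"),
        Ok(Some(FlightCompression::Lz4))
    ));
    assert!(matches!(parse_flight_compression("None"), Ok(None)));
}
```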
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -531,6 +531,12 @@ impl DefaultSettings {
             possible_values: None,
             mode: SettingMode::Both,
         }),
+        ("query_flight_compression", DefaultSettingValue {
+            value: UserSettingValue::String(String::from("LZ4")),
+            desc: "flight compression method",
+            possible_values: Some(vec!["None", "LZ4", "ZSTD"]),
+            mode: SettingMode::Both,
+        }),
     ]);
 
     Ok(Arc::new(DefaultSettings {
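With the default of `'LZ4'`, flight exchanges between cluster nodes are compressed out of the box. Since this is a session-scope setting (see the `system.settings` row above), it can presumably be changed per session with the usual syntax, e.g. `SET query_flight_compression = 'ZSTD';`, or disabled with `SET query_flight_compression = 'None';` (values per `possible_values` above).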