Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cluster): support compression for query flight data #13934

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/query/service/src/api/rpc/exchange/exchange_injector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_pipeline_core::Pipeline;
use common_settings::FlightCompression;

use crate::api::rpc::exchange::exchange_params::MergeExchangeParams;
use crate::api::rpc::exchange::serde::exchange_deserializer::TransformExchangeDeserializer;
Expand All @@ -42,12 +43,14 @@ pub trait ExchangeInjector: Send + Sync + 'static {
fn apply_merge_serializer(
&self,
params: &MergeExchangeParams,
compression: Option<FlightCompression>,
pipeline: &mut Pipeline,
) -> Result<()>;

fn apply_shuffle_serializer(
&self,
params: &ShuffleExchangeParams,
compression: Option<FlightCompression>,
pipeline: &mut Pipeline,
) -> Result<()>;

Expand Down Expand Up @@ -98,20 +101,22 @@ impl ExchangeInjector for DefaultExchangeInjector {
fn apply_merge_serializer(
&self,
params: &MergeExchangeParams,
compression: Option<FlightCompression>,
pipeline: &mut Pipeline,
) -> Result<()> {
pipeline.add_transform(|input, output| {
TransformExchangeSerializer::create(input, output, params)
TransformExchangeSerializer::create(input, output, params, compression)
})
}

fn apply_shuffle_serializer(
&self,
params: &ShuffleExchangeParams,
compression: Option<FlightCompression>,
pipeline: &mut Pipeline,
) -> Result<()> {
pipeline.add_transform(|input, output| {
TransformScatterExchangeSerializer::create(input, output, params)
TransformScatterExchangeSerializer::create(input, output, compression, params)
})
}

Expand Down
34 changes: 0 additions & 34 deletions src/query/service/src/api/rpc/exchange/exchange_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@

use std::sync::Arc;

use common_arrow::arrow::io::ipc::write::default_ipc_fields;
use common_arrow::arrow::io::ipc::write::WriteOptions;
use common_arrow::arrow::io::ipc::IpcField;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataSchemaRef;

use crate::api::rpc::flight_scatter::FlightScatter;
Expand Down Expand Up @@ -58,37 +55,6 @@ pub enum ExchangeParams {
ShuffleExchange(ShuffleExchangeParams),
}

impl MergeExchangeParams {
pub fn create_serialize_params(&self) -> Result<SerializeParams> {
let arrow_schema = self.schema.to_arrow();
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
Ok(SerializeParams {
ipc_fields,
local_executor_pos: 0,
options: WriteOptions { compression: None },
})
}
}

impl ShuffleExchangeParams {
pub fn create_serialize_params(&self) -> Result<SerializeParams> {
let arrow_schema = self.schema.to_arrow();
let ipc_fields = default_ipc_fields(&arrow_schema.fields);

for (index, executor) in self.destination_ids.iter().enumerate() {
if executor == &self.executor_id {
return Ok(SerializeParams {
ipc_fields,
local_executor_pos: index,
options: WriteOptions { compression: None },
});
}
}

Err(ErrorCode::Internal("Not found local executor."))
}
}

impl ExchangeParams {
pub fn get_schema(&self) -> DataSchemaRef {
match self {
Expand Down
6 changes: 4 additions & 2 deletions src/query/service/src/api/rpc/exchange/exchange_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ impl ExchangeSink {
let exchange_injector = &params.exchange_injector;

if !params.ignore_exchange {
exchange_injector.apply_merge_serializer(params, pipeline)?;
let settings = ctx.get_settings();
let compression = settings.get_query_flight_compression()?;
exchange_injector.apply_merge_serializer(params, compression, pipeline)?;
}

if !params.ignore_exchange && exchange_injector.exchange_sorting().is_some() {
Expand Down Expand Up @@ -88,7 +90,7 @@ impl ExchangeSink {
Ok(())
}
ExchangeParams::ShuffleExchange(params) => {
exchange_shuffle(params, pipeline)?;
exchange_shuffle(ctx, params, pipeline)?;

// exchange writer sink
let len = pipeline.output_len();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ExchangeTransform {
via_exchange_source(ctx.clone(), params, injector, pipeline)
}
ExchangeParams::ShuffleExchange(params) => {
exchange_shuffle(params, pipeline)?;
exchange_shuffle(ctx, params, pipeline)?;

// exchange writer sink and resize and exchange reader
let len = params.destination_ids.len();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::BlockMetaInfo;
Expand All @@ -39,6 +40,7 @@ use crate::api::rpc::exchange::exchange_sorting::ExchangeSorting;
use crate::api::rpc::exchange::exchange_sorting::TransformExchangeSorting;
use crate::api::rpc::exchange::exchange_transform_scatter::ScatterTransform;
use crate::api::rpc::exchange::serde::exchange_serializer::ExchangeSerializeMeta;
use crate::sessions::QueryContext;

pub struct ExchangeShuffleMeta {
pub blocks: Vec<DataBlock>,
Expand Down Expand Up @@ -394,7 +396,11 @@ impl ExchangeShuffleTransform {
}

// Scatter the data block and push it to the corresponding output port
pub fn exchange_shuffle(params: &ShuffleExchangeParams, pipeline: &mut Pipeline) -> Result<()> {
pub fn exchange_shuffle(
ctx: &Arc<QueryContext>,
params: &ShuffleExchangeParams,
pipeline: &mut Pipeline,
) -> Result<()> {
// append scatter transform
pipeline.add_transform(|input, output| {
Ok(ScatterTransform::create(
Expand All @@ -405,7 +411,10 @@ pub fn exchange_shuffle(params: &ShuffleExchangeParams, pipeline: &mut Pipeline)
})?;

let exchange_injector = &params.exchange_injector;
exchange_injector.apply_shuffle_serializer(params, pipeline)?;

let settings = ctx.get_settings();
let compression = settings.get_query_flight_compression()?;
exchange_injector.apply_shuffle_serializer(params, compression, pipeline)?;

let output_len = pipeline.output_len();
if let Some(exchange_sorting) = &exchange_injector.exchange_sorting() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_arrow::arrow::chunk::Chunk;
use common_arrow::arrow::io::flight::default_ipc_fields;
use common_arrow::arrow::io::flight::serialize_batch;
use common_arrow::arrow::io::flight::WriteOptions;
use common_arrow::arrow::io::ipc::write::Compression;
use common_arrow::arrow::io::ipc::IpcField;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -35,6 +36,7 @@ use common_pipeline_transforms::processors::BlockMetaTransformer;
use common_pipeline_transforms::processors::Transform;
use common_pipeline_transforms::processors::Transformer;
use common_pipeline_transforms::processors::UnknownMode;
use common_settings::FlightCompression;
use serde::Deserializer;
use serde::Serializer;

Expand Down Expand Up @@ -99,15 +101,24 @@ impl TransformExchangeSerializer {
input: Arc<InputPort>,
output: Arc<OutputPort>,
params: &MergeExchangeParams,
compression: Option<FlightCompression>,
) -> Result<ProcessorPtr> {
let arrow_schema = params.schema.to_arrow();
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
let compression = match compression {
None => None,
Some(compression) => match compression {
FlightCompression::Lz4 => Some(Compression::LZ4),
FlightCompression::Zstd => Some(Compression::ZSTD),
},
};

Ok(ProcessorPtr::create(Transformer::create(
input,
output,
TransformExchangeSerializer {
ipc_fields,
options: WriteOptions { compression: None },
options: WriteOptions { compression },
},
)))
}
Expand All @@ -131,17 +142,26 @@ impl TransformScatterExchangeSerializer {
pub fn create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
compression: Option<FlightCompression>,
params: &ShuffleExchangeParams,
) -> Result<ProcessorPtr> {
let local_id = &params.executor_id;
let arrow_schema = params.schema.to_arrow();
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
let compression = match compression {
None => None,
Some(compression) => match compression {
FlightCompression::Lz4 => Some(Compression::LZ4),
FlightCompression::Zstd => Some(Compression::ZSTD),
},
};

Ok(ProcessorPtr::create(BlockMetaTransformer::create(
input,
output,
TransformScatterExchangeSerializer {
ipc_fields,
options: WriteOptions { compression: None },
options: WriteOptions { compression },
local_pos: params
.destination_ids
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_hashtable::HashtableLike;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::query_spill_prefix;
use common_pipeline_core::Pipeline;
use common_settings::FlightCompression;
use common_storage::DataOperator;
use strength_reduce::StrengthReducedU64;

Expand Down Expand Up @@ -241,6 +242,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
fn apply_merge_serializer(
&self,
_: &MergeExchangeParams,
_compression: Option<FlightCompression>,
pipeline: &mut Pipeline,
) -> Result<()> {
let method = &self.method;
Expand Down Expand Up @@ -289,6 +291,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
fn apply_shuffle_serializer(
&self,
shuffle_params: &ShuffleExchangeParams,
compression: Option<FlightCompression>,
pipeline: &mut Pipeline,
) -> Result<()> {
let method = &self.method;
Expand Down Expand Up @@ -316,6 +319,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
location_prefix.clone(),
schema.clone(),
local_pos,
compression,
),
false => TransformExchangeAggregateSerializer::create(
self.ctx.clone(),
Expand All @@ -325,6 +329,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> ExchangeInjector
operator.clone(),
location_prefix.clone(),
params.clone(),
compression,
schema.clone(),
local_pos,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Instant;

use common_arrow::arrow::io::flight::default_ipc_fields;
use common_arrow::arrow::io::flight::WriteOptions;
use common_arrow::arrow::io::ipc::write::Compression;
use common_arrow::arrow::io::ipc::IpcField;
use common_base::base::GlobalUniqName;
use common_base::base::ProgressValues;
Expand All @@ -39,6 +40,7 @@ use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::Processor;
use common_pipeline_transforms::processors::BlockMetaTransform;
use common_pipeline_transforms::processors::BlockMetaTransformer;
use common_settings::FlightCompression;
use futures_util::future::BoxFuture;
use log::info;
use opendal::Operator;
Expand Down Expand Up @@ -81,11 +83,19 @@ impl<Method: HashMethodBounds> TransformExchangeAggregateSerializer<Method> {
operator: Operator,
location_prefix: String,
params: Arc<AggregatorParams>,
compression: Option<FlightCompression>,
schema: DataSchemaRef,
local_pos: usize,
) -> Box<dyn Processor> {
let arrow_schema = schema.to_arrow();
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
let compression = match compression {
None => None,
Some(compression) => match compression {
FlightCompression::Lz4 => Some(Compression::LZ4),
FlightCompression::Zstd => Some(Compression::ZSTD),
},
};

BlockMetaTransformer::create(input, output, TransformExchangeAggregateSerializer::<
Method,
Expand All @@ -97,7 +107,7 @@ impl<Method: HashMethodBounds> TransformExchangeAggregateSerializer<Method> {
location_prefix,
local_pos,
ipc_fields,
options: WriteOptions { compression: None },
options: WriteOptions { compression },
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Instant;

use common_arrow::arrow::io::flight::default_ipc_fields;
use common_arrow::arrow::io::flight::WriteOptions;
use common_arrow::arrow::io::ipc::write::Compression;
use common_arrow::arrow::io::ipc::IpcField;
use common_base::base::GlobalUniqName;
use common_base::base::ProgressValues;
Expand All @@ -45,6 +46,7 @@ use common_pipeline_core::processors::Processor;
use common_pipeline_transforms::processors::BlockMetaTransform;
use common_pipeline_transforms::processors::BlockMetaTransformer;
use common_pipeline_transforms::processors::UnknownMode;
use common_settings::FlightCompression;
use futures_util::future::BoxFuture;
use log::info;
use opendal::Operator;
Expand Down Expand Up @@ -84,9 +86,17 @@ impl<Method: HashMethodBounds> TransformExchangeGroupBySerializer<Method> {
location_prefix: String,
schema: DataSchemaRef,
local_pos: usize,
compression: Option<FlightCompression>,
) -> Box<dyn Processor> {
let arrow_schema = schema.to_arrow();
let ipc_fields = default_ipc_fields(&arrow_schema.fields);
let compression = match compression {
None => None,
Some(compression) => match compression {
FlightCompression::Lz4 => Some(Compression::LZ4),
FlightCompression::Zstd => Some(Compression::ZSTD),
},
};

BlockMetaTransformer::create(
input,
Expand All @@ -98,7 +108,7 @@ impl<Method: HashMethodBounds> TransformExchangeGroupBySerializer<Method> {
local_pos,
ipc_fields,
location_prefix,
options: WriteOptions { compression: None },
options: WriteOptions { compression },
},
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
| 'parquet_max_block_size' | '8192' | '8192' | 'SESSION' | 'Max block size for parquet reader' | 'UInt64' |
| 'parquet_uncompressed_buffer_size' | '2097152' | '2097152' | 'SESSION' | 'Sets the byte size of the buffer used for reading Parquet files.' | 'UInt64' |
| 'prefer_broadcast_join' | '1' | '1' | 'SESSION' | 'Enables broadcast join.' | 'UInt64' |
| 'query_flight_compression' | 'None' | 'None' | 'SESSION' | 'flight compression method' | 'String' |
| 'query_result_cache_allow_inconsistent' | '0' | '0' | 'SESSION' | 'Determines whether Databend will return cached query results that are inconsistent with the underlying data.' | 'UInt64' |
| 'query_result_cache_max_bytes' | '1048576' | '1048576' | 'SESSION' | 'Sets the maximum byte size of cache for a single query result.' | 'UInt64' |
| 'query_result_cache_ttl_secs' | '300' | '300' | 'SESSION' | 'Sets the time-to-live (TTL) in seconds for cached query results. Once the TTL for a cached result has expired, the result is considered stale and will not be used for new queries.' | 'UInt64' |
Expand Down
1 change: 1 addition & 0 deletions src/query/settings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ pub use settings::ScopeLevel;
pub use settings::Settings;
pub use settings_default::ReplaceIntoShuffleStrategy;
pub use settings_default::SettingMode;
pub use settings_getter_setter::FlightCompression;
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@ impl DefaultSettings {
possible_values: None,
mode: SettingMode::Both,
}),
("query_flight_compression", DefaultSettingValue {
value: UserSettingValue::String(String::from("Lz4")),
desc: "flight compression method",
possible_values: Some(vec!["None", "Lz4", "Zstd"]),
mode: SettingMode::Both,
}),
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
]);

Ok(Arc::new(DefaultSettings {
Expand Down
Loading
Loading