Skip to content

Commit

Permalink
Merge branch 'main' of github.com:datafuselabs/databend into vectorized_hash_join
Browse files (browse the repository at this point in the history)
  • Loading branch information
Dousir9 committed Oct 7, 2023
2 parents ab7139f + 6f76ae6 commit 39e704c
Show file tree
Hide file tree
Showing 49 changed files with 731 additions and 323 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/bindings.python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ jobs:
- x86_64-unknown-linux-gnu
- aarch64-unknown-linux-gnu
steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: true
- uses: actions/checkout@v4
with:
fetch-depth: 0
Expand Down
25 changes: 24 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ lazy_static = "1.4.0"
# future and async
futures = "0.3.24"
futures-util = "0.3.24"
futures-async-stream = { version = "0.2.7" }
stream-more = "0.1.3"
bytes = "1.5.0"

Expand Down Expand Up @@ -231,7 +232,7 @@ rpath = false
[patch.crates-io]
# If there are dependencies that need patching, they can be listed below.

arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "231a6fa" }
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "dd80c89" }
arrow-format = { git = "https://github.com/sundy-li/arrow-format", rev = "c8e11341" }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "b0e6545" }
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" }
Expand Down
2 changes: 1 addition & 1 deletion benchmark/clickbench/benchmark_local_merge_into.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ function run_query() {
fi
}

TRIES=3
TRIES=1
QUERY_NUM=0
while read -r query; do
echo "Running Q${QUERY_NUM}: ${query}"
Expand Down
2 changes: 2 additions & 0 deletions benchmark/clickbench/merge_into/queries.sql
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
12 changes: 9 additions & 3 deletions src/bendpy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod schema;
mod utils;

use std::env;
use std::path::Path;

use common_config::Config;
use common_config::InnerConfig;
Expand All @@ -37,15 +38,20 @@ use utils::RUNTIME;
/// A Python module implemented in Rust.
#[pymodule]
fn databend(_py: Python, m: &PyModule) -> PyResult<()> {
env::set_var("META_EMBEDDED_DIR", ".databend/_meta");
let data_path = env::var("DATABEND_DATA_PATH").unwrap_or(".databend/".to_string());
let path = Path::new(&data_path);

env::set_var("META_EMBEDDED_DIR", path.join("_meta"));

let mut conf: InnerConfig = Config::load(false).unwrap().try_into().unwrap();
conf.storage.allow_insecure = true;
conf.storage.params = StorageParams::Fs(StorageFsConfig {
root: ".databend/_data".to_string(),
root: path.join("_data").to_str().unwrap().to_owned(),
});

RUNTIME.block_on(async {
MetaEmbedded::init_global_meta_store(".databend/_meta".to_string())
let meta_dir = path.join("_meta");
MetaEmbedded::init_global_meta_store(meta_dir.to_string_lossy().to_string())
.await
.unwrap();
GlobalServices::init(conf.clone()).await.unwrap();
Expand Down
10 changes: 7 additions & 3 deletions src/binaries/metactl/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
use common_base::base::tokio;
Expand Down Expand Up @@ -55,6 +56,7 @@ use common_meta_types::NodeId;
use common_meta_types::StoredMembership;
use databend_meta::store::RaftStore;
use databend_meta::store::StoreInner;
use futures::TryStreamExt;
use tokio::net::TcpSocket;
use url::Url;

Expand Down Expand Up @@ -467,7 +469,7 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
let raft_config: RaftConfig = config.raft_config.clone().into();

let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
let lines = sto_inn.export().await?;
let mut lines = Arc::new(sto_inn).export();

eprintln!(" From: {}", raft_config.raft_dir);

Expand All @@ -479,9 +481,11 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
None
};

let cnt = lines.len();
let mut cnt = 0;

while let Some(line) = lines.try_next().await? {
cnt += 1;

for line in lines {
if file.as_ref().is_none() {
println!("{}", line);
} else {
Expand Down
37 changes: 34 additions & 3 deletions src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
Expand Down Expand Up @@ -161,14 +163,23 @@ impl ClientHandle {
Req: Into<message::Request>,
Result<Resp, E>: TryFrom<message::Response>,
<Result<Resp, E> as TryFrom<message::Response>>::Error: std::fmt::Display,
E: From<MetaClientError>,
E: From<MetaClientError> + Debug,
{
static META_REQUEST_ID: AtomicU64 = AtomicU64::new(1);

let request_future = async move {
let (tx, rx) = oneshot::channel();
let req = message::ClientWorkerRequest {
request_id: META_REQUEST_ID.fetch_add(1, Ordering::Relaxed),
resp_tx: tx,
req: req.into(),
};

debug!(
request = as_debug!(&req);
"Meta ClientHandle send request to meta client worker"
);

grpc_metrics::incr_meta_grpc_client_request_inflight(1);

let res = self.req_tx.send(req).await.map_err(|e| {
Expand All @@ -181,12 +192,22 @@ impl ClientHandle {
if let Err(err) = res {
grpc_metrics::incr_meta_grpc_client_request_inflight(-1);

error!(
error = as_debug!(&err);
"Meta ClientHandle send request to meta client worker failed"
);

return Err(err);
}

let res = rx.await.map_err(|e| {
grpc_metrics::incr_meta_grpc_client_request_inflight(-1);

error!(
error = as_debug!(&e);
"Meta ClientHandle recv response from meta client worker failed"
);

MetaClientError::ClientRuntimeError(
AnyError::new(&e).add_context(|| "when recv resp from MetaGrpcClient worker"),
)
Expand Down Expand Up @@ -362,6 +383,7 @@ impl MetaGrpcClient {
continue;
}

let request_id = req.request_id;
let resp_tx = req.resp_tx;
let req = req.req;
let req_name = req.name();
Expand Down Expand Up @@ -428,6 +450,7 @@ impl MetaGrpcClient {
};

debug!(
request_id = as_debug!(&request_id),
resp = as_debug!(&resp);
"MetaGrpcClient send response to the handle"
);
Expand All @@ -446,8 +469,12 @@ impl MetaGrpcClient {
);
if elapsed > 1000_f64 {
warn!(
request_id = as_display!(request_id);
"MetaGrpcClient slow request {} to {} takes {} ms: {}",
req_name, current_endpoint, elapsed, req_str,
req_name,
current_endpoint,
elapsed,
req_str,
);
}

Expand All @@ -457,7 +484,10 @@ impl MetaGrpcClient {
req_name,
&err.to_string(),
);
error!("MetaGrpcClient error: {:?}", err);
error!(
request_id = as_display!(request_id);
"MetaGrpcClient error: {:?}", err
);
} else {
grpc_metrics::incr_meta_grpc_client_request_success(
&current_endpoint,
Expand All @@ -469,6 +499,7 @@ impl MetaGrpcClient {
let send_res = resp_tx.send(resp);
if let Err(err) = send_res {
error!(
request_id = as_display!(request_id),
err = as_debug!(&err);
"MetaGrpcClient failed to send response to the handle. recv-end closed"
);
Expand Down
14 changes: 13 additions & 1 deletion src/meta/client/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_base::base::tokio::sync::oneshot::Sender;
use common_meta_kvapi::kvapi::GetKVReply;
use common_meta_kvapi::kvapi::GetKVReq;
Expand All @@ -36,15 +38,25 @@ use tonic::transport::Channel;
use crate::grpc_client::AuthInterceptor;

/// A request that is sent by a meta-client handle to its worker.
#[derive(Debug)]
pub struct ClientWorkerRequest {
pub(crate) request_id: u64,

/// For sending back the response to the handle.
pub(crate) resp_tx: Sender<Response>,

/// Request body
pub(crate) req: Request,
}

/// Hand-written `Debug` output for a handle-to-worker request.
///
/// Only `request_id` and `req` are printed; the `resp_tx` channel is not
/// part of the output.
impl fmt::Debug for ClientWorkerRequest {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Build the struct representation step by step so each printed
        // field is listed on its own statement.
        let mut repr = formatter.debug_struct("ClientWorkerRequest");
        repr.field("request_id", &self.request_id);
        repr.field("req", &self.req);
        repr.finish()
    }
}

/// Meta-client handle-to-worker request body
#[derive(Debug, Clone, derive_more::From)]
pub enum Request {
Expand Down
6 changes: 3 additions & 3 deletions src/meta/client/tests/it/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct GrpcServiceForTestImpl {}
#[tonic::async_trait]
impl MetaService for GrpcServiceForTestImpl {
type HandshakeStream =
Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>>;
Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + 'static>>;

async fn handshake(
&self,
Expand All @@ -69,7 +69,7 @@ impl MetaService for GrpcServiceForTestImpl {
}

type ExportStream =
Pin<Box<dyn Stream<Item = Result<ExportedChunk, tonic::Status>> + Send + Sync + 'static>>;
Pin<Box<dyn Stream<Item = Result<ExportedChunk, tonic::Status>> + Send + 'static>>;

async fn export(
&self,
Expand All @@ -79,7 +79,7 @@ impl MetaService for GrpcServiceForTestImpl {
}

type WatchStream =
Pin<Box<dyn Stream<Item = Result<WatchResponse, tonic::Status>> + Send + Sync + 'static>>;
Pin<Box<dyn Stream<Item = Result<WatchResponse, tonic::Status>> + Send + 'static>>;

async fn watch(
&self,
Expand Down
6 changes: 3 additions & 3 deletions src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ impl LeveledMap {
self.frozen = b;
}

pub(crate) fn leveled_ref_mut(&mut self) -> RefMut {
pub(crate) fn to_ref_mut(&mut self) -> RefMut {
RefMut::new(&mut self.writable, &self.frozen)
}

pub(crate) fn leveled_ref(&self) -> Ref {
pub(crate) fn to_ref(&self) -> Ref {
Ref::new(Some(&self.writable), &self.frozen)
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ where
where
K: Ord,
{
let mut l = self.leveled_ref_mut();
let mut l = self.to_ref_mut();
MapApi::set(&mut l, key, value).await

// (&mut l).set(key, value).await
Expand Down
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<'d> RefMut<'d> {
}

#[allow(dead_code)]
pub(in crate::sm_v002) fn to_leveled_ref(&self) -> Ref {
pub(in crate::sm_v002) fn to_ref(&self) -> Ref {
Ref::new(Some(&*self.writable), self.frozen)
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl SMV002 {
///
/// It does not check expiration of the returned entry.
pub async fn get_kv(&self, key: &str) -> Option<SeqV> {
let got = MapApiRO::<String>::get(&self.levels.leveled_ref(), key).await;
let got = MapApiRO::<String>::get(&self.levels.to_ref(), key).await;
Into::<Option<SeqV>>::into(got)
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ backon = "0.4"
clap = { workspace = true }
derive_more = { workspace = true }
futures = "0.3.24"
futures-async-stream = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
Expand Down
Loading

0 comments on commit 39e704c

Please sign in to comment.