From 22f9ba2447dc6fff60268d0819a4d47968958a79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 20 Dec 2024 20:41:58 +0800 Subject: [PATCH] feat: databend-meta transaction support generic bool-expression and else-if chain (#17064) Since this commit, application is allowed to specify a complex bool expressions as the transaction predicate. For example, the transaction will execute as if running the following pseudo codes: ``` if (a == 1 || b == 2) && (x == 3 || y == 4) { ops1 } else if (x == 2 || y == 1) { ops2 } else if (y == 3 && z == 4) { ops3 } else { ops4 } ``` ```rust let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val)); TxnRequest{ operations: vec![ BoolExpression::new( Some(eq("a", 1).or(eq("b", 2)) .and(eq("x", 3).or(eq("y", 4)))), ops1), BoolExpression::new( Some(eq("x", 2).or(eq("y", 1))), ops2), ], condition: vec![eq("y", 3), eq("z", 4)], if_then: ops3, else_then: ops4, } ``` For backward compatibility, both already existing `condition` and the new `operations` will be evaluated: transaction handler evaluate the `operations` first. If there is a met condition, execute and return. Otherwise, it evaluate `condition` and then execute `if_then` branch or `else_then` branch. TxnReply changes: Add field `execution_path` to indicate the executed branch, which is one of: - `"operation:"`, operation at `index` is executed. - `"then"`: `if_then` is executed. - `"else"`: `else_then` is executed. `TxnReply.success` is set to `false` only when `else` is executed. --- Cargo.lock | 1 + src/meta/api/src/schema_api_impl.rs | 117 +++---- src/meta/api/src/sequence_api_impl.rs | 6 +- src/meta/binaries/metabench/main.rs | 6 +- src/meta/client/src/lib.rs | 4 + src/meta/kvapi/Cargo.toml | 1 + src/meta/kvapi/src/kvapi/test_suite.rs | 326 +++++++++++------- src/meta/process/src/kv_processor.rs | 6 +- src/meta/raft-store/src/applier.rs | 80 ++++- src/meta/service/src/api/grpc/grpc_service.rs | 9 +- .../tests/it/grpc/metasrv_grpc_transaction.rs | 8 +- .../tests/it/grpc/metasrv_grpc_watch.rs | 36 +- src/meta/types/build.rs | 16 + src/meta/types/proto/meta.proto | 63 ++++ src/meta/types/src/cmd/mod.rs | 9 +- src/meta/types/src/proto_display.rs | 135 +++++++- src/meta/types/src/proto_ext/txn_ext.rs | 293 +++++++++++++++- src/meta/types/tests/it/main.rs | 1 + src/meta/types/tests/it/txn_serde.rs | 87 +++++ src/query/management/src/role/role_mgr.rs | 18 +- src/query/management/src/stage/stage_mgr.rs | 25 +- 21 files changed, 955 insertions(+), 292 deletions(-) create mode 100644 src/meta/types/tests/it/txn_serde.rs diff --git a/Cargo.lock b/Cargo.lock index 89c1ab4d034a..140ca65543e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3652,6 +3652,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "databend-common-base", "databend-common-meta-types", "fastrace", "futures-util", diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 3fc0baff53eb..785640894b7b 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -439,18 +439,17 @@ impl + ?Sized> SchemaApi for KV { } db_meta.drop_on = None; - let txn_req = TxnRequest { - condition: vec![ + let txn_req = TxnRequest::new( + vec![ txn_cond_seq(name_key, Eq, 0), txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq), txn_cond_seq(&dbid, Eq, db_meta_seq), ], - if_then: vec![ + vec![ txn_op_put(name_key, serialize_u64(db_id)?), // (tenant, db_name) -> db_id txn_op_put(&dbid, serialize_struct(&db_meta)?), // (db_id) -> db_meta ], - else_then: vec![], - }; + ); let (succ, _responses) = send_txn(self, txn_req).await?; @@ -594,11 +593,7 @@ impl + ?Sized> SchemaApi for KV { ), /* __fd_database_id_to_name/ -> (tenant,db_name) */ ]; - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition, if_then); let (succ, _responses) = send_txn(self, txn_req).await?; @@ -1203,19 +1198,19 @@ impl + ?Sized> SchemaApi for KV { txn_cond_seq(&save_key_table_id_list, Eq, tb_id_list_seq), ]); - txn.if_then.extend( vec![ - // Changing a table in a db has to update the seq of db_meta, - // to block the batch-delete-tables when deleting a db. - txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */ - txn_op_put( - key_table_id, - serialize_struct(&req.table_meta)?, - ), /* (tenant, db_id, tb_id) -> tb_meta */ - txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */ - // This record does not need to assert `table_id_to_name_key == 0`, - // Because this is a reverse index for db_id/table_name -> table_id, and it is unique. - txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */ - ]); + txn.if_then.extend(vec![ + // Changing a table in a db has to update the seq of db_meta, + // to block the batch-delete-tables when deleting a db. + txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */ + txn_op_put( + key_table_id, + serialize_struct(&req.table_meta)?, + ), /* (tenant, db_id, tb_id) -> tb_meta */ + txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */ + // This record does not need to assert `table_id_to_name_key == 0`, + // Because this is a reverse index for db_id/table_name -> table_id, and it is unique. + txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */ + ]); if req.as_dropped { // To create the table in a "dropped" state, @@ -1401,8 +1396,8 @@ impl + ?Sized> SchemaApi for KV { tb_id_list.pop(); new_tb_id_list.append(table_id); - let mut txn = TxnRequest { - condition: vec![ + let mut txn = TxnRequest::new( + vec![ // db has not to change, i.e., no new table is created. // Renaming db is OK and does not affect the seq of db_meta. txn_cond_seq(&seq_db_id.data, Eq, db_meta.seq), @@ -1416,7 +1411,7 @@ impl + ?Sized> SchemaApi for KV { txn_cond_seq(&new_dbid_tbname_idlist, Eq, new_tb_id_list_seq), txn_cond_seq(&table_id_to_name_key, Eq, table_id_to_name_seq), ], - if_then: vec![ + vec![ txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */ // Changing a table in a db has to update the seq of db_meta, @@ -1426,8 +1421,7 @@ impl + ?Sized> SchemaApi for KV { txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */ txn_op_put(&table_id_to_name_key, serialize_struct(&db_id_table_name)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */ ], - else_then: vec![], - }; + ); if *seq_db_id.data != *new_seq_db_id.data { txn.if_then.push( @@ -1909,8 +1903,8 @@ impl + ?Sized> SchemaApi for KV { } tb_meta.drop_on = None; - let txn_req = TxnRequest { - condition: vec![ + let txn_req = TxnRequest::new( + vec![ // db has not to change, i.e., no new table is created. // Renaming db is OK and does not affect the seq of db_meta. txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta_seq), @@ -1921,7 +1915,7 @@ impl + ?Sized> SchemaApi for KV { txn_cond_seq(&orphan_dbid_tbname_idlist, Eq, orphan_tb_id_list.seq), txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list.seq), ], - if_then: vec![ + vec![ // Changing a table in a db has to update the seq of db_meta, // to block the batch-delete-tables when deleting a db. txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */ @@ -1931,8 +1925,7 @@ impl + ?Sized> SchemaApi for KV { txn_op_del(&orphan_dbid_tbname_idlist), // del orphan table idlist txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list.data)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */ ], - else_then: vec![], - }; + ); let (succ, _responses) = send_txn(self, txn_req).await?; @@ -2061,16 +2054,15 @@ impl + ?Sized> SchemaApi for KV { // non-changed ones. for chunk in copied_files.chunks(chunk_size as usize) { - let txn = TxnRequest { - condition: vec![], - if_then: chunk + let txn = TxnRequest::new( + vec![], + chunk .iter() .map(|(name, seq_file)| { TxnOp::delete_exact(name.to_string_key(), Some(seq_file.seq())) }) .collect(), - else_then: vec![], - }; + ); let (_succ, _responses) = send_txn(self, txn).await?; } @@ -2377,16 +2369,15 @@ impl + ?Sized> SchemaApi for KV { } } - let mut txn_req = TxnRequest { - condition: vec![ + let mut txn_req = TxnRequest::new( + vec![ // table is not changed txn_cond_seq(&tbid, Eq, seq_meta.seq), ], - if_then: vec![ + vec![ txn_op_put(&tbid, serialize_struct(&new_table_meta)?), // tb_id -> tb_meta ], - else_then: vec![], - }; + ); let _ = update_mask_policy(self, &req.action, &mut txn_req, &req.tenant, req.table_id) .await; @@ -2485,13 +2476,13 @@ impl + ?Sized> SchemaApi for KV { }; indexes.insert(req.name.clone(), index); - let txn_req = TxnRequest { - condition: vec![txn_cond_eq_seq(&tbid, tb_meta_seq)], - if_then: vec![ + let txn_req = TxnRequest::new( + // + vec![txn_cond_eq_seq(&tbid, tb_meta_seq)], + vec![ txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta ], - else_then: vec![], - }; + ); let (succ, _responses) = send_txn(self, txn_req).await?; @@ -2540,16 +2531,15 @@ impl + ?Sized> SchemaApi for KV { } indexes.remove(&req.name); - let txn_req = TxnRequest { - condition: vec![ + let txn_req = TxnRequest::new( + vec![ // table is not changed txn_cond_seq(&tbid, Eq, seq_meta.seq), ], - if_then: vec![ + vec![ txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta ], - else_then: vec![], - }; + ); let (succ, _responses) = send_txn(self, txn_req).await?; debug!(id :? =(&tbid),succ = succ;"drop_table_index"); @@ -2742,11 +2732,7 @@ impl + ?Sized> SchemaApi for KV { txn_op_put(&id_generator, b"".to_vec()), txn_op_put_pb(&key, &lock_meta, Some(req.ttl))?, ]; - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition, if_then); let (succ, _responses) = send_txn(self, txn_req).await?; if succ { @@ -3053,11 +3039,7 @@ impl + ?Sized> SchemaApi for KV { txn_op_put_pb(&new_name_ident, &dict_id.data, None)?, // put new dict name ]; - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition, if_then); let (succ, _responses) = send_txn(self, txn_req).await?; @@ -4042,8 +4024,8 @@ async fn handle_undrop_table( // reset drop on time seq_table_meta.drop_on = None; - let txn = TxnRequest { - condition: vec![ + let txn = TxnRequest::new( + vec![ // db has not to change, i.e., no new table is created. // Renaming db is OK and does not affect the seq of db_meta. txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq), @@ -4052,15 +4034,14 @@ async fn handle_undrop_table( // table is not changed txn_cond_eq_seq(&tbid, seq_table_meta.seq), ], - if_then: vec![ + vec![ // Changing a table in a db has to update the seq of db_meta, // to block the batch-delete-tables when deleting a db. txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, /* (db_id) -> db_meta */ txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */ txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, /* (tenant, db_id, tb_id) -> tb_meta */ ], - else_then: vec![], - }; + ); let (succ, _responses) = send_txn(kv_api, txn).await?; diff --git a/src/meta/api/src/sequence_api_impl.rs b/src/meta/api/src/sequence_api_impl.rs index 6183d0cfd5e8..d549f399cdfa 100644 --- a/src/meta/api/src/sequence_api_impl.rs +++ b/src/meta/api/src/sequence_api_impl.rs @@ -142,11 +142,7 @@ impl + ?Sized> SequenceApi for KV { txn_op_put_pb(&ident, &sequence_meta, None)?, // name -> meta ]; - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition, if_then); let (succ, _responses) = send_txn(self, txn_req).await?; diff --git a/src/meta/binaries/metabench/main.rs b/src/meta/binaries/metabench/main.rs index 8cfc2fa97420..b9aba4aa4ee3 100644 --- a/src/meta/binaries/metabench/main.rs +++ b/src/meta/binaries/metabench/main.rs @@ -298,11 +298,7 @@ async fn benchmark_table_copy_file( serde_json::from_str(param).unwrap() }; - let mut txn = TxnRequest { - condition: vec![], - if_then: vec![], - else_then: vec![], - }; + let mut txn = TxnRequest::default(); for file_index in 0..param.file_cnt { let copied_file_ident = TableCopiedFileNameIdent { diff --git a/src/meta/client/src/lib.rs b/src/meta/client/src/lib.rs index 7b6ffc037afc..5ed6d5f191f8 100644 --- a/src/meta/client/src/lib.rs +++ b/src/meta/client/src/lib.rs @@ -117,6 +117,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock = LazyLock::new(|| { /// 🖥 server: add `txn_condition::Target::KeysWithPrefix`, /// to support matching the key count by a prefix. /// +/// - 2024-12-1*: since 1.2.* +/// 🖥 server: add `TxnRequest::condition_tree`, +/// to specify a complex bool expression. +/// /// /// Server feature set: /// ```yaml diff --git a/src/meta/kvapi/Cargo.toml b/src/meta/kvapi/Cargo.toml index 666be11aea23..db292b95b9bc 100644 --- a/src/meta/kvapi/Cargo.toml +++ b/src/meta/kvapi/Cargo.toml @@ -15,6 +15,7 @@ test = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +databend-common-base = { workspace = true } databend-common-meta-types = { workspace = true } fastrace = { workspace = true } futures-util = { workspace = true } diff --git a/src/meta/kvapi/src/kvapi/test_suite.rs b/src/meta/kvapi/src/kvapi/test_suite.rs index a5b113f35d6f..86e01ec13379 100644 --- a/src/meta/kvapi/src/kvapi/test_suite.rs +++ b/src/meta/kvapi/src/kvapi/test_suite.rs @@ -14,7 +14,10 @@ use std::time::Duration; +use databend_common_base::display::display_option::DisplayOptionExt; +use databend_common_base::display::display_slice::DisplaySliceExt; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::protobuf::BooleanExpression; use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::txn_condition; @@ -94,6 +97,8 @@ impl kvapi::TestSuite { .await?; self.kv_transaction_condition_keys_with_prefix(&builder.build().await) .await?; + self.kv_transaction_complex_conditions(&builder.build().await) + .await?; self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await) .await?; self.kv_transaction_delete_match_seq_some_match(&builder.build().await) @@ -508,13 +513,7 @@ impl kvapi::TestSuite { let if_then: Vec = vec![TxnOp::put(txn_key.clone(), b("new_v1"))]; - let else_then: Vec = vec![]; - - let txn = TxnRequest { - condition: conditions, - if_then, - else_then, - }; + let txn = TxnRequest::new(conditions, if_then); let resp = kv.transaction(txn).await?; @@ -567,12 +566,7 @@ impl kvapi::TestSuite { })), }]; - let else_then: Vec = vec![]; - let txn = TxnRequest { - condition, - if_then, - else_then, - }; + let txn = TxnRequest::new(condition, if_then); let resp = kv.transaction(txn).await?; @@ -607,18 +601,12 @@ impl kvapi::TestSuite { target: Some(txn_condition::Target::Seq(0)), }]; - let if_then: Vec = vec![]; - let else_then: Vec = vec![TxnOp { request: Some(txn_op::Request::DeleteByPrefix(TxnDeleteByPrefixRequest { prefix: unmatch_prefix.clone(), })), }]; - let txn = TxnRequest { - condition, - if_then, - else_then, - }; + let txn = TxnRequest::new(condition, vec![]).with_else(else_then); let resp = kv.transaction(txn).await?; @@ -663,12 +651,7 @@ impl kvapi::TestSuite { let if_then: Vec = vec![TxnOp::put(txn_key.clone(), b("new_v1"))]; - let else_then: Vec = vec![]; - let txn = TxnRequest { - condition, - if_then, - else_then, - }; + let txn = TxnRequest::new(condition, if_then); let resp = kv.transaction(txn).await?; @@ -708,12 +691,7 @@ impl kvapi::TestSuite { let if_then: Vec = vec![TxnOp::put(txn_key1.clone(), b("new_v1"))]; - let else_then: Vec = vec![]; - let txn = TxnRequest { - condition, - if_then, - else_then, - }; + let txn = TxnRequest::new(condition, if_then); let resp = kv.transaction(txn).await?; @@ -779,12 +757,7 @@ impl kvapi::TestSuite { }, ]; - let else_then: Vec = vec![]; - let txn = TxnRequest { - condition, - if_then, - else_then, - }; + let txn = TxnRequest::new(condition, if_then); let resp = kv.transaction(txn).await?; @@ -875,11 +848,11 @@ impl kvapi::TestSuite { // Test eq value: success = false - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Eq, b("v10"))], - if_then: vec![TxnOp::put(k1, b("v2")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new(vec![TxnCondition::eq_value(k1, b("v10"))], vec![ + TxnOp::put(k1, b("v2")), + TxnOp::get(k1), + ]) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -891,11 +864,10 @@ impl kvapi::TestSuite { // Test eq value: success = true - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Eq, b("v1"))], - if_then: vec![TxnOp::put(k1, b("v2")), TxnOp::get(k1)], - else_then: vec![], - }; + let txn = TxnRequest::new(vec![TxnCondition::eq_value(k1, b("v1"))], vec![ + TxnOp::put(k1, b("v2")), + TxnOp::get(k1), + ]); let resp = kv.transaction(txn).await?; @@ -909,11 +881,11 @@ impl kvapi::TestSuite { // Test less than value: success = false - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Lt, b("v2"))], - if_then: vec![TxnOp::put(k1, b("v3")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Lt, b("v2"))], + vec![TxnOp::put(k1, b("v3")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -925,11 +897,11 @@ impl kvapi::TestSuite { // Test less than value: success = true - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Lt, b("v3"))], - if_then: vec![TxnOp::put(k1, b("v3")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Lt, b("v3"))], + vec![TxnOp::put(k1, b("v3")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -943,11 +915,11 @@ impl kvapi::TestSuite { // Test less equal value: success = false - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Le, b("v0"))], - if_then: vec![TxnOp::put(k1, b("v4")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Le, b("v0"))], + vec![TxnOp::put(k1, b("v4")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -959,11 +931,11 @@ impl kvapi::TestSuite { // Test less equal value: success = true - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Le, b("v3"))], - if_then: vec![TxnOp::put(k1, b("v4")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Le, b("v3"))], + vec![TxnOp::put(k1, b("v4")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -977,11 +949,11 @@ impl kvapi::TestSuite { // Test greater than value: success = false - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Gt, b("v5"))], - if_then: vec![TxnOp::put(k1, b("v5")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Gt, b("v5"))], + vec![TxnOp::put(k1, b("v5")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -993,11 +965,11 @@ impl kvapi::TestSuite { // Test greater than value: success = true - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Gt, b("v3"))], - if_then: vec![TxnOp::put(k1, b("v5")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Gt, b("v3"))], + vec![TxnOp::put(k1, b("v5")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -1011,11 +983,11 @@ impl kvapi::TestSuite { // Test greater equal value: success = false - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Ge, b("v6"))], - if_then: vec![TxnOp::put(k1, b("v6")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Ge, b("v6"))], + vec![TxnOp::put(k1, b("v6")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -1027,11 +999,11 @@ impl kvapi::TestSuite { // Test greater equal value: success = true - let txn = TxnRequest { - condition: vec![TxnCondition::match_value(k1, ConditionResult::Ge, b("v5"))], - if_then: vec![TxnOp::put(k1, b("v6")), TxnOp::get(k1)], - else_then: vec![TxnOp::get(k1)], - }; + let txn = TxnRequest::new( + vec![TxnCondition::match_value(k1, ConditionResult::Ge, b("v5"))], + vec![TxnOp::put(k1, b("v6")), TxnOp::get(k1)], + ) + .with_else(vec![TxnOp::get(k1)]); let resp = kv.transaction(txn).await?; @@ -1052,15 +1024,11 @@ impl kvapi::TestSuite { info!("--- {}", func_path!()); - let txn = TxnRequest { - condition: vec![], - if_then: vec![TxnOp::put_with_ttl( - "k1", - b("v1"), - Some(Duration::from_millis(2_000)), - )], - else_then: vec![], - }; + let txn = TxnRequest::new(vec![], vec![TxnOp::put_with_ttl( + "k1", + b("v1"), + Some(Duration::from_millis(2_000)), + )]); let _resp = kv.transaction(txn).await?; @@ -1101,14 +1069,16 @@ impl kvapi::TestSuite { // A transaction that set positive key if succeeded, // otherwise set the negative key. - let txn = |op: ConditionResult, n: u64| TxnRequest { - condition: vec![TxnCondition::match_keys_with_prefix( - &sample_keys_prefix, - op, - n, - )], - if_then: vec![TxnOp::put(&positive, b(format!("{op:?}")))], - else_then: vec![TxnOp::put(&negative, b(format!("{op:?}")))], + let txn = |op: ConditionResult, n: u64| { + TxnRequest::new( + vec![TxnCondition::match_keys_with_prefix( + &sample_keys_prefix, + op, + n, + )], + vec![TxnOp::put(&positive, b(format!("{op:?}")))], + ) + .with_else(vec![TxnOp::put(&negative, b(format!("{op:?}")))]) }; for (op, n, expected) in [ @@ -1152,6 +1122,134 @@ impl kvapi::TestSuite { Ok(()) } + pub async fn kv_transaction_complex_conditions( + &self, + kv: &KV, + ) -> anyhow::Result<()> { + let prefix = func_name!(); + + let sample = |suffix: &str| format!("{}/{}", prefix, suffix); + let result = format!("{prefix}/result"); + + kv.upsert_kv(UpsertKV::update(sample("a"), &b("a"))).await?; + kv.upsert_kv(UpsertKV::update(sample("b"), &b("b"))).await?; + kv.upsert_kv(UpsertKV::update(sample("c"), &b("c"))).await?; + + // Build a simple equal-value condition + let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val)); + + let txn = |bools: Vec>, conditions: Vec| { + let mut txn = TxnRequest::default(); + for (i, cond) in bools.into_iter().enumerate() { + txn = txn.push_branch(cond, vec![pb::TxnOp::put( + &result, + b(format!("operation:{}", i)), + )]); + } + + txn.push_if_then(conditions, vec![pb::TxnOp::put(&result, b("then"))]) + .with_else(vec![pb::TxnOp::put(&result, b("else"))]) + }; + + for (bools, condition, expected, index) in [ + ( + vec![], + // empty condition is always true + vec![], + "then", + None, + ), + (vec![], vec![eq("a", "a")], "then", None), + (vec![], vec![eq("a", "b")], "else", None), + (vec![], vec![eq("a", "a"), eq("b", "b")], "then", None), + (vec![], vec![eq("a", "a"), eq("b", "c")], "else", None), + ( + vec![ + Some(eq("a", "a").and(eq("b", "b"))), + Some(eq("b", "b").or(eq("c", "c"))), + ], + vec![eq("a", "a")], + "operation:0", + Some(0), + ), + ( + vec![ + Some(eq("a", "a").and(eq("b", "c"))), + Some(eq("b", "b").and(eq("c", "c"))), + ], + vec![eq("a", "a")], + "operation:1", + Some(1), + ), + ( + vec![ + Some(eq("a", "a").and(eq("b", "c"))), + Some(eq("b", "b").and(eq("x", "x"))), + ], + vec![eq("a", "a")], + "then", + None, + ), + ( + vec![ + Some(eq("a", "a").and(eq("b", "c"))), + Some(eq("b", "b").and(eq("x", "x"))), + ], + // empty condition is always true + vec![], + "then", + None, + ), + ( + vec![ + Some(eq("a", "a").and(eq("b", "c"))), + // None condition is always true + None, + ], + vec![eq("a", "a")], + "operation:1", + Some(1), + ), + ( + vec![Some( + eq("a", "a") + .or(eq("x", "x")) + .and(eq("b", "b").or(eq("y", "y"))), + )], + vec![eq("a", "b")], + "operation:0", + Some(0), + ), + ] { + kv.upsert_kv(UpsertKV::update(&result, &b(""))).await?; + + let resp = kv + .transaction(txn(bools.clone(), condition.clone())) + .await?; + + let message = format!( + "case: {} {}, expected: {expected}", + bools + .into_iter() + .map(|b| b.display().to_string()) + .collect::>() + .display(), + condition.display() + ); + + let want_success = expected != "else"; + + assert_eq!(resp.success, want_success, "{}", message); + assert_eq!(resp.execution_path, expected, "{}", message); + assert_eq!(resp.executed_branch_index().unwrap(), index, "{}", message); + + let got = kv.get_kv(&result).await?.unwrap().data; + assert_eq!(got, b(expected), "{}", message); + } + + Ok(()) + } + /// If `TxnDeleteRequest.match_seq` is not set, /// the delete operation will always be executed. pub async fn kv_transaction_delete_match_seq_none( @@ -1164,11 +1262,7 @@ impl kvapi::TestSuite { kv.upsert_kv(UpsertKV::update(key(), &val())).await?; - let txn = TxnRequest { - condition: vec![], - if_then: vec![TxnOp::delete(key())], - else_then: vec![], - }; + let txn = TxnRequest::new(vec![], vec![TxnOp::delete(key())]); let resp = kv.transaction(txn).await?; @@ -1198,11 +1292,7 @@ impl kvapi::TestSuite { kv.upsert_kv(UpsertKV::update(key(), &val())).await?; - let txn = TxnRequest { - condition: vec![], - if_then: vec![TxnOp::delete_exact(key(), Some(100))], - else_then: vec![], - }; + let txn = TxnRequest::new(vec![], vec![TxnOp::delete_exact(key(), Some(100))]); let resp = kv.transaction(txn).await?; @@ -1236,11 +1326,7 @@ impl kvapi::TestSuite { kv.upsert_kv(UpsertKV::update(key(), &val())).await?; - let txn = TxnRequest { - condition: vec![], - if_then: vec![TxnOp::delete_exact(key(), Some(1))], - else_then: vec![], - }; + let txn = TxnRequest::new(vec![], vec![TxnOp::delete_exact(key(), Some(1))]); let resp = kv.transaction(txn).await?; diff --git a/src/meta/process/src/kv_processor.rs b/src/meta/process/src/kv_processor.rs index cad80935c285..7a70ddfc71bc 100644 --- a/src/meta/process/src/kv_processor.rs +++ b/src/meta/process/src/kv_processor.rs @@ -164,11 +164,7 @@ where F: Fn(&str, Vec) -> Result, anyhow::Error> Ok(Some(LogEntry { txid: log_entry.txid, time_ms: log_entry.time_ms, - cmd: Cmd::Transaction(TxnRequest { - condition, - if_then, - else_then, - }), + cmd: Cmd::Transaction(TxnRequest::new(condition, if_then).with_else(else_then)), })) } } diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index 6bfa2dbd40d6..d03eab41dadb 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -18,6 +18,8 @@ use std::time::Duration; use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::protobuf::boolean_expression::CombiningOperator; +use databend_common_meta_types::protobuf::BooleanExpression; use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::EntryPayload; use databend_common_meta_types::raft_types::StoredMembership; @@ -50,6 +52,7 @@ use databend_common_meta_types::TxnRequest; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use futures::stream::TryStreamExt; +use futures_util::future::BoxFuture; use futures_util::StreamExt; use log::debug; use log::error; @@ -247,19 +250,39 @@ where SM: StateMachineApi + 'static pub(crate) async fn apply_txn(&mut self, req: &TxnRequest) -> Result { debug!(txn :% =(req); "apply txn cmd"); + // 1. Evaluate conditional operations one by one. + // Once one of them is successful, execute the corresponding operations and retrun. + // Otherwise, try next. + for (i, conditional) in req.operations.iter().enumerate() { + let success = if let Some(predicate) = &conditional.predicate { + self.eval_bool_expression(predicate).await? + } else { + true + }; + + if success { + let mut resp: TxnReply = TxnReply::new(format!("operation:{i}")); + + for op in &conditional.operations { + self.txn_execute_operation(op, &mut resp).await?; + } + + return Ok(AppliedState::TxnReply(resp)); + } + } + + // 2. For backward compatibility, evaluate the `condition` as the last conditional-operation. + // If success, execute the `if_then` operations + let success = self.eval_txn_conditions(&req.condition).await?; - let ops = if success { - &req.if_then + let (ops, path) = if success { + (&req.if_then, "then") } else { - &req.else_then + (&req.else_then, "else") }; - let mut resp: TxnReply = TxnReply { - success, - error: "".to_string(), - responses: vec![], - }; + let mut resp: TxnReply = TxnReply::new(path); for op in ops { self.txn_execute_operation(op, &mut resp).await?; @@ -284,6 +307,47 @@ where SM: StateMachineApi + 'static Ok(true) } + fn eval_bool_expression<'x>( + &'x mut self, + tree: &'x BooleanExpression, + ) -> BoxFuture<'x, Result> { + let op = tree.operator(); + + let fu = async move { + match op { + CombiningOperator::And => { + for expr in tree.sub_expressions.iter() { + if !self.eval_bool_expression(expr).await? { + return Ok(false); + } + } + + for cond in tree.conditions.iter() { + if !self.eval_one_condition(cond).await? { + return Ok(false); + } + } + } + CombiningOperator::Or => { + for expr in tree.sub_expressions.iter() { + if self.eval_bool_expression(expr).await? { + return Ok(true); + } + } + + for cond in tree.conditions.iter() { + if self.eval_one_condition(cond).await? { + return Ok(true); + } + } + } + } + Ok(true) + }; + + Box::pin(fu) + } + #[fastrace::trace] async fn eval_one_condition(&self, cond: &TxnCondition) -> Result { debug!(cond :% =(cond); "txn_execute_one_condition"); diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 28d7de4bf893..3eea8a71a8e8 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -57,6 +57,7 @@ use futures::stream::TryChunksError; use futures::StreamExt; use futures::TryStreamExt; use log::debug; +use log::error; use prost::Message; use tokio_stream; use tokio_stream::Stream; @@ -177,12 +178,8 @@ impl MetaServiceImpl { (endpoint, txn_reply) } Err(err) => { - let txn_reply = TxnReply { - success: false, - error: serde_json::to_string(&err).expect("fail to serialize"), - responses: vec![], - }; - (None, txn_reply) + error!("txn request failed: {:?}", err); + return Err(Status::internal(err.to_string())); } }; diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_transaction.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_transaction.rs index cb9b92f9f923..f58fb21bd274 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_transaction.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_transaction.rs @@ -50,13 +50,7 @@ async fn test_transaction_follower_responds_leader_endpoint() -> anyhow::Result< assert_eq!(a1(), eclient.target_endpoint(),); } - let _res = client - .request(TxnRequest { - condition: vec![], - if_then: vec![], - else_then: vec![], - }) - .await?; + let _res = client.request(TxnRequest::new(vec![], vec![])).await?; // Current leader endpoint updated, will connect to a0. { diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index cc34cba7ad8b..ac1010d0b02f 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -307,13 +307,7 @@ async fn test_watch() -> anyhow::Result<()> { }, ]; - let else_then: Vec = vec![]; - - let txn = TxnRequest { - condition: conditions, - if_then, - else_then, - }; + let txn = TxnRequest::new(conditions, if_then); seq = 7; @@ -361,11 +355,7 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { { let client = make_client(&addr)?; - let mut txn = TxnRequest { - condition: vec![], - if_then: vec![], - else_then: vec![], - }; + let mut txn = TxnRequest::new(vec![], vec![]); // Every apply() will clean all expired keys. for i in 0..(32 + 1) { @@ -405,19 +395,15 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { info!("--- apply another txn in another thread to override keys"); { - let txn = TxnRequest { - condition: vec![], - if_then: vec![ - TxnOp::put("w_b1", b("w_b1_override")), - TxnOp::delete("w_b2"), - TxnOp { - request: Some(txn_op::Request::DeleteByPrefix(TxnDeleteByPrefixRequest { - prefix: s("w_b3"), - })), - }, - ], - else_then: vec![], - }; + let txn = TxnRequest::new(vec![], vec![ + TxnOp::put("w_b1", b("w_b1_override")), + TxnOp::delete("w_b2"), + TxnOp { + request: Some(txn_op::Request::DeleteByPrefix(TxnDeleteByPrefixRequest { + prefix: s("w_b3"), + })), + }, + ]); let client = make_client(&addr)?; let _h = databend_common_base::runtime::spawn(async move { diff --git a/src/meta/types/build.rs b/src/meta/types/build.rs index 3f5a4e4d1185..44e750d703a1 100644 --- a/src/meta/types/build.rs +++ b/src/meta/types/build.rs @@ -83,6 +83,18 @@ fn build_proto() { "TxnCondition", "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", ) + .type_attribute( + "ConditionalOperation", + "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + ) + .type_attribute( + "BooleanExpression", + "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + ) + .type_attribute( + "BooleanExpression.CombiningOperator", + "#[derive(serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + ) .type_attribute( "TxnOp", "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", @@ -139,6 +151,10 @@ fn build_proto() { "TxnPutRequest.ttl_ms", r#"#[serde(skip_serializing_if = "Option::is_none")]"#, ) + .field_attribute( + "TxnRequest.operations", + r#"#[serde(skip_serializing_if = "Vec::is_empty")] #[serde(default)]"#, + ) .compile_protos_with_config(config, &protos, &[&proto_dir]) .unwrap(); } diff --git a/src/meta/types/proto/meta.proto b/src/meta/types/proto/meta.proto index ade20569ec7a..fd323ee4cc0f 100644 --- a/src/meta/types/proto/meta.proto +++ b/src/meta/types/proto/meta.proto @@ -130,8 +130,31 @@ message TxnCondition { ConditionResult expected = 4; } +// BooleanExpression represents a tree of transaction conditions combined with logical operators. +// It enables complex condition checking by allowing both simple conditions and nested expressions. +message BooleanExpression { + // Logical operator to combine multiple conditions, including sub compound conditions or simple conditions. + enum CombiningOperator { + AND = 0; + OR = 1; + } + + // Operator determining how child expressions and conditions are combined + CombiningOperator operator = 1; + + // Nested boolean expressions, allowing tree-like structure + // Example: (A AND B) OR (C AND D) where A,B,C,D are conditions + repeated BooleanExpression sub_expressions = 2; + + // Leaf-level transaction conditions + // These are the actual checks performed against the state. + repeated TxnCondition conditions = 3; +} + + message TxnOp { oneof request { + // TODO add Echo to get a response TxnGetRequest get = 1; TxnPutRequest put = 2; TxnDeleteRequest delete = 3; @@ -148,7 +171,33 @@ message TxnOpResponse { } } +// Represents a set of operations that execute only when specified conditions are met. +message ConditionalOperation { + // Tree of conditions that must be satisfied for operations to execute + BooleanExpression predicate = 1; + + // Operations to execute when condition_tree evaluates to true + // These operations are executed in sequence + repeated TxnOp operations = 2; +} + +// A Transaction request sent to the databend-meta service. +// +// To provide backward compatibility, the `TxnRequest` is processed in the following order: +// +// - Loop and evaluate the condition in the `operations`, and execute the corresponding operation, if the condition is met. +// This will stop and return once a condition is met and one of the corresponding operations is executed. +// +// - If none of the conditions are met, the `condition` as the **last** condition will be evaluated. +// And the `if_then` will be executed and return. +// +// - If none operation are executed, `else_then` will be executed. message TxnRequest { + + // Series of conditional operations to execute. + // It will stop once a condition is met and one of the corresponding operations is executed + repeated ConditionalOperation operations = 4; + // `condition` is a list of predicates. // If all of them success, the `if_then` will be executed, // otherwise `else_then` op will be executed. @@ -163,10 +212,24 @@ message TxnRequest { repeated TxnOp else_then = 3; } +// TransactionResponse represents the result of executing a transaction. +// +// `execution_path` identifies condition that is executed and is identified as: +// - "operation:{index}": executed operation at index +// - "then": `condition` field were met, and the `if_then` operation is executed. +// - "else": neither operations nor condition are met and `else_then` is executed. +// +// `success` is set to false only when `else_then` is executed. message TxnReply { bool success = 1; + repeated TxnOpResponse responses = 2; + + // Not used string error = 3; + + // Identifies which execution path was taken + string execution_path = 4; } message ClusterStatus { diff --git a/src/meta/types/src/cmd/mod.rs b/src/meta/types/src/cmd/mod.rs index 8aa4c90ab2fb..75fe8849e7d6 100644 --- a/src/meta/types/src/cmd/mod.rs +++ b/src/meta/types/src/cmd/mod.rs @@ -131,15 +131,14 @@ mod tests { assert_eq!(cmd, serde_json::from_str(want)?); // Transaction - let cmd = super::Cmd::Transaction(TxnRequest { - condition: vec![TxnCondition::eq_value("k", b("v"))], - if_then: vec![TxnOp::put_with_ttl( + let cmd = super::Cmd::Transaction(TxnRequest::new( + vec![TxnCondition::eq_value("k", b("v"))], + vec![TxnOp::put_with_ttl( "k", b("v"), Some(Duration::from_millis(100)), )], - else_then: vec![], - }); + )); let want = concat!( r#"{"Transaction":{"#, r#""condition":[{"key":"k","expected":0,"target":{"Value":[118]}}],"#, diff --git a/src/meta/types/src/proto_display.rs b/src/meta/types/src/proto_display.rs index 17bd20d69d91..5707fdcc6d47 100644 --- a/src/meta/types/src/proto_display.rs +++ b/src/meta/types/src/proto_display.rs @@ -16,9 +16,14 @@ use std::fmt::Display; use std::fmt::Formatter; use std::time::Duration; +use databend_common_base::display::display_option::DisplayOptionExt; +use databend_common_base::display::display_slice::DisplaySliceExt; use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt; use num_traits::FromPrimitive; +use crate::protobuf::boolean_expression::CombiningOperator; +use crate::protobuf::BooleanExpression; +use crate::protobuf::ConditionalOperation; use crate::txn_condition::Target; use crate::txn_op; use crate::txn_op::Request; @@ -98,13 +103,17 @@ impl Display for VecDisplay<'_, T> { impl Display for TxnRequest { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!( - f, - "TxnRequest{{ if:{} then:{} else:{} }}", - VecDisplay::new_at_most(&self.condition, 5), - VecDisplay::new_at_most(&self.if_then, 5), - VecDisplay::new_at_most(&self.else_then, 5), - ) + write!(f, "TxnRequest{{",)?; + + for op in self.operations.iter() { + write!(f, "{{ {} }}, ", op)?; + } + + write!(f, "if:{} ", VecDisplay::new_at_most(&self.condition, 10),)?; + write!(f, "then:{} ", VecDisplay::new_at_most(&self.if_then, 10),)?; + write!(f, "else:{}", VecDisplay::new_at_most(&self.else_then, 10),)?; + + write!(f, "}}",) } } @@ -287,8 +296,52 @@ impl Display for TxnDeleteByPrefixResponse { } } +impl Display for BooleanExpression { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let op = self.operator(); + let op = match op { + CombiningOperator::And => "AND", + CombiningOperator::Or => "OR", + }; + + let mut printed = false; + + for expr in self.sub_expressions.iter() { + if printed { + write!(f, " {} ", op)?; + } + write!(f, "({})", expr)?; + printed = true; + } + + for cond in self.conditions.iter() { + if printed { + write!(f, " {} ", op)?; + } + write!(f, "{}", cond)?; + printed = true; + } + Ok(()) + } +} + +impl Display for ConditionalOperation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "if:({}) then:{}", + self.predicate.display(), + self.operations.display_n::<10>() + ) + } +} + #[cfg(test)] mod tests { + use crate::protobuf::BooleanExpression; + use crate::protobuf::TxnCondition; + use crate::TxnOp; + #[test] fn test_vec_display() { assert_eq!( @@ -311,4 +364,72 @@ mod tests { "[...]" ); } + + #[test] + fn test_tx_display_with_bool_expression() { + let expr = BooleanExpression::from_conditions_and([ + TxnCondition::eq_seq("k1", 1), + TxnCondition::eq_seq("k2", 2), + ]) + .and(BooleanExpression::from_conditions_or([ + TxnCondition::eq_seq("k3", 3), + TxnCondition::eq_seq("k4", 4), + TxnCondition::keys_with_prefix("k5", 10), + ])); + + assert_eq!( + format!("{}", expr), + "(k3 == seq(3) OR k4 == seq(4) OR k5 == keys_with_prefix(10)) AND k1 == seq(1) AND k2 == seq(2)" + ); + } + + #[test] + fn test_display_conditional_operation() { + let op = crate::protobuf::ConditionalOperation { + predicate: Some(BooleanExpression::from_conditions_and([ + TxnCondition::eq_seq("k1", 1), + TxnCondition::eq_seq("k2", 2), + ])), + operations: vec![ + // + TxnOp::put("k1", b"v1".to_vec()), + TxnOp::put("k2", b"v2".to_vec()), + ], + }; + + assert_eq!( + format!("{}", op), + "if:(k1 == seq(1) AND k2 == seq(2)) then:[Put(Put key=k1),Put(Put key=k2)]" + ); + } + + #[test] + fn test_display_txn_request() { + let op = crate::protobuf::ConditionalOperation { + predicate: Some(BooleanExpression::from_conditions_and([ + TxnCondition::eq_seq("k1", 1), + TxnCondition::eq_seq("k2", 2), + ])), + operations: vec![ + // + TxnOp::put("k1", b"v1".to_vec()), + TxnOp::put("k2", b"v2".to_vec()), + ], + }; + + let req = crate::TxnRequest { + operations: vec![op], + condition: vec![TxnCondition::eq_seq("k1", 1), TxnCondition::eq_seq("k2", 2)], + if_then: vec![ + TxnOp::put("k1", b"v1".to_vec()), + TxnOp::put("k2", b"v2".to_vec()), + ], + else_then: vec![TxnOp::put("k3", b"v1".to_vec())], + }; + + assert_eq!( + format!("{}", req), + "TxnRequest{{ if:(k1 == seq(1) AND k2 == seq(2)) then:[Put(Put key=k1),Put(Put key=k2)] }, if:[k1 == seq(1),k2 == seq(2)] then:[Put(Put key=k1),Put(Put key=k2)] else:[Put(Put key=k3)]}", + ); + } } diff --git a/src/meta/types/src/proto_ext/txn_ext.rs b/src/meta/types/src/proto_ext/txn_ext.rs index 9ef3c5bde9f7..80d1598b33ae 100644 --- a/src/meta/types/src/proto_ext/txn_ext.rs +++ b/src/meta/types/src/proto_ext/txn_ext.rs @@ -14,18 +14,58 @@ use std::time::Duration; +use pb::boolean_expression::CombiningOperator; use pb::txn_condition::ConditionResult; use pb::txn_condition::Target; use crate::protobuf as pb; use crate::seq_value::SeqV; -use crate::TxnRequest; -impl TxnRequest { +impl pb::TxnRequest { + /// Push a new conditional operation branch to the transaction. + pub fn push_branch( + mut self, + expr: Option, + ops: impl IntoIterator, + ) -> Self { + self.operations + .push(pb::ConditionalOperation::new(expr, ops)); + self + } + + /// Push the old version of `condition` and `if_then` to the transaction. + pub fn push_if_then( + mut self, + conditions: impl IntoIterator, + ops: impl IntoIterator, + ) -> Self { + assert!(self.condition.is_empty()); + assert!(self.if_then.is_empty()); + self.condition.extend(conditions); + self.if_then.extend(ops); + self + } + + pub fn new(conditions: Vec, ops: Vec) -> Self { + Self { + operations: vec![], + condition: conditions, + if_then: ops, + else_then: vec![], + } + } + + /// Adds operations to execute when the conditions are not met. + pub fn with_else(mut self, ops: Vec) -> Self { + self.else_then = ops; + self + } + /// Creates a transaction request that performs the specified operations /// unconditionally. pub fn unconditional(ops: Vec) -> Self { Self { + operations: vec![], condition: vec![], if_then: ops, else_then: vec![], @@ -33,6 +73,128 @@ impl TxnRequest { } } +impl pb::TxnReply { + pub fn new(execution_path: impl ToString) -> Self { + let execution_path = execution_path.to_string(); + Self { + success: execution_path != "else", + responses: vec![], + error: "".to_string(), + execution_path, + } + } + + /// Return the index of the branch that was executed in [`pb::TxnRequest::operations`]. + /// + /// If none of the branches were executed, return `None`, + /// i.e., the `condition` is met and `if_then` is executed, or `else_then` is executed. + /// In such case, the caller should then compare `execution_path` against "then" or "else` to determine which branch was executed. + /// + /// If there is an error parsing the index, return the original `execution_path`. + pub fn executed_branch_index(&self) -> Result, &str> { + // if self.execution_path is in form "operation:", return the index. + if let Some(index) = self.execution_path.strip_prefix("operation:") { + index + .parse() + .map(Some) + .map_err(|_| self.execution_path.as_str()) + } else { + Ok(None) + } + } +} + +#[derive(derive_more::From)] +pub enum ExpressionOrCondition { + Expression(#[from] pb::BooleanExpression), + Condition(#[from] pb::TxnCondition), +} + +impl pb::BooleanExpression { + pub fn from_conditions_and(conditions: impl IntoIterator) -> Self { + Self::from_conditions(CombiningOperator::And, conditions) + } + + pub fn from_conditions_or(conditions: impl IntoIterator) -> Self { + Self::from_conditions(CombiningOperator::Or, conditions) + } + + fn from_conditions( + op: CombiningOperator, + conditions: impl IntoIterator, + ) -> Self { + Self { + conditions: conditions.into_iter().collect(), + operator: op as i32, + sub_expressions: vec![], + } + } + + pub fn and(self, expr_or_condition: impl Into) -> Self { + self.merge(CombiningOperator::And, expr_or_condition) + } + + pub fn or(self, expr_or_condition: impl Into) -> Self { + self.merge(CombiningOperator::Or, expr_or_condition) + } + + pub fn and_many(self, others: impl IntoIterator) -> Self { + self.merge_expressions(CombiningOperator::And, others) + } + + pub fn or_many(self, others: impl IntoIterator) -> Self { + self.merge_expressions(CombiningOperator::Or, others) + } + + fn merge( + self, + op: CombiningOperator, + expr_or_condition: impl Into, + ) -> Self { + let x = expr_or_condition.into(); + match x { + ExpressionOrCondition::Expression(expr) => self.merge_expressions(op, [expr]), + ExpressionOrCondition::Condition(cond) => self.merge_conditions(op, [cond]), + } + } + + fn merge_conditions( + mut self, + op: CombiningOperator, + condition: impl IntoIterator, + ) -> Self { + if self.operator == op as i32 { + self.conditions.extend(condition); + self + } else { + pb::BooleanExpression { + operator: op as i32, + sub_expressions: vec![self], + conditions: condition.into_iter().collect(), + } + } + } + + fn merge_expressions( + mut self, + op: CombiningOperator, + other: impl IntoIterator, + ) -> Self { + if self.operator == op as i32 { + self.sub_expressions.extend(other); + self + } else { + let mut expressions = vec![self]; + expressions.extend(other); + Self { + conditions: vec![], + operator: op as i32, + sub_expressions: expressions, + } + } + } +} + impl pb::TxnCondition { /// Create a txn condition that checks if the `seq` matches. pub fn eq_seq(key: impl ToString, seq: u64) -> Self { @@ -77,6 +239,14 @@ impl pb::TxnCondition { target: Some(Target::KeysWithPrefix(count)), } } + + pub fn and(self, other: pb::TxnCondition) -> pb::BooleanExpression { + pb::BooleanExpression::from_conditions_and([self, other]) + } + + pub fn or(self, other: pb::TxnCondition) -> pb::BooleanExpression { + pb::BooleanExpression::from_conditions_or([self, other]) + } } impl pb::TxnOp { @@ -183,3 +353,122 @@ impl pb::TxnGetResponse { } } } + +impl pb::ConditionalOperation { + pub fn new( + expr: Option, + ops: impl IntoIterator, + ) -> Self { + Self { + predicate: expr, + operations: ops.into_iter().collect(), + } + } +} + +#[cfg(test)] +mod tests { + use crate::protobuf::BooleanExpression; + use crate::TxnCondition; + + #[test] + fn test_bool_expression() { + use BooleanExpression as Expr; + + let cond = |k: &str, seq| TxnCondition::eq_seq(k, seq); + + // from_conditions_and + let expr = Expr::from_conditions_and([cond("a", 1), cond("b", 2)]); + assert_eq!(expr.to_string(), "a == seq(1) AND b == seq(2)"); + + // from_conditions_or + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]); + assert_eq!(expr.to_string(), "a == seq(1) OR b == seq(2)"); + + // and_condition + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).and(cond("c", 3)); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND c == seq(3)" + ); + } + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).and(cond("e", 5)); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND e == seq(5)" + ); + } + + // or_condition + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).or(cond("c", 3)); + assert_eq!( + expr.to_string(), + "a == seq(1) OR b == seq(2) OR c == seq(3)" + ); + } + { + let expr = Expr::from_conditions_and([cond("a", 1), cond("b", 2)]).or(cond("e", 5)); + assert_eq!( + expr.to_string(), + "(a == seq(1) AND b == seq(2)) OR e == seq(5)" + ); + } + + // and + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]) + .and(Expr::from_conditions_or([cond("c", 3), cond("d", 4)])); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND (c == seq(3) OR d == seq(4))" + ); + } + // or + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]) + .or(Expr::from_conditions_or([cond("c", 3), cond("d", 4)])); + assert_eq!( + expr.to_string(), + "(c == seq(3) OR d == seq(4)) OR a == seq(1) OR b == seq(2)" + ); + } + // and_many + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).and_many([ + Expr::from_conditions_or([cond("c", 3), cond("d", 4)]), + Expr::from_conditions_or([cond("e", 5), cond("f", 6)]), + ]); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND (c == seq(3) OR d == seq(4)) AND (e == seq(5) OR f == seq(6))" + ); + } + // or_many + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).or_many([ + Expr::from_conditions_or([cond("c", 3), cond("d", 4)]), + Expr::from_conditions_or([cond("e", 5), cond("f", 6)]), + ]); + assert_eq!( + expr.to_string(), + "(c == seq(3) OR d == seq(4)) OR (e == seq(5) OR f == seq(6)) OR a == seq(1) OR b == seq(2)" + ); + } + // complex + { + let expr = cond("a", 1) + .or(cond("b", 2)) + .and(cond("c", 3).or(cond("d", 4))) + .or(cond("e", 5) + .or(cond("f", 6)) + .and(cond("g", 7).or(cond("h", 8)))); + assert_eq!( + expr.to_string(), + "((a == seq(1) OR b == seq(2)) AND (c == seq(3) OR d == seq(4))) OR ((e == seq(5) OR f == seq(6)) AND (g == seq(7) OR h == seq(8)))" + ); + } + } +} diff --git a/src/meta/types/tests/it/main.rs b/src/meta/types/tests/it/main.rs index 6a905b2a019b..03e4fde3c92f 100644 --- a/src/meta/types/tests/it/main.rs +++ b/src/meta/types/tests/it/main.rs @@ -13,6 +13,7 @@ // limitations under the License. mod cluster; +mod txn_serde; #[test] fn test_bin_commit_version() -> anyhow::Result<()> { diff --git a/src/meta/types/tests/it/txn_serde.rs b/src/meta/types/tests/it/txn_serde.rs new file mode 100644 index 000000000000..ad9286522e7c --- /dev/null +++ b/src/meta/types/tests/it/txn_serde.rs @@ -0,0 +1,87 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use databend_common_meta_types::protobuf::BooleanExpression; +use databend_common_meta_types::TxnCondition; +use databend_common_meta_types::TxnOp; +use databend_common_meta_types::TxnRequest; + +#[test] +fn test_txn_request_serde() -> anyhow::Result<()> { + // Empty operations, with condition + let txn = TxnRequest::new(vec![TxnCondition::eq_value("k", b("v"))], vec![ + TxnOp::put_with_ttl("k", b("v"), Some(Duration::from_millis(100))), + ]); + let want = concat!( + r#"{"#, + r#""condition":[{"key":"k","expected":0,"target":{"Value":[118]}}],"#, + r#""if_then":[{"request":{"Put":{"key":"k","value":[118],"prev_value":true,"expire_at":null,"ttl_ms":100}}}],"#, + r#""else_then":[]"#, + r#"}"# + ); + assert_eq!(want, serde_json::to_string(&txn)?); + assert_eq!(txn, serde_json::from_str(want)?); + + // Only operations + + let txn = TxnRequest::default().push_branch( + Some(BooleanExpression::from_conditions_or([ + TxnCondition::eq_value("k", b("v")), + ])), + [TxnOp::get("k")], + ); + let want = r#"{ + "operations": [ + { + "predicate": { + "operator": 1, + "sub_expressions": [], + "conditions": [ + { + "key": "k", + "expected": 0, + "target": { + "Value": [ + 118 + ] + } + } + ] + }, + "operations": [ + { + "request": { + "Get": { + "key": "k" + } + } + } + ] + } + ], + "condition": [], + "if_then": [], + "else_then": [] +}"#; + assert_eq!(want, serde_json::to_string_pretty(&txn)?); + assert_eq!(txn, serde_json::from_str(want)?); + + Ok(()) +} + +fn b(x: impl ToString) -> Vec { + x.to_string().into_bytes() +} diff --git a/src/query/management/src/role/role_mgr.rs b/src/query/management/src/role/role_mgr.rs index c0a79e9395bc..43c4666bd4e4 100644 --- a/src/query/management/src/role/role_mgr.rs +++ b/src/query/management/src/role/role_mgr.rs @@ -311,11 +311,7 @@ impl RoleApi for RoleMgr { } if need_transfer { - let txn_req = TxnRequest { - condition: condition.clone(), - if_then: if_then.clone(), - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition.clone(), if_then.clone()); let tx_reply = self.kv_api.transaction(txn_req.clone()).await?; let (succ, _) = txn_reply_to_api_result(tx_reply)?; debug!( @@ -378,11 +374,7 @@ impl RoleApi for RoleMgr { } } - let txn_req = TxnRequest { - condition: condition.clone(), - if_then: if_then.clone(), - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition.clone(), if_then.clone()); let tx_reply = self.kv_api.transaction(txn_req.clone()).await?; let (succ, _) = txn_reply_to_api_result(tx_reply)?; @@ -467,11 +459,7 @@ impl RoleApi for RoleMgr { } } - let txn_req = TxnRequest { - condition: condition.clone(), - if_then: if_then.clone(), - else_then: vec![], - }; + let txn_req = TxnRequest::new(condition.clone(), if_then.clone()); let tx_reply = self.kv_api.transaction(txn_req.clone()).await?; let (succ, _) = txn_reply_to_api_result(tx_reply)?; diff --git a/src/query/management/src/stage/stage_mgr.rs b/src/query/management/src/stage/stage_mgr.rs index b9a85a3c365b..7f86a20e298f 100644 --- a/src/query/management/src/stage/stage_mgr.rs +++ b/src/query/management/src/stage/stage_mgr.rs @@ -144,14 +144,13 @@ impl StageApi for StageMgr { .collect(); dels.push(txn_op_del(&stage_ident)); - let txn_req = TxnRequest { - condition: vec![ + let txn_req = TxnRequest::new( + vec![ // stage is not change, prevent add file to stage txn_cond_eq_seq(&stage_ident, stage_seq), ], - if_then: dels, - else_then: vec![], - }; + dels, + ); let tx_reply = self.kv_api.transaction(txn_req).await?; let (succ, _) = txn_reply_to_api_result(tx_reply)?; @@ -195,14 +194,14 @@ impl StageApi for StageMgr { }; old_stage.number_of_files += 1; - let txn_req = TxnRequest { - condition: vec![ + let txn_req = TxnRequest::new( + vec![ // file does not exist txn_cond_seq(&file_ident, Eq, 0), // stage is not changed txn_cond_seq(&stage_ident, Eq, stage_seq), ], - if_then: vec![ + vec![ txn_op_put( &file_ident, serialize_struct(&file, ErrorCode::IllegalStageFileFormat, || "")?, @@ -212,8 +211,7 @@ impl StageApi for StageMgr { serialize_struct(&old_stage, ErrorCode::IllegalUserStageFormat, || "")?, ), ], - else_then: vec![], - }; + ); let tx_reply = self.kv_api.transaction(txn_req).await?; let (succ, _) = txn_reply_to_api_result(tx_reply)?; @@ -269,14 +267,13 @@ impl StageApi for StageMgr { serialize_struct(&old_stage, ErrorCode::IllegalUserStageFormat, || "")?, )); - let txn_req = TxnRequest { - condition: vec![ + let txn_req = TxnRequest::new( + vec![ // stage is not change txn_cond_seq(&stage_ident, Eq, stage_seq), ], if_then, - else_then: vec![], - }; + ); let tx_reply = self.kv_api.transaction(txn_req).await?; let (succ, _) = txn_reply_to_api_result(tx_reply)?;