Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/cmd console use indexer to query #469

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/cmd/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use db3_proto::db3_mutation_proto::{
CollectionMutation, DatabaseAction, DatabaseMutation, DocumentMask, DocumentMutation,
};
use db3_proto::db3_node_proto::NetworkStatus;
use db3_sdk::{faucet_sdk::FaucetSDK, mutation_sdk::MutationSDK, store_sdk::StoreSDK};
use db3_sdk::{
faucet_sdk::FaucetSDK, indexer_sdk::IndexerSDK, mutation_sdk::MutationSDK, store_sdk::StoreSDK,
};
use ethers::signers::LocalWallet;
use ethers::types::Address;
use http::Uri;
Expand All @@ -44,6 +46,7 @@ use tonic::transport::{ClientTlsConfig, Endpoint};
pub struct DB3ClientContext {
pub mutation_sdk: Option<MutationSDK>,
pub store_sdk: Option<StoreSDK>,
pub indexer_sdk: Option<IndexerSDK>,
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -560,7 +563,7 @@ impl DB3ClientCommand {
}
DB3ClientCommand::GetDocument { id } => {
match ctx
.store_sdk
.indexer_sdk
.as_mut()
.unwrap()
.get_document(id.as_str())
Expand Down Expand Up @@ -592,7 +595,7 @@ impl DB3ClientCommand {
limit,
};
match ctx
.store_sdk
.indexer_sdk
.as_mut()
.unwrap()
.run_query(addr.as_ref(), query)
Expand All @@ -604,7 +607,7 @@ impl DB3ClientCommand {
}
DB3ClientCommand::ShowCollection { addr } => {
match ctx
.store_sdk
.indexer_sdk
.as_mut()
.unwrap()
.get_database(addr.as_ref())
Expand All @@ -618,7 +621,7 @@ impl DB3ClientCommand {

DB3ClientCommand::ShowDB { addr } => {
match ctx
.store_sdk
.indexer_sdk
.as_mut()
.unwrap()
.get_database(addr.as_ref())
Expand All @@ -639,7 +642,7 @@ impl DB3ClientCommand {
Ok(ks) => {
let owner = ks.get_address().unwrap();
match ctx
.store_sdk
.indexer_sdk
.as_mut()
.unwrap()
.get_my_database(owner.to_hex().as_str())
Expand All @@ -655,7 +658,13 @@ impl DB3ClientCommand {
),
}
} else {
match ctx.store_sdk.as_mut().unwrap().get_my_database(&addr).await {
match ctx
.indexer_sdk
.as_mut()
.unwrap()
.get_my_database(&addr)
.await
{
Ok(databases) => Self::show_database(databases.as_ref()),
Err(_) => Err("fail to get account".to_string()),
}
Expand Down
63 changes: 51 additions & 12 deletions src/node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ use db3_faucet::{
};
use db3_proto::db3_event_proto::{EventMessage, Subscription};
use db3_proto::db3_faucet_proto::faucet_node_server::FaucetNodeServer;
use db3_proto::db3_indexer_proto::indexer_node_client::IndexerNodeClient;
use db3_proto::db3_indexer_proto::indexer_node_server::IndexerNodeServer;
use db3_proto::db3_node_proto::storage_node_client::StorageNodeClient;
use db3_proto::db3_node_proto::storage_node_server::StorageNodeServer;
use db3_sdk::indexer_sdk::IndexerSDK;
use db3_sdk::mutation_sdk::MutationSDK;
use db3_sdk::store_sdk::StoreSDK;
use db3_storage::event_store::EventStore;
Expand All @@ -61,7 +63,7 @@ use tendermint_abci::ServerBuilder;
use tendermint_rpc::HttpClient;
use tokio::sync::mpsc::Sender;
use tonic::codegen::http::Method;
use tonic::transport::{ClientTlsConfig, Endpoint, Server};
use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Server};
use tonic::Status;
use tower_http::cors::{Any, CorsLayer};
use tracing::{info, warn};
Expand Down Expand Up @@ -123,9 +125,16 @@ pub enum DB3Command {
/// Start db3 interactive console
#[clap(name = "console")]
Console {
/// the url of db3 grpc api
/// the url of db3 grpc api for database write
#[clap(long = "url", global = true, default_value = "http://127.0.0.1:26659")]
public_grpc_url: String,
/// the url of indexer grpc api for database read
#[clap(
long = "indexer_url",
global = true,
default_value = "http://127.0.0.1:26639"
)]
indexer_public_grpc_url: String,
},

/// Start db3 indexer
Expand Down Expand Up @@ -160,6 +169,13 @@ pub enum DB3Command {
/// the url of db3 grpc api
#[clap(long = "url", global = true, default_value = "http://127.0.0.1:26659")]
public_grpc_url: String,
/// the url of db3 grpc api
#[clap(
long = "indexer_url",
global = true,
default_value = "http://127.0.0.1:26639"
)]
indexer_public_grpc_url: String,
/// the subcommand
#[clap(subcommand)]
cmd: Option<DB3ClientCommand>,
Expand Down Expand Up @@ -245,9 +261,9 @@ pub enum DB3Command {
}

impl DB3Command {
fn build_context(public_grpc_url: &str) -> DB3ClientContext {
fn create_endpoint(public_grpc_url: &str) -> Endpoint {
let uri = public_grpc_url.parse::<Uri>().unwrap();
let endpoint = match uri.scheme_str() == Some("https") {
match uri.scheme_str() == Some("https") {
true => {
let rpc_endpoint = Endpoint::new(public_grpc_url.to_string())
.unwrap()
Expand All @@ -259,8 +275,10 @@ impl DB3Command {
let rpc_endpoint = Endpoint::new(public_grpc_url.to_string()).unwrap();
rpc_endpoint
}
};
let channel = endpoint.connect_lazy();
}
}
fn build_context(public_grpc_url: &str, indexer_grpc_url: Option<&str>) -> DB3ClientContext {
let channel = Self::create_endpoint(public_grpc_url).connect_lazy();
let node = Arc::new(StorageNodeClient::new(channel));
if !db3_cmd::keystore::KeyStore::has_key(None) {
db3_cmd::keystore::KeyStore::recover_keypair(None).unwrap();
Expand All @@ -271,9 +289,19 @@ impl DB3Command {
let kp = db3_cmd::keystore::KeyStore::get_keypair(None).unwrap();
let signer = Db3MultiSchemeSigner::new(kp);
let store_sdk = StoreSDK::new(node, signer, true);

let indexer_sdk = match indexer_grpc_url {
Some(url) => {
let channel = Self::create_endpoint(url).connect_lazy();
let node = Arc::new(IndexerNodeClient::new(channel));
Some(IndexerSDK::new(node))
}
None => None,
};
DB3ClientContext {
mutation_sdk: Some(mutation_sdk),
store_sdk: Some(store_sdk),
indexer_sdk,
}
}

Expand Down Expand Up @@ -395,7 +423,7 @@ impl DB3Command {
.block_on(async { watcher.start(sender).await })
.unwrap();
});
let ctx = Self::build_context(db3_storage_grpc_url.as_ref());
let ctx = Self::build_context(db3_storage_grpc_url.as_ref(), None);
let sdk = ctx.mutation_sdk.unwrap();
let minter = StorageChainMinter::new(db, sdk);
minter.start(receiver).await.unwrap();
Expand All @@ -417,8 +445,14 @@ impl DB3Command {
}
}

DB3Command::Console { public_grpc_url } => {
let ctx = Self::build_context(public_grpc_url.as_ref());
DB3Command::Console {
public_grpc_url,
indexer_public_grpc_url,
} => {
let ctx = Self::build_context(
public_grpc_url.as_ref(),
Some(indexer_public_grpc_url.as_ref()),
);
db3_cmd::console::start_console(ctx, &mut stdout(), &mut stderr())
.await
.unwrap();
Expand All @@ -443,7 +477,7 @@ impl DB3Command {
tracing_subscriber::fmt().with_max_level(log_level).init();
info!("{ABOUT}");

let ctx = Self::build_context(db3_storage_grpc_url.as_ref());
let ctx = Self::build_context(db3_storage_grpc_url.as_ref(), None);

let opts = Merk::default_db_opts();
let merk = Merk::open_opt(&db_path, opts, db_tree_level_in_memory).unwrap();
Expand Down Expand Up @@ -488,8 +522,12 @@ impl DB3Command {
DB3Command::Client {
cmd,
public_grpc_url,
indexer_public_grpc_url,
} => {
let mut ctx = Self::build_context(public_grpc_url.as_ref());
let mut ctx = Self::build_context(
public_grpc_url.as_ref(),
Some(indexer_public_grpc_url.as_ref()),
);
if let Some(c) = cmd {
match c.execute(&mut ctx).await {
Ok(table) => table.printstd(),
Expand Down Expand Up @@ -688,7 +726,8 @@ mod tests {

#[tokio::test]
async fn client_cmd_smoke_test() {
let mut ctx = DB3Command::build_context("http://127.0.0.1:26659");
let mut ctx =
DB3Command::build_context("http://127.0.0.1:26659", Some("http://127.0.0.1:26639"));
let cmd = DB3ClientCommand::Init {};
if let Ok(_) = cmd.execute(&mut ctx).await {
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/src/indexer_sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ mod tests {
let result = msdk.submit_database_mutation(&cm).await;
assert!(result.is_ok());
std::thread::sleep(sleep_seconds);
let (addr, signer) = sdk_test::gen_secp256k1_signer(counter);
let (addr, _) = sdk_test::gen_secp256k1_signer(counter);
let mut sdk = IndexerSDK::new(indexer_client.clone());
let my_dbs = sdk.get_my_database(addr1.to_hex().as_str()).await.unwrap();
assert_eq!(true, my_dbs.len() > 0);
Expand Down
5 changes: 5 additions & 0 deletions tools/start_localnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ if [ -e ./bridge.db ]
then
rm bridge.db
fi
# clean indexer
if [ -e ./indexer ]
then
rm -rf indexer
fi
echo "start db3 node..."
./tendermint init > tm.log 2>&1
export RUST_BACKTRACE=1
Expand Down