Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): token balances subscription #2831

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions crates/torii/core/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use tracing::{debug, trace, warn};
use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::types::ContractType;
use crate::types::{ContractType, TokenBalance};
use crate::utils::fetch_content_from_ipfs;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -159,18 +160,21 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
}

// write the new balance to the database
sqlx::query(&format!(
let token_balance: TokenBalance = sqlx::query_as(&format!(
"INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \
token_id, balance) VALUES (?, ?, ?, ?, ?)",
token_id, balance) VALUES (?, ?, ?, ?, ?) RETURNING *",
))
.bind(id)
.bind(contract_address)
.bind(account_address)
.bind(token_id)
.bind(u256_to_sql_string(&balance))
.execute(&mut **tx)
.fetch_one(&mut **tx)
.await?;

debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff");
SimpleBroker::publish(token_balance);

Comment on lines +163 to +177
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! Watch out for concurrency with "INSERT OR REPLACE".
In high-concurrency environments, multiple updates could lead to race conditions where one subscriber's write might overwrite another's changes. Consider using transactions at a higher level or row-level locking.

- INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (...)
+ /* Evaluate if you need a more robust concurrency pattern, e.g. versioning or row locking */

Committable suggestion skipped: line range outside the PR's diff.

Ok(())
}

Expand Down
24 changes: 24 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ service World {
// Update entity subscription
rpc UpdateEventMessagesSubscription (UpdateEventMessagesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token balance updates.
rpc SubscribeTokenBalances (RetrieveTokenBalancesRequest) returns (stream SubscribeTokenBalancesResponse);

// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand All @@ -50,6 +56,24 @@ service World {
rpc RetrieveTokenBalances (RetrieveTokenBalancesRequest) returns (RetrieveTokenBalancesResponse);
}

// A request to update a token balance subscription
message UpdateTokenBalancesSubscriptionRequest {
// The subscription ID
uint64 subscription_id = 1;
// The list of contract addresses to subscribe to
repeated bytes contract_addresses = 2;
// The list of account addresses to subscribe to
repeated bytes account_addresses = 3;
}

// A response containing token balances
message SubscribeTokenBalancesResponse {
// The subscription ID
uint64 subscription_id = 1;
// The token balance
types.TokenBalance balance = 2;
}

// A request to retrieve tokens
message RetrieveTokensRequest {
// The list of contract addresses to retrieve tokens for
Expand Down
69 changes: 68 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use subscriptions::event::EventManager;
use subscriptions::indexer::IndexerManager;
use subscriptions::token_balance::TokenBalanceManager;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel, Receiver};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
Expand All @@ -59,7 +60,8 @@ use crate::proto::world::{
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
SubscribeTokenBalancesResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -123,6 +125,7 @@ pub struct DojoWorld {
event_manager: Arc<EventManager>,
state_diff_manager: Arc<StateDiffManager>,
indexer_manager: Arc<IndexerManager>,
token_balance_manager: Arc<TokenBalanceManager>,
}

impl DojoWorld {
Expand All @@ -138,6 +141,7 @@ impl DojoWorld {
let event_manager = Arc::new(EventManager::default());
let state_diff_manager = Arc::new(StateDiffManager::default());
let indexer_manager = Arc::new(IndexerManager::default());
let token_balance_manager = Arc::new(TokenBalanceManager::default());

tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv(
block_rx,
Expand All @@ -156,6 +160,10 @@ impl DojoWorld {

tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(&indexer_manager)));

tokio::task::spawn(subscriptions::token_balance::Service::new(Arc::clone(
&token_balance_manager,
)));

Self {
pool,
world_address,
Expand All @@ -165,6 +173,7 @@ impl DojoWorld {
event_manager,
state_diff_manager,
indexer_manager,
token_balance_manager,
}
}
}
Expand Down Expand Up @@ -1056,6 +1065,15 @@ impl DojoWorld {
Ok(RetrieveTokenBalancesResponse { balances })
}

async fn subscribe_token_balances(
&self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<Receiver<Result<proto::world::SubscribeTokenBalancesResponse, tonic::Status>>, Error>
{
self.token_balance_manager.add_subscriber(contract_addresses, account_addresses).await
}

async fn subscribe_indexer(
&self,
contract_address: Felt,
Expand Down Expand Up @@ -1508,6 +1526,8 @@ type SubscribeIndexerResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeIndexerResponse, Status>> + Send>>;
type RetrieveEntitiesStreamingResponseStream =
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1517,6 +1537,7 @@ impl proto::world::world_server::World for DojoWorld {
type SubscribeEventsStream = SubscribeEventsResponseStream;
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;

async fn world_metadata(
&self,
Expand Down Expand Up @@ -1619,6 +1640,52 @@ impl proto::world::world_server::World for DojoWorld {
Ok(Response::new(()))
}

async fn subscribe_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
) -> ServiceResult<Self::SubscribeTokenBalancesStream> {
let RetrieveTokenBalancesRequest { contract_addresses, account_addresses } =
request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

let rx = self
.subscribe_token_balances(contract_addresses, account_addresses)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokenBalancesStream))
}

async fn update_token_balances_subscription(
&self,
request: Request<UpdateTokenBalancesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenBalancesSubscriptionRequest {
subscription_id,
contract_addresses,
account_addresses,
} = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

self.token_balance_manager
.update_subscriber(subscription_id, contract_addresses, account_addresses)
.await;
Ok(Response::new(()))
}

async fn retrieve_entities(
&self,
request: Request<RetrieveEntitiesRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod event;
pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token_balance;

pub(crate) fn match_entity_keys(
id: Felt,
Expand Down
Loading
Loading