Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): token balances subscription #2831

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::client::{
EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming,
};
use torii_grpc::proto::world::{
RetrieveEntitiesResponse, RetrieveEventsResponse, RetrieveTokenBalancesResponse,
RetrieveTokensResponse,
Expand Down Expand Up @@ -209,4 +211,37 @@
.await?;
Ok(stream)
}

/// Subscribes to token balances updates.
/// If no contract addresses are provided, it will subscribe to updates for all contract
/// addresses. If no account addresses are provided, it will subscribe to updates for all
/// account addresses.
pub async fn on_token_balance_updated(
&self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<TokenBalanceStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream =
grpc_client.subscribe_token_balances(contract_addresses, account_addresses).await?;
Ok(stream)
}

Check warning on line 228 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L219-L228

Added lines #L219 - L228 were not covered by tests

/// Update the token balances subscription
pub async fn update_token_balance_subscription(
&self,
subscription_id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<(), Error> {
let mut grpc_client = self.inner.write().await;
grpc_client
.update_token_balances_subscription(
subscription_id,
contract_addresses,
account_addresses,
)
.await?;
Ok(())
}

Check warning on line 246 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L231-L246

Added lines #L231 - L246 were not covered by tests
}
12 changes: 8 additions & 4 deletions crates/torii/core/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::types::ContractType;
use crate::types::{ContractType, TokenBalance};
use crate::utils::fetch_content_from_ipfs;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -159,18 +160,21 @@
}

// write the new balance to the database
sqlx::query(&format!(
let token_balance: TokenBalance = sqlx::query_as(&format!(

Check warning on line 163 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L163

Added line #L163 was not covered by tests
"INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \
token_id, balance) VALUES (?, ?, ?, ?, ?)",
token_id, balance) VALUES (?, ?, ?, ?, ?) RETURNING *",

Check warning on line 165 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L165

Added line #L165 was not covered by tests
))
.bind(id)
.bind(contract_address)
.bind(account_address)
.bind(token_id)
.bind(u256_to_sql_string(&balance))
.execute(&mut **tx)
.fetch_one(&mut **tx)

Check warning on line 172 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L172

Added line #L172 was not covered by tests
.await?;

debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff");
SimpleBroker::publish(token_balance);

Check warning on line 177 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L175-L177

Added lines #L175 - L177 were not covered by tests
Comment on lines +163 to +177
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! Watch out for concurrency with "INSERT OR REPLACE".
In high-concurrency environments, multiple updates could lead to race conditions where one subscriber's write might overwrite another's changes. Consider using transactions at a higher level or row-level locking.

- INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (...)
+ /* Evaluate if you need a more robust concurrency pattern, e.g. versioning or row locking */

Committable suggestion skipped: line range outside the PR's diff.

Ok(())
}

Expand Down
24 changes: 24 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ service World {
// Update entity subscription
rpc UpdateEventMessagesSubscription (UpdateEventMessagesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token balance updates.
rpc SubscribeTokenBalances (RetrieveTokenBalancesRequest) returns (stream SubscribeTokenBalancesResponse);

// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand All @@ -50,6 +56,24 @@ service World {
rpc RetrieveTokenBalances (RetrieveTokenBalancesRequest) returns (RetrieveTokenBalancesResponse);
}

// A request to update a token balance subscription
message UpdateTokenBalancesSubscriptionRequest {
// The subscription ID
uint64 subscription_id = 1;
// The list of contract addresses to subscribe to
repeated bytes contract_addresses = 2;
// The list of account addresses to subscribe to
repeated bytes account_addresses = 3;
}

// A response containing token balances
message SubscribeTokenBalancesResponse {
// The subscription ID
uint64 subscription_id = 1;
// The token balance
types.TokenBalance balance = 2;
}

// A request to retrieve tokens
message RetrieveTokensRequest {
// The list of contract addresses to retrieve tokens for
Expand Down
77 changes: 75 additions & 2 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest,
SubscribeTokenBalancesResponse, UpdateEntitiesSubscriptionRequest,
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance,
};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -295,6 +298,76 @@
None => empty_state_update(),
}))))
}

/// Subscribe to token balances.
pub async fn subscribe_token_balances(
&mut self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<TokenBalanceStreaming, Error> {
let request = RetrieveTokenBalancesRequest {
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
account_addresses: account_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
};
let stream = self
.inner
.subscribe_token_balances(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
(res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
}))))
}

Check warning on line 327 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L303-L327

Added lines #L303 - L327 were not covered by tests
Comment on lines +302 to +327
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding validation for the unwrap operation.

Ohayo, sensei! While the implementation is generally good, the unwrap operation on line 325 could be made more robust.

Consider adding proper error handling:

 Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
-    (res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
+    (
+        res.subscription_id,
+        res.balance
+            .ok_or_else(|| Error::Schema(SchemaError::MissingExpectedData("balance".to_string())))
+            .and_then(|b| b.try_into())
+            .expect("Failed to deserialize balance")
+    )
 }))))

Committable suggestion skipped: line range outside the PR's diff.


/// Update a token balances subscription.
pub async fn update_token_balances_subscription(
&mut self,
subscription_id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<(), Error> {
let request = UpdateTokenBalancesSubscriptionRequest {
subscription_id,
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
account_addresses: account_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
};
self.inner
.update_token_balances_subscription(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}

Check warning on line 352 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L330-L352

Added lines #L330 - L352 were not covered by tests
}

type TokenBalanceMappedStream = MapOk<
tonic::Streaming<SubscribeTokenBalancesResponse>,
Box<dyn Fn(SubscribeTokenBalancesResponse) -> (SubscriptionId, TokenBalance) + Send>,
>;

#[derive(Debug)]
pub struct TokenBalanceStreaming(TokenBalanceMappedStream);

impl Stream for TokenBalanceStreaming {
type Item = <TokenBalanceMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}

Check warning on line 370 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L365-L370

Added lines #L365 - L370 were not covered by tests
}

type ModelDiffMappedStream = MapOk<
Expand Down
69 changes: 68 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use starknet::providers::JsonRpcClient;
use subscriptions::event::EventManager;
use subscriptions::indexer::IndexerManager;
use subscriptions::token_balance::TokenBalanceManager;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel, Receiver};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
Expand All @@ -59,7 +60,8 @@
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
SubscribeTokenBalancesResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -123,6 +125,7 @@
event_manager: Arc<EventManager>,
state_diff_manager: Arc<StateDiffManager>,
indexer_manager: Arc<IndexerManager>,
token_balance_manager: Arc<TokenBalanceManager>,
}

impl DojoWorld {
Expand All @@ -138,6 +141,7 @@
let event_manager = Arc::new(EventManager::default());
let state_diff_manager = Arc::new(StateDiffManager::default());
let indexer_manager = Arc::new(IndexerManager::default());
let token_balance_manager = Arc::new(TokenBalanceManager::default());

tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv(
block_rx,
Expand All @@ -156,6 +160,10 @@

tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(&indexer_manager)));

tokio::task::spawn(subscriptions::token_balance::Service::new(Arc::clone(
&token_balance_manager,
)));

Self {
pool,
world_address,
Expand All @@ -165,6 +173,7 @@
event_manager,
state_diff_manager,
indexer_manager,
token_balance_manager,
}
}
}
Expand Down Expand Up @@ -1056,6 +1065,15 @@
Ok(RetrieveTokenBalancesResponse { balances })
}

async fn subscribe_token_balances(
&self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<Receiver<Result<proto::world::SubscribeTokenBalancesResponse, tonic::Status>>, Error>
{
self.token_balance_manager.add_subscriber(contract_addresses, account_addresses).await
}

Check warning on line 1075 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1068-L1075

Added lines #L1068 - L1075 were not covered by tests

async fn subscribe_indexer(
&self,
contract_address: Felt,
Expand Down Expand Up @@ -1508,6 +1526,8 @@
Pin<Box<dyn Stream<Item = Result<SubscribeIndexerResponse, Status>> + Send>>;
type RetrieveEntitiesStreamingResponseStream =
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1517,6 +1537,7 @@
type SubscribeEventsStream = SubscribeEventsResponseStream;
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;

async fn world_metadata(
&self,
Expand Down Expand Up @@ -1619,6 +1640,52 @@
Ok(Response::new(()))
}

async fn subscribe_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
) -> ServiceResult<Self::SubscribeTokenBalancesStream> {
let RetrieveTokenBalancesRequest { contract_addresses, account_addresses } =
request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

let rx = self
.subscribe_token_balances(contract_addresses, account_addresses)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokenBalancesStream))
}

Check warning on line 1663 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1646-L1663

Added lines #L1646 - L1663 were not covered by tests

async fn update_token_balances_subscription(
&self,
request: Request<UpdateTokenBalancesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenBalancesSubscriptionRequest {
subscription_id,
contract_addresses,
account_addresses,
} = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

self.token_balance_manager
.update_subscriber(subscription_id, contract_addresses, account_addresses)
.await;
Ok(Response::new(()))
}

Check warning on line 1687 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1668-L1687

Added lines #L1668 - L1687 were not covered by tests

async fn retrieve_entities(
&self,
request: Request<RetrieveEntitiesRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod event;
pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token_balance;

pub(crate) fn match_entity_keys(
id: Felt,
Expand Down
Loading
Loading