Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): token balances subscription #2831

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

Larkooo
Copy link
Collaborator

@Larkooo Larkooo commented Dec 20, 2024

#2819

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced subscription capabilities for tracking token balances via gRPC.
    • Added methods for clients to subscribe and update their token balance subscriptions.
    • Enhanced token balance management with improved error handling and event-driven updates.
    • Implemented a new service for managing token balance subscriptions and updates.
    • Added structures and methods for managing multiple subscribers in a thread-safe manner.
  • Bug Fixes

    • Improved error messages for fetching token balances.
  • Documentation

    • Updated documentation to reflect new subscription methods and message types.
  • Chores

    • Added a new module structure for better organization of token balance subscription functionalities.

Copy link

coderabbitai bot commented Dec 20, 2024

Walkthrough

Ohayo, sensei! This pull request introduces a comprehensive token balance subscription system for the Torii gRPC server. The changes span multiple files, focusing on enhancing token balance management through an event-driven architecture. The implementation enables clients to subscribe to token balance updates, with new methods in the gRPC service, a dedicated token balance manager, and modifications to the executor to publish balance changes.

Changes

File Change Summary
crates/torii/core/src/executor/erc.rs - Added SimpleBroker import
- Modified apply_balance_diff_helper to publish token balance updates
- Enhanced error handling for token balance operations
crates/torii/grpc/proto/world.proto - Added SubscribeTokenBalances RPC method
- Added UpdateTokenBalancesSubscription RPC method
- Introduced new message types for token balance subscriptions
crates/torii/grpc/src/server/mod.rs - Added subscribe_token_balances method
- Added update_token_balances_subscription method
- Integrated TokenBalanceManager
crates/torii/grpc/src/server/subscriptions/mod.rs - Added new token_balance module
crates/torii/grpc/src/server/subscriptions/token_balance.rs - New file with TokenBalanceSubscriber and TokenBalanceManager
- Implemented subscription management and update publishing
crates/torii/client/src/client/mod.rs - Added on_token_balance_updated method
- Added update_token_balance_subscription method
crates/torii/grpc/src/client.rs - Added subscribe_token_balances method
- Added update_token_balances_subscription method
- Introduced TokenBalanceStreaming type

Possibly related issues

Possibly related PRs

Suggested reviewers

  • kariy
  • glihm

Ohayo and happy coding, sensei! 🍵🥷


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (5)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (3)

52-58: Ohayo sensei! Sending an initial empty response might need explicit confirmation.
While this ensures the client receives a subscription ID immediately, you may wish to confirm with client code that they can handle an empty balance gracefully.


139-146: Ohayo sensei! Carefully validate address filters to prevent silent failures.
This logic will skip updates for addresses not present in sub.contract_addresses or sub.account_addresses. Consider adding callbacks or logs on skipped addresses to aid in debugging.


178-192: Ohayo sensei! Good concurrency handling, but watch for dropped broker messages.
The poll method streams token balances from the SimpleBroker to balance_sender. If balance_sender is ever overwhelmed or not polled promptly, messages can be dropped. Monitor whether an unbounded channel suits your throughput.

crates/torii/grpc/src/server/mod.rs (2)

1635-1651: Ohayo sensei! Great job on the subscribe_token_balances method.
The code properly translates the gRPC request into addresses and sets up the subscription. Just be cautious with large lists of addresses, as it could impact performance on insertion or lookups in your manager.


1653-1670: Ohayo sensei! updatable subscriptions look well thought out.
Remember to handle cases where the user attempts to update a subscription that does not exist or is closed. Possibly return a clearer error response if not found.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0b8033 and 98c8aa5.

📒 Files selected for processing (5)
  • crates/torii/core/src/executor/erc.rs (2 hunks)
  • crates/torii/grpc/proto/world.proto (2 hunks)
  • crates/torii/grpc/src/server/mod.rs (10 hunks)
  • crates/torii/grpc/src/server/subscriptions/mod.rs (1 hunks)
  • crates/torii/grpc/src/server/subscriptions/token_balance.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • crates/torii/grpc/src/server/subscriptions/mod.rs
🔇 Additional comments (7)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (2)

164-167: Ohayo sensei! Confirm channel closure conditions.
When sub.sender.send(...) fails, the subscriber is removed. Ensure no unintentional edge cases occur if a temporary network issue or other ephemeral condition leads to a channel error.

✅ Verification successful

Let me gather more context about how these subscriptions are managed.


Let me check one more thing about the subscription cleanup mechanism.


Ohayo sensei! The channel closure handling is working as intended!

The current implementation is correct for these reasons:

  • The closed_stream mechanism is consistently implemented across all subscription types (events, models, token balance, etc.)
  • When a channel send fails, the subscriber is properly removed via subs.remove_subscriber(id).await
  • This is expected behavior for gRPC streaming, where client disconnections should lead to resource cleanup
  • The channel errors here represent permanent conditions (client disconnected or stream closed) rather than temporary network issues
  • Retry mechanisms in the codebase are implemented where needed (like in sync stages) but aren't applicable for subscription streams where the client has disconnected
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching the codebase to see how sub.sender is used elsewhere
rg -A 5 -B 5 'sub\.sender'

Length of output: 6402


Script:

#!/bin/bash
# Let's look for subscription management code
ast-grep --pattern 'struct $NAME {
  $$$
  sender: $TYPE,
  $$$
}'

# Also check how these subscriptions are created/initialized
rg -A 5 "Subscription.*new" 

Length of output: 4475


Script:

#!/bin/bash
# Let's see how subscriptions are cleaned up
rg -A 10 "closed_stream" 

# And check for any retry mechanisms
rg "retry|reconnect" --type rust

Length of output: 16615


48-49: Ohayo sensei! Consider ensuring ID uniqueness for subscribers.
Using a random u64 value as a subscription ID might result in collisions, which can lead to unexpected behavior or overwritten subscriptions in rare cases.

Here's a script to scan for references to the subscription_id and confirm no collisions:

crates/torii/core/src/executor/erc.rs (1)

18-20: Ohayo sensei! Excellent event-driven approach with SimpleBroker.
This addition allows for consistent publication and subscription to real-time token balance updates. Just be mindful of potential performance impacts if the throughput is large.

crates/torii/grpc/src/server/mod.rs (2)

37-37: Ohayo sensei! Adding TokenBalanceManager is a neat integration.
This new reference helps unify subscription logic in a single point. Ensure the manager is only created once and not repeatedly, to avoid duplication.


158-159: Ohayo sensei! Token balance subscription service launched.
Spawning this subscription Service ensures updates are always processed. Confirm robust error-handling in case the task fails unexpectedly.

✅ Verification successful

Ohayo sensei! Your token balance subscription service is well-protected!

The implementation includes robust error handling:

  • Errors during balance updates are properly logged: error!(target = LOG_TARGET, error = %e, "Processing balance update.")
  • Failed message deliveries are gracefully handled by removing closed streams
  • Parse errors are properly propagated using Result<(), Error>
  • The service continues running even if individual updates fail
  • Task failure is handled by Tokio's task system with automatic cleanup
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching references to token_balance::Service for potential error handling logic
rg 'token_balance::Service'

Length of output: 171


Script:

#!/bin/bash
# Let's check the implementation of token_balance::Service to see error handling
ast-grep --pattern 'impl Service {
  $$$
}'

# Also check the new method implementation
ast-grep --pattern 'fn new($$$) -> $$ {
  $$$
}'

# Let's also check for any error handling patterns in the module
rg -A 5 'error|Error|unwrap|expect|panic' crates/torii/grpc/src/server/subscriptions/token_balance.rs

Length of output: 35857

crates/torii/grpc/proto/world.proto (2)

37-42: Ohayo sensei! New RPC methods for token balances look consistent.
The introduction of SubscribeTokenBalances and UpdateTokenBalancesSubscription effectively complements the rest of the subscription architecture.


59-76: Ohayo sensei! These dedicated message types are well-structured.
clear fields for subscription_id, contract_addresses, and account_addresses. This fosters clarity and maintainability.

Comment on lines +163 to +177
let token_balance: TokenBalance = sqlx::query_as(&format!(
"INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \
token_id, balance) VALUES (?, ?, ?, ?, ?)",
token_id, balance) VALUES (?, ?, ?, ?, ?) RETURNING *",
))
.bind(id)
.bind(contract_address)
.bind(account_address)
.bind(token_id)
.bind(u256_to_sql_string(&balance))
.execute(&mut **tx)
.fetch_one(&mut **tx)
.await?;

debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff");
SimpleBroker::publish(token_balance);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! Watch out for concurrency with "INSERT OR REPLACE".
In high-concurrency environments, multiple updates could lead to race conditions where one subscriber's write might overwrite another's changes. Consider using transactions at a higher level or row-level locking.

- INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (...)
+ /* Evaluate if you need a more robust concurrency pattern, e.g. versioning or row locking */

Committable suggestion skipped: line range outside the PR's diff.

Copy link

codecov bot commented Dec 20, 2024

Codecov Report

Attention: Patch coverage is 10.75697% with 224 lines in your changes missing coverage. Please review.

Project coverage is 55.72%. Comparing base (c0b8033) to head (66f8a30).

Files with missing lines Patch % Lines
...rii/grpc/src/server/subscriptions/token_balance.rs 18.58% 92 Missing ⚠️
crates/torii/grpc/src/client.rs 0.00% 54 Missing ⚠️
crates/torii/grpc/src/server/mod.rs 11.53% 46 Missing ⚠️
crates/torii/client/src/client/mod.rs 0.00% 26 Missing ⚠️
crates/torii/core/src/executor/erc.rs 0.00% 6 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2831      +/-   ##
==========================================
- Coverage   55.92%   55.72%   -0.21%     
==========================================
  Files         439      440       +1     
  Lines       56156    56404     +248     
==========================================
+ Hits        31408    31431      +23     
- Misses      24748    24973     +225     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
crates/torii/grpc/src/server/mod.rs (2)

1068-1075: Consider adding input validation for addresses, sensei!

While the implementation is clean and follows the codebase patterns, it might be good to validate that at least one of contract_addresses or account_addresses is non-empty to avoid unnecessary subscriptions.

 async fn subscribe_token_balances(
     &self,
     contract_addresses: Vec<Felt>,
     account_addresses: Vec<Felt>,
 ) -> Result<Receiver<Result<proto::world::SubscribeTokenBalancesResponse, tonic::Status>>, Error>
 {
+    if contract_addresses.is_empty() && account_addresses.is_empty() {
+        return Err(Error::from(QueryError::MissingParam("At least one contract or account address required".into())));
+    }
     self.token_balance_manager.add_subscriber(contract_addresses, account_addresses).await
 }

1665-1687: The update implementation is consistent with other managers, sensei!

The update_token_balances_subscription method follows the same pattern as other subscription updates in the codebase. Consider adding the same input validation as suggested for the subscribe method.

 async fn update_token_balances_subscription(
     &self,
     request: Request<UpdateTokenBalancesSubscriptionRequest>,
 ) -> ServiceResult<()> {
     let UpdateTokenBalancesSubscriptionRequest {
         subscription_id,
         contract_addresses,
         account_addresses,
     } = request.into_inner();
+    if contract_addresses.is_empty() && account_addresses.is_empty() {
+        return Err(Status::invalid_argument("At least one contract or account address required"));
+    }
     let contract_addresses = contract_addresses
         .iter()
         .map(|address| Felt::from_bytes_be_slice(address))
         .collect::<Vec<_>>();
     let account_addresses = account_addresses
         .iter()
         .map(|address| Felt::from_bytes_be_slice(address))
         .collect::<Vec<_>>();

     self.token_balance_manager
         .update_subscriber(subscription_id, contract_addresses, account_addresses)
         .await;
     Ok(Response::new(()))
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 98c8aa5 and e51ce4d.

📒 Files selected for processing (1)
  • crates/torii/grpc/src/server/mod.rs (10 hunks)
🔇 Additional comments (2)
crates/torii/grpc/src/server/mod.rs (2)

128-128: Ohayo! The TokenBalanceManager integration looks good, sensei!

The TokenBalanceManager is properly integrated into the DojoWorld struct following the established pattern of other managers. The initialization and service spawning are implemented consistently.

Also applies to: 144-144, 163-165, 176-176


1529-1530: Stream type definition looks perfect, sensei!

The SubscribeTokenBalancesResponseStream type is properly defined and follows the same pattern as other stream types in the codebase. The implementation includes the necessary Send trait for async operations.

Also applies to: 1540-1540

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (3)

25-35: Consider enhancing documentation for empty set behavior, sensei!

While the comments mention that empty sets mean "all" items, this important behavior should be more prominently documented. Consider adding a dedicated doc comment section explaining this filtering behavior.

 #[derive(Debug)]
+/// Represents a subscriber to token balance updates with filtering capabilities.
+///
+/// # Filtering Behavior
+/// - Empty `contract_addresses`: Matches ALL contracts
+/// - Empty `account_addresses`: Matches ALL accounts
+/// - Non-empty sets: Only matches addresses present in the respective sets
 pub struct TokenBalanceSubscriber {

42-67: Consider adding collision handling for subscription IDs, sensei!

The random ID generation could theoretically result in collisions. Consider implementing a retry mechanism or using a guaranteed unique ID generation approach.

-        let subscription_id = rand::thread_rng().gen::<u64>();
+        let subscription_id = loop {
+            let id = rand::thread_rng().gen::<u64>();
+            if !self.subscribers.read().await.contains_key(&id) {
+                break id;
+            }
+        };

179-193: Consider implementing error recovery strategy, sensei!

The Future implementation only logs errors when sending balance updates. Consider implementing a retry mechanism or error recovery strategy to handle temporary failures.

     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.get_mut();
 
         while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) {
-            if let Err(e) = this.balance_sender.send(balance) {
-                error!(target = LOG_TARGET, error = %e, "Sending balance update to processor.");
+            let mut retry_count = 0;
+            while let Err(e) = this.balance_sender.send(balance.clone()) {
+                if retry_count >= 3 {
+                    error!(target = LOG_TARGET, error = %e, "Failed to send balance update after retries.");
+                    break;
+                }
+                retry_count += 1;
+                std::thread::sleep(std::time::Duration::from_millis(100));
             }
         }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e51ce4d and 49d8a39.

📒 Files selected for processing (1)
  • crates/torii/grpc/src/server/subscriptions/token_balance.rs (1 hunks)
🔇 Additional comments (1)
crates/torii/grpc/src/server/subscriptions/token_balance.rs (1)

1-24: Ohayo! The imports and constants look well-organized, sensei!

The selection of dependencies shows good consideration for async operations, thread safety, and error handling.

Comment on lines +68 to +91
pub async fn update_subscriber(
&self,
id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
subscriber.sender.clone()
} else {
return; // Subscriber not found, exit early
}
};

self.subscribers.write().await.insert(
id,
TokenBalanceSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
account_addresses: account_addresses.into_iter().collect(),
sender,
},
);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize sender handling in update_subscriber, sensei!

The current implementation acquires two locks and unnecessarily clones the sender. Consider refactoring to use a single write lock.

     pub async fn update_subscriber(
         &self,
         id: u64,
         contract_addresses: Vec<Felt>,
         account_addresses: Vec<Felt>,
     ) {
-        let sender = {
-            let subscribers = self.subscribers.read().await;
-            if let Some(subscriber) = subscribers.get(&id) {
-                subscriber.sender.clone()
-            } else {
-                return; // Subscriber not found, exit early
-            }
-        };
-
-        self.subscribers.write().await.insert(
-            id,
-            TokenBalanceSubscriber {
-                contract_addresses: contract_addresses.into_iter().collect(),
-                account_addresses: account_addresses.into_iter().collect(),
-                sender,
-            },
-        );
+        let mut subscribers = self.subscribers.write().await;
+        if let Some(subscriber) = subscribers.get_mut(&id) {
+            subscriber.contract_addresses = contract_addresses.into_iter().collect();
+            subscriber.account_addresses = account_addresses.into_iter().collect();
+        }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn update_subscriber(
&self,
id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
subscriber.sender.clone()
} else {
return; // Subscriber not found, exit early
}
};
self.subscribers.write().await.insert(
id,
TokenBalanceSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
account_addresses: account_addresses.into_iter().collect(),
sender,
},
);
}
pub async fn update_subscriber(
&self,
id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) {
let mut subscribers = self.subscribers.write().await;
if let Some(subscriber) = subscribers.get_mut(&id) {
subscriber.contract_addresses = contract_addresses.into_iter().collect();
subscriber.account_addresses = account_addresses.into_iter().collect();
}
}

Comment on lines +129 to +176
async fn process_balance_update(
subs: &Arc<TokenBalanceManager>,
balance: &TokenBalance,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();

for (idx, sub) in subs.subscribers.read().await.iter() {
let contract_address =
Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?;
let account_address =
Felt::from_str(&balance.account_address).map_err(ParseError::FromStr)?;

// Skip if contract address filter doesn't match
if !sub.contract_addresses.is_empty()
&& !sub.contract_addresses.contains(&contract_address)
{
continue;
}

// Skip if account address filter doesn't match
if !sub.account_addresses.is_empty()
&& !sub.account_addresses.contains(&account_address)
{
continue;
}

let resp = SubscribeTokenBalancesResponse {
subscription_id: *idx,
balance: Some(proto::types::TokenBalance {
contract_address: balance.contract_address.clone(),
account_address: balance.account_address.clone(),
token_id: balance.token_id.clone(),
balance: balance.balance.clone(),
}),
};

if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
}

for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
subs.remove_subscriber(id).await
}

Ok(())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize balance update processing performance, sensei!

The current implementation parses addresses from strings for each subscriber. Consider parsing once before the loop.

     async fn process_balance_update(
         subs: &Arc<TokenBalanceManager>,
         balance: &TokenBalance,
     ) -> Result<(), Error> {
+        let contract_address = Felt::from_str(&balance.contract_address)
+            .map_err(ParseError::FromStr)?;
+        let account_address = Felt::from_str(&balance.account_address)
+            .map_err(ParseError::FromStr)?;
+
         let mut closed_stream = Vec::new();
 
         for (idx, sub) in subs.subscribers.read().await.iter() {
-            let contract_address =
-                Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?;
-            let account_address =
-                Felt::from_str(&balance.account_address).map_err(ParseError::FromStr)?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn process_balance_update(
subs: &Arc<TokenBalanceManager>,
balance: &TokenBalance,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
for (idx, sub) in subs.subscribers.read().await.iter() {
let contract_address =
Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?;
let account_address =
Felt::from_str(&balance.account_address).map_err(ParseError::FromStr)?;
// Skip if contract address filter doesn't match
if !sub.contract_addresses.is_empty()
&& !sub.contract_addresses.contains(&contract_address)
{
continue;
}
// Skip if account address filter doesn't match
if !sub.account_addresses.is_empty()
&& !sub.account_addresses.contains(&account_address)
{
continue;
}
let resp = SubscribeTokenBalancesResponse {
subscription_id: *idx,
balance: Some(proto::types::TokenBalance {
contract_address: balance.contract_address.clone(),
account_address: balance.account_address.clone(),
token_id: balance.token_id.clone(),
balance: balance.balance.clone(),
}),
};
if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
}
for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
subs.remove_subscriber(id).await
}
Ok(())
}
async fn process_balance_update(
subs: &Arc<TokenBalanceManager>,
balance: &TokenBalance,
) -> Result<(), Error> {
let contract_address = Felt::from_str(&balance.contract_address)
.map_err(ParseError::FromStr)?;
let account_address = Felt::from_str(&balance.account_address)
.map_err(ParseError::FromStr)?;
let mut closed_stream = Vec::new();
for (idx, sub) in subs.subscribers.read().await.iter() {
// Skip if contract address filter doesn't match
if !sub.contract_addresses.is_empty()
&& !sub.contract_addresses.contains(&contract_address)
{
continue;
}
// Skip if account address filter doesn't match
if !sub.account_addresses.is_empty()
&& !sub.account_addresses.contains(&account_address)
{
continue;
}
let resp = SubscribeTokenBalancesResponse {
subscription_id: *idx,
balance: Some(proto::types::TokenBalance {
contract_address: balance.contract_address.clone(),
account_address: balance.account_address.clone(),
token_id: balance.token_id.clone(),
balance: balance.balance.clone(),
}),
};
if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
}
for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
subs.remove_subscriber(id).await
}
Ok(())
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
crates/torii/client/src/client/mod.rs (1)

213-226: Consider adding error handling for empty balance updates.

Ohayo, sensei! The implementation looks good, but there's one potential improvement. The method should handle cases where the balance update stream might return empty or invalid responses.

Consider adding error handling:

 pub async fn on_token_balance_updated(
     &self,
     contract_addresses: Vec<Felt>,
     account_addresses: Vec<Felt>,
 ) -> Result<TokenBalanceStreaming, Error> {
     let mut grpc_client = self.inner.write().await;
+    if contract_addresses.is_empty() && account_addresses.is_empty() {
+        log::warn!("Subscribing to all token balance updates - this might be resource-intensive");
+    }
     let stream = grpc_client
         .subscribe_token_balances(contract_addresses, account_addresses)
         .await?;
     Ok(stream)
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 49d8a39 and 65f6604.

📒 Files selected for processing (2)
  • crates/torii/client/src/client/mod.rs (2 hunks)
  • crates/torii/grpc/src/client.rs (2 hunks)
🔇 Additional comments (5)
crates/torii/client/src/client/mod.rs (2)

13-13: LGTM! Clean import addition.

The import of TokenBalanceStreaming aligns well with the new token balance subscription functionality.


228-238: LGTM! Clean implementation of subscription updates.

The update method follows the same pattern as other subscription update methods in the codebase, maintaining consistency.

crates/torii/grpc/src/client.rs (3)

19-26: LGTM! Clean import organization.

The imports are well-organized and include all necessary types for token balance functionality.


329-352: LGTM! Clean implementation of subscription updates.

The update method follows the established patterns and properly handles Felt conversions.


355-370: LGTM! Clean stream implementation.

The token balance streaming implementation follows the same pattern as other streaming types in the codebase, maintaining consistency.

Comment on lines +302 to +327
/// Subscribe to token balances.
pub async fn subscribe_token_balances(
&mut self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<TokenBalanceStreaming, Error> {
let request = RetrieveTokenBalancesRequest {
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
account_addresses: account_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
};
let stream = self
.inner
.subscribe_token_balances(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
(res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
}))))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding validation for the unwrap operation.

Ohayo, sensei! While the implementation is generally good, the unwrap operation on line 325 could be made more robust.

Consider adding proper error handling:

 Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
-    (res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
+    (
+        res.subscription_id,
+        res.balance
+            .ok_or_else(|| Error::Schema(SchemaError::MissingExpectedData("balance".to_string())))
+            .and_then(|b| b.try_into())
+            .expect("Failed to deserialize balance")
+    )
 }))))

Committable suggestion skipped: line range outside the PR's diff.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
crates/torii/client/src/client/mod.rs (2)

215-228: Ohayo! The subscription method implementation looks solid, sensei!

The implementation follows the established pattern and handles the gRPC client lock appropriately. Consider enhancing the documentation with an example usage to make it even more developer-friendly.

Add a usage example to the documentation:

     /// Subscribes to token balances updates.
     /// If no contract addresses are provided, it will subscribe to updates for all contract
     /// addresses. If no account addresses are provided, it will subscribe to updates for all
     /// account addresses.
+    ///
+    /// # Example
+    /// ```rust
+    /// let stream = client
+    ///     .on_token_balance_updated(
+    ///         vec![contract_address],
+    ///         vec![account_address]
+    ///     )
+    ///     .await?;
+    /// ```

230-246: Documentation needs enhancement, sensei!

While the implementation is solid, the method would benefit from more detailed documentation explaining:

  • The purpose and behavior of the subscription_id parameter
  • What happens when empty vectors are provided
  • Any potential error conditions

Add comprehensive documentation:

     /// Update the token balances subscription
+    ///
+    /// Updates an existing token balance subscription identified by the subscription_id.
+    /// Similar to on_token_balance_updated, empty contract_addresses or account_addresses
+    /// vectors will subscribe to all updates for that category.
+    ///
+    /// # Arguments
+    /// * `subscription_id` - The ID of the existing subscription to update
+    /// * `contract_addresses` - New list of contract addresses to monitor
+    /// * `account_addresses` - New list of account addresses to monitor
+    ///
+    /// # Errors
+    /// Returns an error if the subscription_id is invalid or if the gRPC call fails
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65f6604 and 66f8a30.

📒 Files selected for processing (1)
  • crates/torii/client/src/client/mod.rs (2 hunks)
🔇 Additional comments (2)
crates/torii/client/src/client/mod.rs (2)

13-15: Ohayo! Import changes look good, sensei!

The TokenBalanceStreaming type is appropriately grouped with other streaming types from the same module.


214-246: Verify integration with gRPC server, sensei!

Let's ensure the subscription management is properly integrated with the gRPC server implementation.

✅ Verification successful

Ohayo! The token balance subscription implementation is properly integrated, sensei!

The verification shows that the implementation is correctly integrated with the gRPC server:

  • The proto definition in world.proto properly defines the SubscribeTokenBalances and UpdateTokenBalancesSubscription RPCs
  • The server implementation in crates/torii/grpc/src/server/mod.rs correctly implements these RPCs
  • The subscription management is handled in crates/torii/grpc/src/server/subscriptions/token_balance.rs
  • The client implementation in crates/torii/client/src/client/mod.rs properly connects to these endpoints
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the corresponding gRPC server implementation

# Check for the corresponding gRPC service definition
rg -l "service.*\{.*TokenBalance.*\}" -g "*.proto"

# Check for the server-side implementation of these methods
rg -l "impl.*World.*for.*Server" -g "*.rs"

Length of output: 97


Script:

#!/bin/bash
# Let's try a broader search for proto definitions and implementations

# Search for proto files
fd -e proto -x cat {}

# Search for token balance related implementations in Rust files
rg -l "token_balance" -g "*.rs"

# Search specifically for subscription-related code
rg "subscribe.*token.*balance" -g "*.rs" -i

Length of output: 31414

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant