Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve rpc subscription handler #819

Merged
merged 4 commits into from
May 10, 2024
Merged

feat: improve rpc subscription handler #819

merged 4 commits into from
May 10, 2024

Conversation

dinhani-cw
Copy link
Contributor

No description provided.

@dinhani-cw dinhani-cw requested a review from a team as a code owner May 10, 2024 12:04
@dinhani-cw dinhani-cw enabled auto-merge (squash) May 10, 2024 12:05
Copy link

PR Review 🔍

⏱️ Estimated effort to review [1-5]

3, due to the complexity of the changes in the RPC server and subscription handling logic, which requires a good understanding of asynchronous programming and error handling in Rust.

🧪 Relevant tests

No

⚡ Possible issues

Possible Bug: The use of let _ = ... in multiple places might suppress important errors that should be handled or logged. This could lead to silent failures in the system.

🔒 Security concerns

No

Code feedback:
relevant filesrc/eth/rpc/rpc_server.rs
suggestion      

Consider handling or logging the result of spawn_subscriptions_cleaner() instead of ignoring it with let _ = .... This could be important for debugging issues related to subscription cleanup. [important]

relevant linelet _ = Arc::clone(&subs).spawn_subscriptions_cleaner();

relevant filesrc/eth/rpc/rpc_server.rs
suggestion      

Instead of using let _ = join!(...), consider handling the results of the joined futures to ensure that any errors or important information are not missed. [important]

relevant linelet _ = join!(handle_rpc_server.stopped(), handle_logs_notifier, handle_new_heads_notifier);

relevant filesrc/eth/rpc/rpc_subscriptions.rs
suggestion      

Replace the pattern matching with if let to handle the Err case explicitly, which improves readability and error handling. [medium]

relevant linelet Ok(block) = rx.recv().await else {

relevant filesrc/eth/rpc/rpc_subscriptions.rs
suggestion      

Consider adding error handling for the notify function within the loops to handle potential failures when notifying subscribers. [medium]

relevant linenotify(sub, msg.clone()).await;

Copy link

PR Code Suggestions ✨

CategorySuggestions                                                                                                                                                       
Enhancement
Improve error handling by logging errors from spawning subscription cleaners.

Replace the use of let _ = with a proper error handling or logging mechanism. Ignoring the
result of Arc::clone(&subs).spawn_subscriptions_cleaner() may lead to unhandled errors or
unexpected behavior in the RPC server.

src/eth/rpc/rpc_server.rs [60]

-let _ = Arc::clone(&subs).spawn_subscriptions_cleaner();
+if let Err(e) = Arc::clone(&subs).spawn_subscriptions_cleaner() {
+    tracing::error!("Error spawning subscription cleaner: {:?}", e);
+}
 
Enhance logging by adding context about the receiver when the channel is closed.

Use structured logging to include more context about the error when the transaction logs
channel is closed.

src/eth/rpc/rpc_subscriptions.rs [85]

-tracing::warn!("stopping subscription logs notifier because tx channel was closed");
+tracing::warn!("Stopping subscription logs notifier because tx channel was closed. Receiver: {:?}", rx);
 
Best practice
Replace join! with tokio::select! for better asynchronous task management.

Consider using tokio::select! instead of join! for handling asynchronous tasks to allow
finer control over task cancellation and error handling.

src/eth/rpc/rpc_server.rs [98]

-let _ = join!(handle_rpc_server.stopped(), handle_logs_notifier, handle_new_heads_notifier);
+tokio::select! {
+    _ = handle_rpc_server.stopped() => tracing::info!("RPC server stopped"),
+    _ = handle_logs_notifier => tracing::info!("Logs notifier stopped"),
+    _ = handle_new_heads_notifier => tracing::info!("New heads notifier stopped"),
+}
 
Add error handling for subscriber notifications to manage failures gracefully.

Consider adding error handling for the notify function within the loops to handle
potential failures during notification.

src/eth/rpc/rpc_subscriptions.rs [73]

-notify(sub, msg.clone()).await;
+if let Err(e) = notify(sub, msg.clone()).await {
+    tracing::error!("Failed to notify subscriber: {:?}, Error: {:?}", sub, e);
+}
 
Maintainability
Simplify the loop for receiving broadcast messages using while let.

Replace the manual if let pattern matching with a more concise while let loop for
receiving messages from the broadcast channel.

src/eth/rpc/rpc_subscriptions.rs [65-68]

-let Ok(block) = rx.recv().await else {
-    tracing::warn!("stopping subscription blocks notifier because tx channel was closed");
-    break;
-};
+while let Ok(block) = rx.recv().await {
+    let msg = SubscriptionMessage::from(block.header);
+    let new_heads_subs = self.new_heads.read().await;
+    for sub in new_heads_subs.values() {
+        notify(sub, msg.clone()).await;
+    }
+}
+tracing::warn!("stopping subscription blocks notifier because tx channel was closed");
 

@dinhani-cw dinhani-cw merged commit 5380677 into main May 10, 2024
25 checks passed
@dinhani-cw dinhani-cw deleted the rpc-subs-2 branch May 10, 2024 12:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant