-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix observer disposing after on_erorr/on_completed #685
Conversation
📝 WalkthroughWalkthroughThe pull request introduces several modifications across multiple files, primarily focusing on improving the management of disposables within observable operations. Key changes include the addition of assertions in test cases to validate the disposal state after observable events, updates to the Changes
Possibly related PRs
Warning Rate limit exceeded@victimsnino has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 2 minutes and 55 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
src/tests/utils/disposable_observable.hpp (1)
119-120
: LGTM! Good symmetry with error case testing.These assertions properly verify disposal behavior after
on_completed
, maintaining consistency with the error case testing.Consider extracting the common disposal verification logic into a helper function to reduce duplication between the error and completion cases:
+void verify_disposal(const auto& observer, const auto& disposable) { + CHECK(observer.is_disposed()); + CHECK(disposable.is_disposed()); +}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
src/tests/utils/disposable_observable.hpp
(2 hunks)
🔇 Additional comments (1)
src/tests/utils/disposable_observable.hpp (1)
108-109
: LGTM! Valuable assertions for disposal verification.
These assertions effectively validate that both the observer and its disposable are properly disposed after on_error
, which is crucial for resource cleanup.
Let's verify if this disposal pattern is consistently implemented across the codebase:
✅ Verification successful
The previous ast-grep pattern might have been too specific. Let's search for disposal-related patterns in test files to verify the consistency of disposal checks across error and completion scenarios.
Disposal behavior after on_error/on_completed is consistently implemented
The search results confirm that the disposal behavior after on_error
and on_completed
is consistently implemented across the codebase. Key findings:
- In
test_observers.cpp
, observers are properly disposed after bothon_error
andon_completed
- In
test_disposables.cpp
, disposal propagation is correctly tested for composite disposables - In
test_group_by.cpp
,test_merge.cpp
, and other operator tests, disposal is verified after terminal events - The new assertions in
disposable_observable.hpp
align with the existing disposal patterns
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for similar error handling patterns to ensure consistent disposal behavior
ast-grep --pattern 'on_error($_) {
$$$
CHECK($_.is_disposed())
$$$
}'
Length of output: 77
Script:
#!/bin/bash
# Let's try a broader search for disposal patterns in test files
rg -l "is_disposed|on_error|on_completed" --type cpp src/tests/
# Then examine the content of test files that might contain disposal checks
rg "CHECK.*is_disposed|REQUIRE.*is_disposed" --type cpp src/tests/ -A 2 -B 2
Length of output: 55485
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
src/rpp/rpp/operators/delay.hpp (2)
101-104
: Consider refactoring disposal logic to reduce duplication.The disposal sequence in
on_completed
duplicates the pattern fromon_error
. Consider extracting this common pattern into a private helper method to ensure consistent disposal behavior.private: + void dispose() const noexcept + { + disposable->clear(); + locally_disposed = true; + }Then use it in both methods:
- disposable->clear(); - locally_disposed = true; + dispose();
Line range hint
74-104
: Review impact on reactive stream completion guarantees.The addition of local disposal state affects how the observer handles stream completion. Please verify that this change:
- Maintains proper cleanup of resources in all termination scenarios
- Preserves the reactive streams specification's completion guarantees
- Doesn't introduce any potential memory leaks when observers are disposed
Consider documenting these guarantees in the class documentation to make the behavior explicit for users of this class.
src/rpp/rpp/operators/merge.hpp (1)
Line range hint
70-99
: Consider documenting disposal behavior and thread safety.The disposal implementation handles complex scenarios including local disposal after terminal events and thread safety considerations. Consider adding documentation to:
- Explain the disposal behavior in terminal events (
on_error
/on_completed
)- Document thread safety considerations for
m_locally_disposed
- Add a note about the ordering requirements (e.g., why disposal happens after forwarding events)
This will help maintainers understand the implementation details and constraints.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/rpp/rpp/operators/delay.hpp
(3 hunks)src/rpp/rpp/operators/merge.hpp
(2 hunks)
🔇 Additional comments (6)
src/rpp/rpp/operators/delay.hpp (3)
74-74
: LGTM! Good design choice for state tracking.
The addition of locally_disposed
as a mutable member allows for independent tracking of the observer's disposal state, separate from the disposable's state. The mutable qualifier is correctly used here as the disposal state needs to be modified in const methods.
83-83
: LGTM! Comprehensive disposal state check.
The updated is_disposed()
method now correctly considers both the local disposal state and the disposable's state, providing a more complete check of the observer's disposal status.
94-97
: Verify thread safety of disposal sequence.
While the disposal sequence is logically correct (clear disposable then set local state), please verify thread safety in concurrent scenarios. Consider whether additional synchronization is needed around the disposal state changes.
✅ Verification successful
Let me gather more context about the disposal implementation to better understand the thread safety.
Let me try a different approach to understand the class structure.
Let me try one final approach to understand the synchronization context.
Thread safety is properly implemented in the disposal sequence
The code is thread-safe due to the following synchronization mechanisms:
- The
delay_disposable
contains a mutex that guards all shared state including the queue andis_active
flag - The
emplace_safe
method usesstd::lock_guard
to protect the disposal sequence - The
locally_disposed
flag is only accessed from within the observer's methods which are guaranteed to not be called concurrently by Rx contract - The disposal sequence in
on_error
is correct as:emplace(err)
is protected by mutex inemplace_safe
disposable->clear()
is thread-safe as percomposite_disposable_impl
- Setting
locally_disposed
is safe as it's observer-local state
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any concurrent access patterns or synchronization mechanisms
rg -A 5 "std::mutex|std::lock_guard|std::unique_lock" src/rpp/rpp/operators/delay.hpp
Length of output: 979
Script:
#!/bin/bash
# Search for the complete class implementation to understand member access patterns
ast-grep --pattern 'class delay_observer {
$$$
}'
Length of output: 53
Script:
#!/bin/bash
# Search for the delay observer implementation with ripgrep
rg -A 30 "class delay_observer" src/rpp/rpp/operators/delay.hpp
Length of output: 65
Script:
#!/bin/bash
# Look for the complete file content to understand the synchronization
cat src/rpp/rpp/operators/delay.hpp
Length of output: 9503
src/rpp/rpp/operators/merge.hpp (3)
70-70
: LGTM! Thread-safe disposal check enhancement.
The additional check for m_locally_disposed
correctly prevents further emissions after terminal events. The implementation is thread-safe as the flag is marked mutable for use in const methods.
76-76
: LGTM! Correct disposal after error propagation.
The local disposal flag is set after forwarding the error, which is the correct order to ensure proper error propagation before preventing further emissions.
93-93
: LGTM! Proper disposal after completion cleanup.
The local disposal flag is set after all completion logic (counter decrement, completion forwarding, and disposable cleanup), which ensures proper cleanup sequence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
src/rpp/rpp/operators/merge.hpp (2)
93-93
: LGTM: Completion handling is properly implementedThe local disposal state is correctly set after completion logic. Consider adding tests to verify that no events are processed after completion.
Would you like me to help generate test cases to verify the completion behavior?
99-99
: Add documentation for the new member variableConsider adding a brief comment explaining the purpose of
m_locally_disposed
and its relationship with the observable contract.Example documentation:
/// Tracks whether this observer has been disposed locally through on_error or on_completed. /// This is distinct from the parent disposable's state and ensures no events are processed /// after terminal operations. mutable bool m_locally_disposed{};
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
src/rpp/rpp/operators/merge.hpp
(2 hunks)
🔇 Additional comments (2)
src/rpp/rpp/operators/merge.hpp (2)
70-70
: LGTM: Disposal state check is properly implemented
The modification correctly combines local and parent disposal states using short-circuit evaluation.
76-76
: Verify error handling behavior
The local disposal state is correctly set after forwarding the error. Let's verify that no more events are processed after an error.
✅ Verification successful
Setting local disposal state after error is verified as correct
The test cases and implementation confirm that:
- The
m_locally_disposed
flag is set after forwarding the error inon_error()
method - Test case "observable of observables with error" verifies that no more events are processed after error
- The implementation ensures thread-safety through atomic operations and mutex-protected observer access
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for test cases verifying that no events are processed after on_error
rg -l "on_error.*no.*events" test/
Length of output: 82
Script:
#!/bin/bash
# Search for test files related to merge operator
fd -e cpp -e hpp merge test
# Search for test cases or comments mentioning error handling in merge operator
rg -i "merge.*error|error.*merge" --type cpp --type hpp -A 5
Length of output: 212
Script:
#!/bin/bash
# Find all test files in the repository
fd -e cpp -e h -e hpp
# Search for test cases or comments mentioning error handling in merge operator
rg -i "merge.*error|error.*merge" -t cpp -t c++ -A 5
Length of output: 9594
Script:
#!/bin/bash
# Look at merge operator test file content
cat src/tests/rpp/test_merge.cpp
# Look at merge operator implementation
cat src/rpp/rpp/operators/merge.hpp
Length of output: 23199
I am sorry to have no solution but as an observer (sic!), I wonder why any RX C++ implementation ends in a mess of checking bools and (sub-)states like hell. Is this embedded in the problem domain? Is there no way to escape this? I mean, it is a paradox: a functional-style monad implementation is built out of state machines. |
You may remember me ranting about |
it is interesting point =D Yeah, anyway you have to check for some states, it is pretty natural - you are hiding state from user, but state should be handled somehow. Regarding this one: it appears just from optimization due to removing exact disposable state inside observer and changing it to lihtweight bool + external state. Anyway, yeah, i'm going to rewrite it a bit, just was doublechecking CI is ok |
BENCHMARK RESULTS (AUTOGENERATED)
|
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 305.15 ns | 1.85 ns | 1.85 ns | 1.00 |
Subscribe empty callbacks to empty observable via pipe operator | 306.78 ns | 1.85 ns | 1.85 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 686.48 ns | 0.31 ns | 0.31 ns | 1.00 |
from array of 1 - create + subscribe + current_thread | 1034.61 ns | 3.42 ns | 3.42 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 2228.84 ns | 111.84 ns | 112.15 ns | 1.00 |
defer from array of 1 - defer + create + subscribe + immediate | 731.82 ns | 0.31 ns | 0.31 ns | 1.00 |
interval - interval + take(3) + subscribe + immediate | 2183.14 ns | 59.23 ns | 59.27 ns | 1.00 |
interval - interval + take(3) + subscribe + current_thread | 3050.02 ns | 32.42 ns | 32.43 ns | 1.00 |
from array of 1 - create + as_blocking + subscribe + new_thread | 28121.94 ns | 27758.52 ns | 28186.83 ns | 0.98 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 36988.69 ns | 51771.25 ns | 52020.32 ns | 1.00 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3476.25 ns | 132.65 ns | 138.66 ns | 0.96 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1089.66 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+filter(true)+subscribe | 840.38 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+skip(1)+subscribe | 1009.09 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 970.28 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+first()+subscribe | 1259.38 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 907.35 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 1136.50 ns | 18.53 ns | 18.20 ns | 1.02 |
immediate_just(1,2,3)+element_at(1)+subscribe | 829.48 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 260.01 ns | 1.54 ns | 1.54 ns | 1.00 |
current_thread scheduler create worker + schedule | 373.76 ns | 4.63 ns | 4.94 ns | 0.94 |
current_thread scheduler create worker + schedule + recursive schedule | 834.27 ns | 61.09 ns | 60.82 ns | 1.00 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 845.72 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 891.17 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2321.97 ns | 145.10 ns | 146.01 ns | 0.99 |
immediate_just+buffer(2)+subscribe | 1544.25 ns | 14.21 ns | 13.90 ns | 1.02 |
immediate_just+window(2)+subscribe + subscsribe inner | 2456.60 ns | 1534.45 ns | 1342.30 ns | 1.14 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 819.18 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 830.51 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 1981.36 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3445.05 ns | 170.48 ns | 153.57 ns | 1.11 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3671.93 ns | 159.62 ns | 156.44 ns | 1.02 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 143.53 ns | 134.03 ns | 1.07 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3568.21 ns | 402.21 ns | 461.59 ns | 0.87 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2096.92 ns | 217.15 ns | 215.53 ns | 1.01 |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3106.99 ns | 227.16 ns | 233.47 ns | 0.97 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 34.57 ns | 34.50 ns | 14.67 ns | 2.35 |
subscribe 100 observers to publish_subject | 201291.00 ns | 15969.17 ns | 16175.73 ns | 0.99 |
100 on_next to 100 observers to publish_subject | 27061.61 ns | 17282.63 ns | 17464.32 ns | 0.99 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1409.05 ns | 12.96 ns | 13.28 ns | 0.98 |
basic sample with immediate scheduler | 1371.64 ns | 5.55 ns | 5.55 ns | 1.00 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 912.15 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2079.46 ns | 986.14 ns | 1004.57 ns | 0.98 |
create(on_error())+retry(1)+subscribe | 610.48 ns | 111.51 ns | 117.97 ns | 0.95 |
ci-macos
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 1141.20 ns | 0.80 ns | 0.78 ns | 1.02 |
Subscribe empty callbacks to empty observable via pipe operator | 1174.48 ns | 0.86 ns | 0.75 ns | 1.15 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 1928.00 ns | 0.25 ns | 0.25 ns | 0.98 |
from array of 1 - create + subscribe + current_thread | 2420.48 ns | 35.67 ns | 35.45 ns | 1.01 |
concat_as_source of just(1 immediate) create + subscribe | 5392.91 ns | 331.72 ns | 379.33 ns | 0.87 |
defer from array of 1 - defer + create + subscribe + immediate | 2093.66 ns | 0.26 ns | 0.23 ns | 1.10 |
interval - interval + take(3) + subscribe + immediate | 4911.92 ns | 118.31 ns | 113.28 ns | 1.04 |
interval - interval + take(3) + subscribe + current_thread | 5990.57 ns | 98.46 ns | 99.47 ns | 0.99 |
from array of 1 - create + as_blocking + subscribe + new_thread | 82881.00 ns | 86748.15 ns | 89639.80 ns | 0.97 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 103127.30 ns | 93610.36 ns | 95092.70 ns | 0.98 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 8502.85 ns | 366.14 ns | 365.99 ns | 1.00 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 4748.17 ns | 0.34 ns | 0.23 ns | 1.44 |
immediate_just+filter(true)+subscribe | 23834.96 ns | 1.11 ns | 0.23 ns | 4.76 |
immediate_just(1,2)+skip(1)+subscribe | 2833.37 ns | 0.23 ns | 0.23 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 2316.07 ns | 0.49 ns | 0.48 ns | 1.02 |
immediate_just(1,2)+first()+subscribe | 3832.73 ns | 2.58 ns | 0.23 ns | 11.05 |
immediate_just(1,2)+last()+subscribe | 2548.69 ns | 0.30 ns | 0.23 ns | 1.27 |
immediate_just+take_last(1)+subscribe | 5745.12 ns | 1.00 ns | 0.23 ns | 4.29 |
immediate_just(1,2,3)+element_at(1)+subscribe | 2568.97 ns | 0.26 ns | 0.23 ns | 1.12 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 893.84 ns | 1.00 ns | 0.94 ns | 1.07 |
current_thread scheduler create worker + schedule | 1188.11 ns | 35.08 ns | 34.23 ns | 1.02 |
current_thread scheduler create worker + schedule + recursive schedule | 2302.14 ns | 205.18 ns | 202.88 ns | 1.01 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 2625.93 ns | 4.97 ns | 4.20 ns | 1.18 |
immediate_just+scan(10, std::plus)+subscribe | 3040.95 ns | 0.62 ns | 0.47 ns | 1.32 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 8267.39 ns | 563.13 ns | 375.74 ns | 1.50 |
immediate_just+buffer(2)+subscribe | 3810.97 ns | 98.94 ns | 64.65 ns | 1.53 |
immediate_just+window(2)+subscribe + subscsribe inner | 7734.23 ns | 3512.23 ns | 2387.90 ns | 1.47 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 2486.79 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 2321.04 ns | 0.24 ns | 0.23 ns | 1.02 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 5613.09 ns | 5.75 ns | 4.68 ns | 1.23 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 7954.07 ns | 466.04 ns | 412.06 ns | 1.13 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 8797.08 ns | 425.75 ns | 414.87 ns | 1.03 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 471.45 ns | 447.72 ns | 1.05 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 14797.33 ns | 5531.84 ns | 943.75 ns | 5.86 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 7517.43 ns | 1044.13 ns | 788.56 ns | 1.32 |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 7926.12 ns | 682.42 ns | 640.50 ns | 1.07 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 90.81 ns | 59.06 ns | 48.66 ns | 1.21 |
subscribe 100 observers to publish_subject | 423912.00 ns | 49066.12 ns | 40555.96 ns | 1.21 |
100 on_next to 100 observers to publish_subject | 443190.00 ns | 24909.24 ns | 24922.40 ns | 1.00 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 20163.54 ns | 642.17 ns | 76.32 ns | 8.41 |
basic sample with immediate scheduler | 10023.41 ns | 85.47 ns | 18.71 ns | 4.57 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 3198.04 ns | 0.31 ns | 0.23 ns | 1.31 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 8157.42 ns | 5203.07 ns | 4347.30 ns | 1.20 |
create(on_error())+retry(1)+subscribe | 2202.82 ns | 347.39 ns | 284.21 ns | 1.22 |
ci-ubuntu-clang
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 267.51 ns | 1.54 ns | 1.54 ns | 1.00 |
Subscribe empty callbacks to empty observable via pipe operator | 271.17 ns | 1.54 ns | 1.54 ns | 1.00 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 563.97 ns | 0.31 ns | 0.31 ns | 1.00 |
from array of 1 - create + subscribe + current_thread | 792.29 ns | 4.01 ns | 4.01 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 2374.89 ns | 129.98 ns | 129.67 ns | 1.00 |
defer from array of 1 - defer + create + subscribe + immediate | 781.91 ns | 0.31 ns | 0.31 ns | 1.00 |
interval - interval + take(3) + subscribe + immediate | 2205.37 ns | 58.26 ns | 58.26 ns | 1.00 |
interval - interval + take(3) + subscribe + current_thread | 3174.57 ns | 30.91 ns | 30.86 ns | 1.00 |
from array of 1 - create + as_blocking + subscribe + new_thread | 30257.48 ns | 28119.22 ns | 27745.02 ns | 1.01 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 38892.53 ns | 33076.83 ns | 36026.62 ns | 0.92 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3680.87 ns | 148.44 ns | 148.30 ns | 1.00 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1142.89 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+filter(true)+subscribe | 840.80 ns | 0.31 ns | 0.31 ns | 1.01 |
immediate_just(1,2)+skip(1)+subscribe | 1082.38 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 878.77 ns | 0.31 ns | 0.62 ns | 0.50 |
immediate_just(1,2)+first()+subscribe | 1356.69 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2)+last()+subscribe | 1006.28 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+take_last(1)+subscribe | 1177.11 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just(1,2,3)+element_at(1)+subscribe | 859.23 ns | 0.31 ns | 0.31 ns | 1.00 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 278.54 ns | 0.63 ns | 0.63 ns | 1.00 |
current_thread scheduler create worker + schedule | 395.95 ns | 4.01 ns | 4.02 ns | 1.00 |
current_thread scheduler create worker + schedule + recursive schedule | 848.90 ns | 55.82 ns | 56.24 ns | 0.99 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 842.66 ns | 0.31 ns | 0.31 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 954.13 ns | 0.62 ns | 0.31 ns | 2.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2228.58 ns | 138.25 ns | 140.26 ns | 0.99 |
immediate_just+buffer(2)+subscribe | 1530.67 ns | 14.19 ns | 13.89 ns | 1.02 |
immediate_just+window(2)+subscribe + subscsribe inner | 2479.47 ns | 910.57 ns | 921.07 ns | 0.99 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 833.75 ns | - | - | 0.00 |
immediate_just+take_while(true)+subscribe | 843.17 ns | 0.31 ns | 0.31 ns | 1.00 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 2007.25 ns | 0.31 ns | 0.31 ns | 1.00 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3304.45 ns | 160.81 ns | 156.07 ns | 1.03 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3726.21 ns | 139.15 ns | 139.09 ns | 1.00 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 138.89 ns | 142.89 ns | 0.97 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3395.67 ns | 376.86 ns | 377.27 ns | 1.00 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2272.12 ns | 203.97 ns | 199.42 ns | 1.02 |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3275.75 ns | 220.95 ns | 224.21 ns | 0.99 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 54.37 ns | 17.55 ns | 17.48 ns | 1.00 |
subscribe 100 observers to publish_subject | 213306.00 ns | 16015.58 ns | 16096.95 ns | 0.99 |
100 on_next to 100 observers to publish_subject | 42875.83 ns | 23436.70 ns | 23580.12 ns | 0.99 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1303.83 ns | 10.80 ns | 11.42 ns | 0.95 |
basic sample with immediate scheduler | 1318.36 ns | 6.17 ns | 5.86 ns | 1.05 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 993.19 ns | 0.31 ns | 0.31 ns | 1.00 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2186.40 ns | 1168.99 ns | 1161.48 ns | 1.01 |
create(on_error())+retry(1)+subscribe | 655.87 ns | 138.67 ns | 139.62 ns | 0.99 |
ci-windows
General
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
Subscribe empty callbacks to empty observable | 560.72 ns | 1.85 ns | 2.16 ns | 0.86 |
Subscribe empty callbacks to empty observable via pipe operator | 578.71 ns | 1.85 ns | 2.16 ns | 0.86 |
Sources
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 1159.86 ns | 5.24 ns | 5.55 ns | 0.94 |
from array of 1 - create + subscribe + current_thread | 1452.95 ns | 15.45 ns | 15.51 ns | 1.00 |
concat_as_source of just(1 immediate) create + subscribe | 3709.42 ns | 174.94 ns | 164.88 ns | 1.06 |
defer from array of 1 - defer + create + subscribe + immediate | 1193.62 ns | 5.24 ns | 5.55 ns | 0.95 |
interval - interval + take(3) + subscribe + immediate | 3354.31 ns | 140.83 ns | 142.69 ns | 0.99 |
interval - interval + take(3) + subscribe + current_thread | 3392.05 ns | 60.16 ns | 60.15 ns | 1.00 |
from array of 1 - create + as_blocking + subscribe + new_thread | 118944.44 ns | 114340.00 ns | 113250.00 ns | 1.01 |
from array of 1000 - create + as_blocking + subscribe + new_thread | 126311.11 ns | 130912.50 ns | 131050.00 ns | 1.00 |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5372.25 ns | 204.68 ns | 196.99 ns | 1.04 |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take(1)+subscribe | 1822.14 ns | 19.74 ns | 19.42 ns | 1.02 |
immediate_just+filter(true)+subscribe | 1639.94 ns | 18.82 ns | 18.51 ns | 1.02 |
immediate_just(1,2)+skip(1)+subscribe | 1736.51 ns | 18.50 ns | 17.89 ns | 1.03 |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1356.23 ns | 23.46 ns | 20.69 ns | 1.13 |
immediate_just(1,2)+first()+subscribe | 2375.05 ns | 17.29 ns | 18.20 ns | 0.95 |
immediate_just(1,2)+last()+subscribe | 1511.44 ns | 18.52 ns | 19.12 ns | 0.97 |
immediate_just+take_last(1)+subscribe | 2027.57 ns | 68.35 ns | 64.50 ns | 1.06 |
immediate_just(1,2,3)+element_at(1)+subscribe | 1646.78 ns | 21.91 ns | 20.97 ns | 1.04 |
Schedulers
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate scheduler create worker + schedule | 475.53 ns | 4.32 ns | 4.32 ns | 1.00 |
current_thread scheduler create worker + schedule | 654.17 ns | 11.68 ns | 11.16 ns | 1.05 |
current_thread scheduler create worker + schedule + recursive schedule | 1330.08 ns | 96.88 ns | 98.98 ns | 0.98 |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 1335.21 ns | 18.82 ns | 18.82 ns | 1.00 |
immediate_just+scan(10, std::plus)+subscribe | 1439.37 ns | 20.96 ns | 20.96 ns | 1.00 |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 3866.79 ns | 191.05 ns | 182.32 ns | 1.05 |
immediate_just+buffer(2)+subscribe | 2322.80 ns | 62.95 ns | 63.67 ns | 0.99 |
immediate_just+window(2)+subscribe + subscsribe inner | 4042.69 ns | 1325.03 ns | 1301.24 ns | 1.02 |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 1317.19 ns | 17.58 ns | 17.57 ns | 1.00 |
immediate_just+take_while(true)+subscribe | 1331.56 ns | 18.82 ns | 18.50 ns | 1.02 |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 3603.13 ns | 11.10 ns | 11.74 ns | 0.95 |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5339.91 ns | 204.57 ns | 202.43 ns | 1.01 |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5459.46 ns | 188.06 ns | 185.79 ns | 1.01 |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 191.35 ns | 195.28 ns | 0.98 |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 6187.79 ns | 442.50 ns | 439.04 ns | 1.01 |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 3854.55 ns | 528.18 ns | 523.70 ns | 1.01 |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 4941.01 ns | 332.07 ns | 319.77 ns | 1.04 |
Subjects
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
publish_subject with 1 observer - on_next | 36.16 ns | 20.68 ns | 20.14 ns | 1.03 |
subscribe 100 observers to publish_subject | 261375.00 ns | 28054.05 ns | 27892.86 ns | 1.01 |
100 on_next to 100 observers to publish_subject | 51777.27 ns | 36125.00 ns | 32628.57 ns | 1.11 |
Scenarios
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
basic sample | 1882.98 ns | 95.99 ns | 95.11 ns | 1.01 |
basic sample with immediate scheduler | 1908.68 ns | 69.12 ns | 68.81 ns | 1.00 |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1455.68 ns | 19.42 ns | 19.13 ns | 1.02 |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio |
---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2037.43 ns | 362.17 ns | 358.64 ns | 1.01 |
create(on_error())+retry(1)+subscribe | 1546.81 ns | 139.29 ns | 136.26 ns | 1.02 |
My problem is that the code does not hide the payload state that travels down the queue from the user; it is the extra janitor stuff that needs stateful expressions. This will get messy when we enter multi-threaded environments. |
What happens if you try not to use bools but the lifetime of objects? |
updated a bit to be a bit more clear, was just first version to be sure fix is valid |
Not sure if i get your point |
It is correct question and it is why bools exists. Correct answer is "same". These bools are just optimization to NOT have unnecessary lifetime objects. But it is hidden from users |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (7)
src/rpp/rpp/observers/details/disposables_strategy.hpp (2)
62-62
: Document the emptyadd
method's purposeThe empty
add
method's purpose isn't immediately clear. Consider adding a comment explaining why disposables are intentionally ignored.- static constexpr void add(const rpp::disposable_wrapper&) {} + // Intentionally ignores disposables as this strategy only tracks disposal state + static constexpr void add(const rpp::disposable_wrapper&) {}
64-66
: Consider making disposal state irreversibleOnce disposed, the state cannot be undisposed. Consider making this explicit by marking
dispose()
as[[nodiscard]]
and returning the previous state.bool is_disposed() const noexcept { return m_is_disposed; } - void dispose() const { m_is_disposed = true; } + [[nodiscard]] bool dispose() const { + bool was_disposed = m_is_disposed; + m_is_disposed = true; + return was_disposed; + }src/rpp/rpp/observers/details/fwd.hpp (2)
28-30
: LGTM! Consider documenting thread-safety guarantees.The new
Boolean
mode is well-documented and provides a simpler way to manage disposal state. However, given the concerns raised in PR comments about multi-threading, consider documenting the thread-safety guarantees of this mode.
52-55
: Enhance documentation with implementation requirements.While the purpose is documented, consider adding:
- Thread-safety guarantees
- Memory ordering requirements for the boolean state
- Interaction with observer strategy
src/rpp/rpp/operators/retry.hpp (1)
80-80
: Simplified disposal check improves state managementThe simplified
is_disposed()
implementation removes local state tracking and delegates to the composite state object, which:
- Reduces state management complexity
- Centralizes disposal state
- Improves thread-safety by removing a potential race condition between local and shared state
This change aligns well with daixtrose's suggestion about simplifying state management.
Consider documenting the thread-safety guarantees of
state->is_disposed()
in the header comments, as this is now the single source of truth for disposal state.src/rpp/rpp/operators/concat.hpp (1)
Line range hint
118-148
: Consider further simplifying state managementWhile the current changes improve disposal state management, there's potential for further simplification:
- The interaction between
ConcatStage
and disposal states could be consolidated.- Consider using RAII or scope guards to manage state transitions.
- Document the thread-safety guarantees for state transitions.
This could help address the concerns about state management complexity in multi-threaded environments while maintaining correctness.
src/rpp/rpp/sources/concat.hpp (1)
70-70
: LGTM! Consider adding documentationThe simplified disposal check improves maintainability by centralizing the disposal state in the composite_disposable base class. This addresses the concern about multiple boolean states raised in the PR discussion.
Consider adding a brief comment explaining the rationale behind delegating to state's disposal status, as this is a key architectural decision for state management.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
src/rpp/rpp/observers/details/disposables_strategy.hpp
(1 hunks)src/rpp/rpp/observers/details/fwd.hpp
(3 hunks)src/rpp/rpp/operators/concat.hpp
(2 hunks)src/rpp/rpp/operators/delay.hpp
(1 hunks)src/rpp/rpp/operators/merge.hpp
(1 hunks)src/rpp/rpp/operators/retry.hpp
(2 hunks)src/rpp/rpp/sources/concat.hpp
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/rpp/rpp/operators/delay.hpp
- src/rpp/rpp/operators/merge.hpp
🧰 Additional context used
📓 Learnings (2)
src/rpp/rpp/observers/details/disposables_strategy.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/observers/details/disposable_strategy.hpp:37-37
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In `local_disposable_strategy`, `m_is_disposed` is safely used as a plain `bool` because the class is intended for single-threaded use or is protected by synchronization mechanisms.
src/rpp/rpp/operators/concat.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#673
File: src/rpp/rpp/operators/concat.hpp:59-59
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `concat_disposable` class in `src/rpp/rpp/operators/concat.hpp`, within the `drain` method, it's acceptable to call `get_observer()->on_completed()` even when `is_disposed()` returns true, because `refcounted.dispose()` could cause disposing of `this` object.
🔇 Additional comments (7)
src/rpp/rpp/observers/details/disposables_strategy.hpp (1)
58-70
: Consider thread safety implications of mutable state
The new boolean_disposables_strategy
uses a mutable boolean flag without synchronization, which could lead to race conditions in multi-threaded scenarios. While this might be intentional for single-threaded use (similar to local_disposables_strategy
), it should be explicitly documented.
Consider one of these approaches:
- Add documentation explicitly stating the thread-safety guarantees
- Use
std::atomic<bool>
if thread-safety is required - Add a static assertion or compile-time check to prevent usage in multi-threaded contexts
Example implementation with atomic:
private:
- mutable bool m_is_disposed{};
+ mutable std::atomic<bool> m_is_disposed{false};
Let's check if this class is used in any multi-threaded contexts:
src/rpp/rpp/observers/details/fwd.hpp (1)
75-75
: Verify enum value handling completeness.
The deduction logic looks correct. Let's verify that all enum values are properly handled.
Also applies to: 83-84
✅ Verification successful
Let me search for the enum definition since it wasn't found in the first attempt.
All enum values are properly handled in the deduction logic
The verification confirms that all disposables_mode
enum values (Auto
, None
, External
, and Boolean
) have corresponding deduction cases in the if-constexpr chain, and the static_assert correctly validates all possible enum values. The implementation is complete and correct.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all enum values are handled in deduction logic
# Expected: All enum values should have corresponding deduction cases
# Search for enum definition and deduction cases
echo "Enum values:"
rg "enum class disposables_mode.*\{" -A 10
echo -e "\nDeduction cases:"
rg "if constexpr \(mode == disposables_mode::" -A 1
Length of output: 1041
Script:
#!/bin/bash
# Search for enum definition with a broader pattern
echo "Searching for enum definition:"
rg "enum.*disposables_mode" -A 10
# Also search in header files specifically
echo -e "\nSearching in hpp files:"
fd -e hpp -x rg "enum.*disposables_mode" -A 10 {}
Length of output: 1495
src/rpp/rpp/operators/retry.hpp (1)
44-44
: Verify thread-safety of Boolean disposables mode
The change from None
to Boolean
disposables mode aligns with the broader refactoring of observer strategies, but we should ensure thread-safety in retry scenarios.
Let's check if other retry-related components use the same disposables mode:
✅ Verification successful
Change to Boolean disposables mode is consistent with thread-safety requirements
The codebase analysis shows that Boolean
disposables mode is used in operators that require strict thread-safety and state management, particularly in operators dealing with multiple subscriptions or timing concerns (merge, delay, concat). The retry operator's change aligns with this pattern as it also manages multiple subscription attempts.
Key findings that support thread-safety:
- Other operators with similar subscription management (merge, concat) use
Boolean
mode - Delay operator, which deals with timing and state, uses
Boolean
mode - The retry operator's state is managed through a shared state object similar to these operators
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other retry-related components and their disposables mode
rg -A 3 "preferred_disposables_mode.*None" src/
rg -A 3 "preferred_disposables_mode.*Boolean" src/
Length of output: 17335
src/rpp/rpp/operators/concat.hpp (2)
118-118
: LGTM: Consistent disposable mode change
The change from None
to Boolean
disposables mode aligns with the broader refactoring of disposal state management across observer strategies. This modification helps standardize how disposal states are tracked and managed.
148-148
: Verify disposal state handling in concat operation
The simplified disposal check relies solely on the inner child disposable's state. While this reduces state complexity, we should verify that this doesn't affect the concat operation's behavior, particularly during:
- Error scenarios
- Completion of inner observables
- Disposal during active concatenation
Let's verify the usage patterns:
✅ Verification successful
Simplified disposal check is safe and consistent with the operator's behavior
The code analysis shows the disposal state change is correct because:
- The concat operator maintains two separate composite disposables: base and inner child
- The inner observer correctly checks inner_child_disposable for its disposal state, which is used during active concatenation
- Error propagation immediately forwards to the downstream observer
- Completion handling properly manages disposal states and stage transitions
- The implementation maintains thread-safety through atomic operations and proper state management
The simplified check aligns with the operator's behavior and doesn't compromise error handling, completion, or disposal during concatenation.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for tests validating disposal behavior
rg -A 5 "TEST_CASE.*concat.*dispos" test/
# Search for potential race conditions in disposal checks
ast-grep --pattern 'if ($COND) {
$$$
is_disposed()
$$$
}'
# Search for error handling patterns
ast-grep --pattern 'on_error($ERR) {
$$$
is_disposed()
$$$
}'
Length of output: 225
Script:
#!/bin/bash
# Search for tests related to concat operator
fd -e cpp -e hpp "concat.*test" -X cat
# Look for disposal checks in concat operator implementation
rg -A 5 "is_disposed\(\)" src/rpp/rpp/operators/concat.hpp
# Find error handling in concat operator
rg -A 10 "on_error" src/rpp/rpp/operators/concat.hpp
# Look for disposal state management
rg -A 5 "disposable" src/rpp/rpp/operators/concat.hpp
Length of output: 7442
src/rpp/rpp/sources/concat.hpp (2)
Line range hint 83-86
: Review potential race conditions in drain()
The is_inside_drain
atomic flag might not fully protect against all race conditions. Consider the sequence:
- Thread A checks
!state->is_disposed()
- Thread B disposes the state
- Thread A continues execution with disposed state
Consider strengthening the synchronization mechanism here.
53-53
: Verify thread safety with Boolean disposables mode
The change from None
to Boolean
disposables mode aligns with the broader strategy changes. However, given the concerns raised in PR comments about multi-threaded environments, we should verify the thread safety of this approach.
✅ Verification successful
Based on the search results, I can now verify the thread safety implications of using Boolean
disposables mode. Let me analyze the key findings:
- The
boolean_disposables_strategy
implementation insrc/rpp/rpp/observers/details/disposables_strategy.hpp
shows:
mutable bool m_is_disposed{};
bool is_disposed() const noexcept { return m_is_disposed; }
void dispose() const { m_is_disposed = true; }
- The base disposable implementation in
src/rpp/rpp/disposables/details/base_disposable.hpp
shows proper atomic handling:
bool is_disposed() const noexcept final {
// just need atomicity, not guarding anything
return m_disposed.load(std::memory_order::seq_cst);
}
Let me check the actual usage pattern in concat source to understand the synchronization context better.
Let me check how the state is accessed and synchronized in the concat implementation.
Thread safety is properly handled with Boolean disposables mode
The implementation is thread-safe for the following reasons:
- The
concat_state_t
inherits fromcomposite_disposable
which uses atomic operations for disposal state - The state uses atomic synchronization via
is_inside_drain
withstd::memory_order::seq_cst
for drain coordination - The disposal check is used as a guard in the drain loop and properly synchronized with other operations
- The observer operations (on_next, on_error, on_completed) are protected by the disposal state check
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any synchronization mechanisms around disposal state
rg -A 3 -B 3 'is_disposed|disposables_mode::Boolean' src/rpp/rpp/
Length of output: 74508
Script:
#!/bin/bash
# Look for the state implementation and synchronization in concat source
rg -A 10 "struct concat_state_t" src/rpp/rpp/sources/concat.hpp
Length of output: 426
Script:
#!/bin/bash
# Look for state access patterns and synchronization in concat source
rg -A 5 -B 5 "is_disposed|on_next|on_error|on_completed" src/rpp/rpp/sources/concat.hpp
Length of output: 2086
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## v2 #685 +/- ##
==========================================
- Coverage 98.60% 98.60% -0.01%
==========================================
Files 155 155
Lines 9775 9774 -1
==========================================
- Hits 9639 9638 -1
Misses 136 136 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (4)
src/rpp/rpp/observers/details/fwd.hpp (2)
52-55
: Consider enhancing the documentation with thread-safety details.Given the discussions in the PR comments about state management in multi-threaded environments, it would be helpful to document any thread-safety guarantees or requirements for this strategy.
Consider expanding the documentation like this:
/** * @brief Just control is_disposed or not via boolean and ignore upstreams at all + * @thread_safety Document thread-safety guarantees here + * @note This strategy simplifies state management by avoiding upstream disposable handling */
28-30
: Consider documenting the architectural trade-offs of Boolean mode.The introduction of
Boolean
mode represents a strategic decision to simplify state management, as discussed in the PR comments. While this addresses immediate concerns about state complexity, it's important to:
- Document when to use
Boolean
vs other modes- Consider providing migration guidelines for existing code
- Address the multi-threading concerns raised in the PR discussion
Would you like me to help draft architectural documentation covering these aspects?
Also applies to: 52-55
src/benchmarks/benchmarks.cpp (2)
809-810
: Consider consolidating redundant filter operations.The benchmark contains three identical filter operations that don't perform actual filtering:
filter([](int v) -> bool { return v; })These predicates always return the input value, making them redundant. Consider either:
- Removing redundant filters if they don't serve a benchmarking purpose
- Using meaningful predicates if testing filter performance
Also applies to: 815-816, 818-819
816-816
: Consider removing unnecessary delay operation.The delay operation with 0 seconds duration might not add value to the benchmark:
delay(std::chrono::seconds{0}, rpp::schedulers::immediate{})If testing delay scheduling, consider using a non-zero duration. If not needed for testing, consider removing it to simplify the benchmark.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
.github/workflows/ci v2.yml
(3 hunks)src/benchmarks/benchmarks.cpp
(1 hunks)src/rpp/rpp/observers/details/fwd.hpp
(3 hunks)
🧰 Additional context used
🪛 GitHub Check: tests ci-ubuntu-clang Release
src/rpp/rpp/observers/details/fwd.hpp
[failure] 77-77:
'RPP_DISABLE_DISPOSABLES_OPTIMZIATION' is not defined, evaluates to 0 [clang-diagnostic-undef]
🔇 Additional comments (4)
src/rpp/rpp/observers/details/fwd.hpp (1)
28-30
: LGTM! Well-documented enum extension.
The new Boolean
mode is properly documented and follows the existing enum pattern.
.github/workflows/ci v2.yml (2)
143-143
: LGTM: Matrix strategy effectively handles optimization testing.
The addition of the optimization_disabled
matrix parameter with clear postfix naming provides good visibility into the optimization state of each test run.
Also applies to: 147-147
212-212
: LGTM: Artifact naming clearly indicates optimization state.
The artifact naming convention appropriately reflects the optimization state, maintaining consistency with the test matrix configuration.
src/benchmarks/benchmarks.cpp (1)
807-822
: LGTM! The benchmark effectively tests operator composition.
The benchmark successfully tests:
- Subject creation and subscription
- Mix of operators with and without disposables
- Operator chaining and completion handling
- Use of immediate scheduler for deterministic results
static_assert(mode == disposables_mode::Auto || mode == disposables_mode::None || mode == disposables_mode::External || mode == disposables_mode::Boolean); | ||
|
||
#if RPP_DISABLE_DISPOSABLES_OPTIMZIATION | ||
return static_cast<default_disposables_strategy*>(nullptr); | ||
#else | ||
if constexpr (mode == disposables_mode::Auto) | ||
return static_cast<default_disposables_strategy*>(nullptr); | ||
else if constexpr (mode == disposables_mode::None) | ||
return static_cast<none_disposables_strategy*>(nullptr); | ||
else if constexpr (mode == disposables_mode::External) | ||
return static_cast<composite_disposable_wrapper*>(nullptr); | ||
else if constexpr (mode == disposables_mode::Boolean) | ||
return static_cast<boolean_disposables_strategy*>(nullptr); | ||
else | ||
return static_cast<void*>(nullptr); | ||
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Fix typo in macro name and verify implementation
The verification revealed two issues:
- The macro name has a typo:
RPP_DISABLE_DISPOSABLES_OPTIMZIATION
(missing 'Z'). It should beRPP_DISABLE_DISPOSABLES_OPTIMIZATION
. - The
boolean_disposables_strategy
implementation anddisposables_mode::Boolean
enum are properly defined and consistently used across the codebase.
Fix needed:
-#if RPP_DISABLE_DISPOSABLES_OPTIMZIATION
+#if RPP_DISABLE_DISPOSABLES_OPTIMIZATION
🔗 Analysis chain
Fix undefined macro and verify implementation consistency.
Two issues need attention:
- The macro
RPP_DISABLE_DISPOSABLES_OPTIMZIATION
is undefined, which could lead to unexpected behavior. - The implementation should be verified for consistency with other files.
Fix the macro issue:
+#ifndef RPP_DISABLE_DISPOSABLES_OPTIMZIATION
+#define RPP_DISABLE_DISPOSABLES_OPTIMZIATION 0
+#endif
Let's verify the consistency of the boolean strategy implementation:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if boolean_disposables_strategy is consistently used across the codebase
rg -l "preferred_disposables_mode.*=.*Boolean"
# Verify the implementation of boolean_disposables_strategy
ast-grep --pattern 'class boolean_disposables_strategy {
$$$
}'
Length of output: 276
Script:
#!/bin/bash
# Let's check the definition of boolean_disposables_strategy
rg -A 10 "class boolean_disposables_strategy"
# Check if the macro is defined elsewhere
rg "RPP_DISABLE_DISPOSABLES_OPTIMZIATION"
# Check for similar macro definitions pattern
rg "#define RPP_DISABLE"
# Let's also check the enum definition to verify Boolean mode
rg -A 5 "enum class disposables_mode"
Length of output: 2396
Script:
#!/bin/bash
# Let's see the complete enum definition to verify Boolean mode is properly defined
rg -A 10 "enum class disposables_mode"
# Check for any RPP_DISABLE related macros in header files
fd -e hpp -e h -X rg "RPP_DISABLE"
# Let's also check if there are any configuration or build header files
fd -e hpp -e h "config" -X rg "RPP_"
Length of output: 1036
🧰 Tools
🪛 GitHub Check: tests ci-ubuntu-clang Release
[failure] 77-77:
'RPP_DISABLE_DISPOSABLES_OPTIMZIATION' is not defined, evaluates to 0 [clang-diagnostic-undef]
.github/workflows/ci v2.yml
Outdated
@@ -198,7 +199,7 @@ jobs: | |||
uses: lukka/run-cmake@v10 | |||
with: | |||
configurePreset: ${{ matrix.config.name }}-${{ matrix.type }} | |||
configurePresetAdditionalArgs: "['-DCMAKE_BUILD_TYPE=${{ matrix.build_type.config }}', '-DCMAKE_CONFIGURATION_TYPES=${{ matrix.build_type.config }}']" | |||
configurePresetAdditionalArgs: "['-DCMAKE_BUILD_TYPE=${{ matrix.build_type.config }}', '-DCMAKE_CONFIGURATION_TYPES=${{ matrix.build_type.config }}', '-DRPP_DISABLE_DISPOSABLES_OPTIMZIATION=${{matrix.optimization_disabled.mode}}']" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix typo in CMake flag name.
There's a typo in the CMake flag name: OPTIMZIATION
should be OPTIMIZATION
.
- configurePresetAdditionalArgs: "['-DCMAKE_BUILD_TYPE=${{ matrix.build_type.config }}', '-DCMAKE_CONFIGURATION_TYPES=${{ matrix.build_type.config }}', '-DRPP_DISABLE_DISPOSABLES_OPTIMZIATION=${{matrix.optimization_disabled.mode}}']"
+ configurePresetAdditionalArgs: "['-DCMAKE_BUILD_TYPE=${{ matrix.build_type.config }}', '-DCMAKE_CONFIGURATION_TYPES=${{ matrix.build_type.config }}', '-DRPP_DISABLE_DISPOSABLES_OPTIMIZATION=${{matrix.optimization_disabled.mode}}']"
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
configurePresetAdditionalArgs: "['-DCMAKE_BUILD_TYPE=${{ matrix.build_type.config }}', '-DCMAKE_CONFIGURATION_TYPES=${{ matrix.build_type.config }}', '-DRPP_DISABLE_DISPOSABLES_OPTIMZIATION=${{matrix.optimization_disabled.mode}}']" | |
configurePresetAdditionalArgs: "['-DCMAKE_BUILD_TYPE=${{ matrix.build_type.config }}', '-DCMAKE_CONFIGURATION_TYPES=${{ matrix.build_type.config }}', '-DRPP_DISABLE_DISPOSABLES_OPTIMIZATION=${{matrix.optimization_disabled.mode}}']" |
💡 Codebase verification
The CMake flag RPP_DISABLE_DISPOSABLES_OPTIMZIATION
is not properly configured
The flag RPP_DISABLE_DISPOSABLES_OPTIMZIATION
is not defined or documented in any CMake files. Additionally, there appears to be a typo in the flag name ("OPTIMZIATION" instead of "OPTIMIZATION"). The flag should be properly defined as a CMake option before being used in the CI workflow.
- The flag needs to be defined in
cmake/variables.cmake
where all other RPP-related options are configured - The flag name should be corrected to use proper spelling:
RPP_DISABLE_DISPOSABLES_OPTIMIZATION
🔗 Analysis chain
Verify CMake configuration handles the optimization flag.
Ensure that the CMake configuration properly recognizes and processes the RPP_DISABLE_DISPOSABLES_OPTIMIZATION
flag.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify CMake configuration handles the optimization flag
# Check if the flag is documented in CMake files
echo "Searching for optimization flag documentation and usage in CMake files..."
rg -l "RPP_DISABLE_DISPOSABLES_OPTIMZIATION|RPP_DISABLE_DISPOSABLES_OPTIMIZATION" "CMakeLists.txt" "cmake/"
# Check for any existing optimization-related flags
echo "Checking for other optimization-related flags..."
rg -l "DISABLE.*OPTIMIZ" "CMakeLists.txt" "cmake/"
Length of output: 418
Script:
#!/bin/bash
# Let's try a broader search across all files to find any references to this optimization flag
echo "Searching for optimization flag across all files..."
rg -i "RPP_DISABLE_DISPOSABLES_OPTIMIZ"
# Also search for any CMake option definitions that might be related
echo "Searching for CMake option definitions..."
rg "option\s*\(\s*RPP"
# Check the main CMakeLists.txt content
echo "Checking main CMakeLists.txt content..."
cat CMakeLists.txt
# Look for any build configuration related files
echo "Looking for build configuration files..."
fd -e cmake -e txt -e in "CMake|cmake|build"
Length of output: 5280
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
src/benchmarks/CMakeLists.txt (1)
27-28
: Remove extra blank lineThere's an unnecessary extra blank line that can be removed to maintain consistent spacing.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/benchmarks/CMakeLists.txt
(1 hunks)src/rpp/rpp/observers/details/fwd.hpp
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/rpp/rpp/observers/details/fwd.hpp
src/benchmarks/CMakeLists.txt
Outdated
if (RPP_DISABLE_DISPOSABLES_OPTIMZIATION) | ||
target_compile_definitions(${TARGET} PRIVATE RPP_DISABLE_DISPOSABLES_OPTIMZIATION) | ||
endif() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Fix spelling error in macro name across multiple files
The misspelled macro "OPTIMZIATION" appears in two files:
src/benchmarks/CMakeLists.txt
src/rpp/rpp/observers/details/fwd.hpp
Apply this fix in both locations:
-RPP_DISABLE_DISPOSABLES_OPTIMZIATION
+RPP_DISABLE_DISPOSABLES_OPTIMIZATION
🔗 Analysis chain
Fix spelling error in macro name
There's a typo in the macro name: "OPTIMZIATION" should be "OPTIMIZATION".
Apply this diff to fix the spelling:
-if (RPP_DISABLE_DISPOSABLES_OPTIMZIATION)
- target_compile_definitions(${TARGET} PRIVATE RPP_DISABLE_DISPOSABLES_OPTIMZIATION)
+if (RPP_DISABLE_DISPOSABLES_OPTIMIZATION)
+ target_compile_definitions(${TARGET} PRIVATE RPP_DISABLE_DISPOSABLES_OPTIMIZATION)
Note: This change needs to be synchronized with any other files or CI configurations that use this macro.
Let's verify other occurrences of this macro:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other occurrences of the misspelled macro
rg -i "OPTIMZIATION"
# Search for any CI workflow files that might reference this
fd -e yml -e yaml | xargs rg -i "optimz"
Length of output: 697
a9c9c3b
to
571fb95
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
src/rpp/rpp/operators/retry.hpp (2)
44-44
: Improved state management architectureThe change from
None
toBoolean
disposables mode consolidates state management into a single source of truth, eliminating the need for redundant state tracking. This aligns with the broader effort to simplify state management and reduce complexity in multi-threaded scenarios.
Line range hint
44-80
: Well-structured state management improvementsThe changes demonstrate a thoughtful approach to simplifying state management while maintaining thread safety:
- The consolidated disposal state tracking reduces complexity
- Atomic operations in
retry_state_t
ensure thread safety- Error handling paths remain robust while removing redundant state checks
This addresses the concerns raised about state management complexity in the PR discussion while maintaining proper functionality.
src/rpp/rpp/operators/concat.hpp (1)
148-148
: LGTM! Simplified disposal state management.The simplified
is_disposed()
implementation reduces complexity by:
- Eliminating redundant state tracking
- Using only the inner child disposable's state
- Reducing the number of boolean checks, addressing the concerns about state management complexity
This change maintains correctness while making the code more maintainable.
This is a good example of how to simplify state management in reactive streams. Consider applying similar simplifications to other operators where multiple boolean states are being tracked.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
src/rpp/rpp/observers/details/disposables_strategy.hpp
(1 hunks)src/rpp/rpp/observers/details/fwd.hpp
(3 hunks)src/rpp/rpp/operators/concat.hpp
(2 hunks)src/rpp/rpp/operators/delay.hpp
(1 hunks)src/rpp/rpp/operators/merge.hpp
(1 hunks)src/rpp/rpp/operators/retry.hpp
(2 hunks)src/rpp/rpp/sources/concat.hpp
(2 hunks)src/tests/utils/disposable_observable.hpp
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- src/rpp/rpp/observers/details/disposables_strategy.hpp
- src/rpp/rpp/observers/details/fwd.hpp
- src/rpp/rpp/operators/delay.hpp
- src/rpp/rpp/operators/merge.hpp
- src/rpp/rpp/sources/concat.hpp
- src/tests/utils/disposable_observable.hpp
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/operators/concat.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#673
File: src/rpp/rpp/operators/concat.hpp:59-59
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `concat_disposable` class in `src/rpp/rpp/operators/concat.hpp`, within the `drain` method, it's acceptable to call `get_observer()->on_completed()` even when `is_disposed()` returns true, because `refcounted.dispose()` could cause disposing of `this` object.
🔇 Additional comments (2)
src/rpp/rpp/operators/retry.hpp (1)
80-80
: Simplified disposal state checking
The implementation now relies solely on the state's disposal status, eliminating redundant state checking. This change reduces complexity and potential race conditions in multi-threaded scenarios.
Let's verify that all disposal scenarios are still properly handled:
✅ Verification successful
Based on the search results, I can now generate the final response:
Simplified disposal state checking is safe and well-tested
The change to rely solely on state->is_disposed()
is verified to be correct and safe because:
- The test cases in
test_retry.cpp
explicitly verify disposal scenarios, including "retry disposes on looping" and "retry satisfies disposable contracts" - The implementation is consistent with the retry operator's drain loop which checks
!state->is_disposed()
to control iteration - The state's disposal status is managed through atomic operations via composite_disposable, making it thread-safe
- The pattern of using state's disposal status as single source of truth is consistent with other operators in the codebase (e.g., concat, window, merge)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for test cases covering disposal scenarios
rg -A 5 "TEST.*retry.*dispos" tests/
# Search for error handling paths that might affect disposal
ast-grep --pattern 'on_error($$$) { $$$state->is_disposed()$$$ }'
Length of output: 153
Script:
#!/bin/bash
# Search for test files related to retry operator
fd -e cpp -e hpp "test.*retry"
# Search for disposal-related test cases in any test files
rg -A 5 "TEST.*retry.*dispos"
# Look for retry operator implementation details around disposal
rg -A 5 "is_disposed" src/
# Check for error handling paths in retry operator
ast-grep --pattern 'class retry {
$$$
on_error($$$) {
$$$
}
$$$
}'
Length of output: 126458
src/rpp/rpp/operators/concat.hpp (1)
118-118
: LGTM! Good improvement in disposables management.
The change from None
to Boolean
disposables mode simplifies state management by leveraging the built-in disposal tracking mechanism instead of maintaining separate state. This aligns well with the broader changes across the codebase to streamline disposable handling.
Quality Gate passedIssues Measures |
Summary by CodeRabbit
New Features
boolean_disposables_strategy
class for improved disposal state management.Boolean
mode to enhance observer control over disposal states.Improvements
Boolean
disposables mode, streamlining error handling and state management.Bug Fixes