-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
try to catch issue with subjects #697
Conversation
for more information, see https://pre-commit.ci
Warning Rate limit exceeded@victimsnino has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 11 minutes and 15 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThe pull request introduces a new test case in the Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/tests/rpp/test_subjects.cpp (2)
172-172
: Test name could be more specific.Consider renaming the test case to better describe what specific aspect of the subject's behavior is being tested, e.g., "publish_subject handles nested subscription in on_next callback without missing values".
176-188
: Test case could be more comprehensive.The current test only verifies the last value received. Consider enhancing the test to:
- Verify that the nested subscription receives values correctly
- Verify that no values are missed in the original subscription
Here's a suggested improvement:
SUBCASE("subscribe inside on_next") { - int value = {}; + std::vector<int> values; + std::vector<int> nested_values; subject.get_observable().subscribe([&subject, &values, &nested_values](int v) { - subject.get_observable().subscribe([](int) {}); - value = v; + values.push_back(v); + subject.get_observable().subscribe([&nested_values](int v) { + nested_values.push_back(v); + }); }); for (size_t i = 0; i < 100; ++i) subject.get_observer().on_next(i); - REQUIRE(value == 100); + // Verify original subscription received all values + REQUIRE(values.size() == 100); + for (size_t i = 0; i < 100; ++i) + REQUIRE(values[i] == i); + + // Verify nested subscriptions receive subsequent values + REQUIRE(nested_values.size() == 99); // First value missed as subscription happens after + for (size_t i = 0; i < 99; ++i) + REQUIRE(nested_values[i] == i + 1);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/tests/rpp/test_subjects.cpp
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Cache deps on ci-macos for Release
src/tests/rpp/test_subjects.cpp
Outdated
for (size_t i = 0; i < 100; ++i) | ||
subject.get_observer().on_next(i); | ||
|
||
REQUIRE(value == 100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect assertion.
The test emits values from 0 to 99, so the last value would be 99, not 100.
Apply this diff to fix the assertion:
- REQUIRE(value == 100);
+ REQUIRE(value == 99);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
REQUIRE(value == 100); | |
REQUIRE(value == 99); |
BENCHMARK RESULTS (AUTOGENERATED)
|
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
Subscribe empty callbacks to empty observable | 300.32 ns | 1.55 ns | 1.54 ns | 1.00 | 1.85 ns |
Subscribe empty callbacks to empty observable via pipe operator | 302.84 ns | 1.54 ns | 1.54 ns | 1.00 | 1.85 ns |
Sources
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 687.24 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
from array of 1 - create + subscribe + current_thread | 1059.06 ns | 3.71 ns | 3.42 ns | 1.08 | 3.71 ns |
concat_as_source of just(1 immediate) create + subscribe | 2223.67 ns | 112.15 ns | 119.40 ns | 0.94 | 118.33 ns |
defer from array of 1 - defer + create + subscribe + immediate | 721.59 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
interval - interval + take(3) + subscribe + immediate | 2108.48 ns | 59.23 ns | 59.23 ns | 1.00 | 59.38 ns |
interval - interval + take(3) + subscribe + current_thread | 2987.56 ns | 32.46 ns | 32.46 ns | 1.00 | 33.99 ns |
from array of 1 - create + as_blocking + subscribe + new_thread | 29126.94 ns | 27363.81 ns | 27513.55 ns | 0.99 | 28154.85 ns |
from array of 1000 - create + as_blocking + subscribe + new_thread | 36971.63 ns | 51711.59 ns | 50418.20 ns | 1.03 | 51542.23 ns |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3458.64 ns | 141.09 ns | 134.69 ns | 1.05 | 149.77 ns |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take(1)+subscribe | 1125.10 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+filter(true)+subscribe | 831.39 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,2)+skip(1)+subscribe | 999.77 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 891.63 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,2)+first()+subscribe | 1260.57 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,2)+last()+subscribe | 915.28 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+take_last(1)+subscribe | 1190.11 ns | 17.91 ns | 19.67 ns | 0.91 | 19.45 ns |
immediate_just(1,2,3)+element_at(1)+subscribe | 831.75 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Schedulers
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate scheduler create worker + schedule | 264.86 ns | 0.46 ns | 0.62 ns | 0.75 | 1.54 ns |
current_thread scheduler create worker + schedule | 367.02 ns | 4.64 ns | 4.94 ns | 0.94 | 4.63 ns |
current_thread scheduler create worker + schedule + recursive schedule | 831.24 ns | 60.93 ns | 61.44 ns | 0.99 | 60.78 ns |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 859.60 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+scan(10, std::plus)+subscribe | 897.63 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2340.27 ns | 141.57 ns | 139.02 ns | 1.02 | 167.38 ns |
immediate_just+buffer(2)+subscribe | 1541.59 ns | 13.59 ns | 13.90 ns | 0.98 | 17.77 ns |
immediate_just+window(2)+subscribe + subscsribe inner | 2409.89 ns | 1315.67 ns | 1305.51 ns | 1.01 | 1428.72 ns |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 839.20 ns | - | - | 0.00 | - |
immediate_just+take_while(true)+subscribe | 863.37 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 1997.85 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3389.10 ns | 139.74 ns | 141.09 ns | 0.99 | 174.11 ns |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3634.05 ns | 156.47 ns | 154.18 ns | 1.01 | 161.88 ns |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 129.38 ns | 136.63 ns | 0.95 | 159.96 ns |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3597.01 ns | 392.72 ns | 422.58 ns | 0.93 | 368.75 ns |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2131.17 ns | 209.60 ns | 217.91 ns | 0.96 | 212.02 ns |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3107.53 ns | 218.46 ns | 223.43 ns | 0.98 | 253.84 ns |
Subjects
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
publish_subject with 1 observer - on_next | 34.58 ns | 14.71 ns | 23.08 ns | 0.64 | 14.87 ns |
subscribe 100 observers to publish_subject | 202404.00 ns | 18930.91 ns | 16021.04 ns | 1.18 | 17549.30 ns |
100 on_next to 100 observers to publish_subject | 27156.93 ns | 16771.81 ns | 17139.20 ns | 0.98 | 19844.79 ns |
Scenarios
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
basic sample | 1500.16 ns | 13.90 ns | 13.28 ns | 1.05 | 22.85 ns |
basic sample with immediate scheduler | 1404.59 ns | 5.55 ns | 5.55 ns | 1.00 | 16.35 ns |
mix operators with disposables and without disposables | 6347.26 ns | 1426.73 ns | 1406.92 ns | 1.01 | 1875.61 ns |
single disposable and looooooong indentity chain | 25445.98 ns | 1022.15 ns | 1082.54 ns | 0.94 | 5173.30 ns |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 985.33 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2081.39 ns | 995.99 ns | 986.65 ns | 1.01 | 1031.39 ns |
create(on_error())+retry(1)+subscribe | 596.49 ns | 124.61 ns | 110.10 ns | 1.13 | 114.41 ns |
ci-macos
General
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
Subscribe empty callbacks to empty observable | 370.69 ns | 0.93 ns | 0.47 ns | 1.98 | 1.01 ns |
Subscribe empty callbacks to empty observable via pipe operator | 361.77 ns | 0.96 ns | 0.47 ns | 2.04 | 1.01 ns |
Sources
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 695.11 ns | 0.32 ns | 0.31 ns | 1.04 | 0.36 ns |
from array of 1 - create + subscribe + current_thread | 912.68 ns | 4.14 ns | 4.15 ns | 1.00 | 4.31 ns |
concat_as_source of just(1 immediate) create + subscribe | 2073.30 ns | 165.94 ns | 177.96 ns | 0.93 | 171.77 ns |
defer from array of 1 - defer + create + subscribe + immediate | 764.87 ns | 0.31 ns | 0.31 ns | 1.01 | 0.34 ns |
interval - interval + take(3) + subscribe + immediate | 1922.58 ns | 50.82 ns | 49.72 ns | 1.02 | 55.93 ns |
interval - interval + take(3) + subscribe + current_thread | 2349.03 ns | 29.46 ns | 29.36 ns | 1.00 | 32.04 ns |
from array of 1 - create + as_blocking + subscribe + new_thread | 23728.72 ns | 23144.49 ns | 16740.23 ns | 1.38 | 19817.72 ns |
from array of 1000 - create + as_blocking + subscribe + new_thread | 29967.95 ns | 26045.83 ns | 23141.67 ns | 1.13 | 28180.53 ns |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3052.28 ns | 176.06 ns | 195.50 ns | 0.90 | 191.67 ns |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take(1)+subscribe | 1070.47 ns | 0.31 ns | 0.32 ns | 0.97 | 0.32 ns |
immediate_just+filter(true)+subscribe | 801.11 ns | 0.31 ns | 0.32 ns | 0.96 | 0.31 ns |
immediate_just(1,2)+skip(1)+subscribe | 1036.21 ns | 0.31 ns | 0.34 ns | 0.93 | 0.34 ns |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 806.64 ns | 0.32 ns | 0.34 ns | 0.95 | 0.34 ns |
immediate_just(1,2)+first()+subscribe | 1288.94 ns | 0.31 ns | 0.36 ns | 0.87 | 0.32 ns |
immediate_just(1,2)+last()+subscribe | 941.17 ns | 0.53 ns | 0.91 ns | 0.58 | 0.91 ns |
immediate_just+take_last(1)+subscribe | 1119.09 ns | 0.31 ns | 0.33 ns | 0.96 | 0.32 ns |
immediate_just(1,2,3)+element_at(1)+subscribe | 795.31 ns | 0.31 ns | 0.38 ns | 0.82 | 0.34 ns |
Schedulers
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate scheduler create worker + schedule | 293.37 ns | 0.93 ns | 0.47 ns | 1.98 | 0.99 ns |
current_thread scheduler create worker + schedule | 417.46 ns | 4.37 ns | 4.06 ns | 1.08 | 4.43 ns |
current_thread scheduler create worker + schedule + recursive schedule | 695.95 ns | 63.43 ns | 61.69 ns | 1.03 | 66.76 ns |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 801.81 ns | 2.47 ns | 2.76 ns | 0.89 | 2.50 ns |
immediate_just+scan(10, std::plus)+subscribe | 917.98 ns | 0.31 ns | 0.34 ns | 0.93 | 0.34 ns |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 1981.08 ns | 182.69 ns | 191.86 ns | 0.95 | 200.36 ns |
immediate_just+buffer(2)+subscribe | 945.87 ns | 15.34 ns | 19.49 ns | 0.79 | 17.84 ns |
immediate_just+window(2)+subscribe + subscsribe inner | 1866.54 ns | 968.67 ns | 1072.01 ns | 0.90 | 1038.61 ns |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 786.67 ns | - | - | 0.00 | - |
immediate_just+take_while(true)+subscribe | 805.80 ns | 0.31 ns | 0.34 ns | 0.91 | 0.31 ns |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 1800.82 ns | 2.08 ns | 1.87 ns | 1.11 | 1.94 ns |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 2785.96 ns | 201.25 ns | 211.83 ns | 0.95 | 219.73 ns |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3017.92 ns | 199.08 ns | 223.47 ns | 0.89 | 218.90 ns |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 202.92 ns | 220.23 ns | 0.92 | 215.12 ns |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 2933.29 ns | 498.49 ns | 535.99 ns | 0.93 | 534.91 ns |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 1956.21 ns | 325.09 ns | 347.09 ns | 0.94 | 342.85 ns |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 2815.00 ns | 321.36 ns | 352.22 ns | 0.91 | 361.13 ns |
Subjects
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
publish_subject with 1 observer - on_next | 41.28 ns | 21.46 ns | 23.15 ns | 0.93 | 23.78 ns |
subscribe 100 observers to publish_subject | 128833.33 ns | 16565.83 ns | 16153.75 ns | 1.03 | 18806.02 ns |
100 on_next to 100 observers to publish_subject | 32925.78 ns | 14958.33 ns | 14241.78 ns | 1.05 | 16335.18 ns |
Scenarios
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
basic sample | 1263.24 ns | 12.05 ns | 11.33 ns | 1.06 | 24.89 ns |
basic sample with immediate scheduler | 1223.02 ns | 5.40 ns | 5.00 ns | 1.08 | 12.28 ns |
mix operators with disposables and without disposables | 5582.73 ns | 1372.83 ns | 1459.39 ns | 0.94 | 1659.51 ns |
single disposable and looooooong indentity chain | 16323.41 ns | 1663.86 ns | 1787.56 ns | 0.93 | 3681.94 ns |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 945.31 ns | 0.31 ns | 0.44 ns | 0.71 | 0.33 ns |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 3300.14 ns | 2524.83 ns | 2985.82 ns | 0.85 | 2745.50 ns |
create(on_error())+retry(1)+subscribe | 674.33 ns | 167.18 ns | 185.47 ns | 0.90 | 184.16 ns |
ci-ubuntu-clang
General
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
Subscribe empty callbacks to empty observable | 267.95 ns | 1.54 ns | 1.54 ns | 1.00 | 0.64 ns |
Subscribe empty callbacks to empty observable via pipe operator | 271.03 ns | 1.54 ns | 1.54 ns | 1.00 | 0.63 ns |
Sources
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 553.33 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
from array of 1 - create + subscribe + current_thread | 786.74 ns | 4.01 ns | 4.01 ns | 1.00 | 4.01 ns |
concat_as_source of just(1 immediate) create + subscribe | 2373.63 ns | 130.03 ns | 129.44 ns | 1.00 | 130.56 ns |
defer from array of 1 - defer + create + subscribe + immediate | 802.34 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
interval - interval + take(3) + subscribe + immediate | 2249.59 ns | 58.31 ns | 58.30 ns | 1.00 | 58.31 ns |
interval - interval + take(3) + subscribe + current_thread | 3178.97 ns | 30.88 ns | 30.86 ns | 1.00 | 31.50 ns |
from array of 1 - create + as_blocking + subscribe + new_thread | 29425.54 ns | 28038.35 ns | 27626.36 ns | 1.01 | 28538.72 ns |
from array of 1000 - create + as_blocking + subscribe + new_thread | 41186.50 ns | 38477.74 ns | 36648.62 ns | 1.05 | 37600.11 ns |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 3685.71 ns | 148.18 ns | 148.97 ns | 0.99 | 150.38 ns |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take(1)+subscribe | 1173.50 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+filter(true)+subscribe | 864.32 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,2)+skip(1)+subscribe | 1118.57 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 910.19 ns | 0.31 ns | 0.62 ns | 0.50 | 0.31 ns |
immediate_just(1,2)+first()+subscribe | 1423.47 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,2)+last()+subscribe | 1027.44 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+take_last(1)+subscribe | 1213.93 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just(1,2,3)+element_at(1)+subscribe | 894.79 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Schedulers
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate scheduler create worker + schedule | 279.93 ns | 0.63 ns | 0.63 ns | 1.00 | 1.54 ns |
current_thread scheduler create worker + schedule | 420.80 ns | 4.32 ns | 4.01 ns | 1.08 | 4.01 ns |
current_thread scheduler create worker + schedule + recursive schedule | 874.57 ns | 54.81 ns | 55.45 ns | 0.99 | 61.92 ns |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 875.43 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
immediate_just+scan(10, std::plus)+subscribe | 1011.57 ns | 0.62 ns | 0.31 ns | 2.00 | 0.31 ns |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 2283.70 ns | 139.26 ns | 140.25 ns | 0.99 | 137.07 ns |
immediate_just+buffer(2)+subscribe | 1561.33 ns | 14.19 ns | 13.58 ns | 1.05 | 14.83 ns |
immediate_just+window(2)+subscribe + subscsribe inner | 2485.73 ns | 895.28 ns | 911.55 ns | 0.98 | 894.53 ns |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 866.64 ns | - | - | 0.00 | - |
immediate_just+take_while(true)+subscribe | 875.59 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 2032.21 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 3317.16 ns | 161.40 ns | 159.33 ns | 1.01 | 156.86 ns |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 3765.98 ns | 139.19 ns | 139.67 ns | 1.00 | 140.68 ns |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 140.12 ns | 141.58 ns | 0.99 | 138.64 ns |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 3404.17 ns | 378.18 ns | 396.37 ns | 0.95 | 378.58 ns |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 2322.83 ns | 200.86 ns | 195.38 ns | 1.03 | 198.54 ns |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 3261.20 ns | 221.68 ns | 223.16 ns | 0.99 | 227.31 ns |
Subjects
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
publish_subject with 1 observer - on_next | 54.08 ns | 19.15 ns | 17.72 ns | 1.08 | 20.17 ns |
subscribe 100 observers to publish_subject | 207790.17 ns | 17356.09 ns | 16194.69 ns | 1.07 | 17296.60 ns |
100 on_next to 100 observers to publish_subject | 37392.52 ns | 20127.81 ns | 23516.11 ns | 0.86 | 20131.32 ns |
Scenarios
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
basic sample | 1335.79 ns | 11.42 ns | 11.11 ns | 1.03 | 20.72 ns |
basic sample with immediate scheduler | 1340.79 ns | 5.86 ns | 5.86 ns | 1.00 | 6.49 ns |
mix operators with disposables and without disposables | 6562.21 ns | 1182.67 ns | 1167.99 ns | 1.01 | 1619.72 ns |
single disposable and looooooong indentity chain | 27403.61 ns | 1236.79 ns | 1244.48 ns | 0.99 | 4484.37 ns |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1026.98 ns | 0.31 ns | 0.31 ns | 1.00 | 0.31 ns |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 2196.55 ns | 1160.14 ns | 1155.68 ns | 1.00 | 1182.82 ns |
create(on_error())+retry(1)+subscribe | 678.05 ns | 137.83 ns | 138.78 ns | 0.99 | 139.57 ns |
ci-windows
General
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
Subscribe empty callbacks to empty observable | 574.93 ns | 1.85 ns | 1.85 ns | 1.00 | 1.85 ns |
Subscribe empty callbacks to empty observable via pipe operator | 584.46 ns | 1.85 ns | 1.85 ns | 1.00 | 1.85 ns |
Sources
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
from array of 1 - create + subscribe + immediate | 1160.52 ns | 5.24 ns | 5.86 ns | 0.89 | 5.86 ns |
from array of 1 - create + subscribe + current_thread | 1427.62 ns | 15.46 ns | 15.75 ns | 0.98 | 15.75 ns |
concat_as_source of just(1 immediate) create + subscribe | 3766.99 ns | 167.17 ns | 164.24 ns | 1.02 | 180.64 ns |
defer from array of 1 - defer + create + subscribe + immediate | 1191.89 ns | 5.24 ns | 5.55 ns | 0.94 | 5.86 ns |
interval - interval + take(3) + subscribe + immediate | 3237.69 ns | 140.80 ns | 140.92 ns | 1.00 | 142.10 ns |
interval - interval + take(3) + subscribe + current_thread | 3723.55 ns | 61.72 ns | 60.09 ns | 1.03 | 61.85 ns |
from array of 1 - create + as_blocking + subscribe + new_thread | 139387.50 ns | 129550.00 ns | 112310.00 ns | 1.15 | 121733.33 ns |
from array of 1000 - create + as_blocking + subscribe + new_thread | 140442.86 ns | 148628.57 ns | 132266.67 ns | 1.12 | 139000.00 ns |
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe | 5388.54 ns | 207.96 ns | 200.24 ns | 1.04 | 210.23 ns |
Filtering Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take(1)+subscribe | 1847.16 ns | 19.73 ns | 19.74 ns | 1.00 | 21.29 ns |
immediate_just+filter(true)+subscribe | 1626.84 ns | 18.80 ns | 18.82 ns | 1.00 | 21.59 ns |
immediate_just(1,2)+skip(1)+subscribe | 1702.10 ns | 18.50 ns | 18.51 ns | 1.00 | 21.60 ns |
immediate_just(1,1,2)+distinct_until_changed()+subscribe | 1360.00 ns | 23.44 ns | 23.50 ns | 1.00 | 26.84 ns |
immediate_just(1,2)+first()+subscribe | 2384.38 ns | 17.28 ns | 17.29 ns | 1.00 | 19.45 ns |
immediate_just(1,2)+last()+subscribe | 1464.59 ns | 18.51 ns | 18.51 ns | 1.00 | 22.83 ns |
immediate_just+take_last(1)+subscribe | 2007.67 ns | 64.24 ns | 64.79 ns | 0.99 | 69.69 ns |
immediate_just(1,2,3)+element_at(1)+subscribe | 1646.05 ns | 21.90 ns | 21.93 ns | 1.00 | 23.46 ns |
Schedulers
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate scheduler create worker + schedule | 485.90 ns | 4.63 ns | 4.94 ns | 0.94 | 4.01 ns |
current_thread scheduler create worker + schedule | 651.03 ns | 11.71 ns | 11.16 ns | 1.05 | 11.19 ns |
current_thread scheduler create worker + schedule + recursive schedule | 1342.39 ns | 101.85 ns | 99.92 ns | 1.02 | 97.41 ns |
Transforming Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+map(v*2)+subscribe | 1327.24 ns | 18.81 ns | 18.81 ns | 1.00 | 21.60 ns |
immediate_just+scan(10, std::plus)+subscribe | 1445.76 ns | 20.96 ns | 21.29 ns | 0.98 | 23.75 ns |
immediate_just+flat_map(immediate_just(v*2))+subscribe | 3870.59 ns | 182.18 ns | 184.14 ns | 0.99 | 203.50 ns |
immediate_just+buffer(2)+subscribe | 2321.54 ns | 64.96 ns | 65.55 ns | 0.99 | 67.82 ns |
immediate_just+window(2)+subscribe + subscsribe inner | 4051.22 ns | 1200.84 ns | 1310.67 ns | 0.92 | 1218.19 ns |
Conditional Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+take_while(false)+subscribe | 1380.83 ns | 17.57 ns | 17.57 ns | 1.00 | 19.12 ns |
immediate_just+take_while(true)+subscribe | 1312.99 ns | 18.82 ns | 18.81 ns | 1.00 | 21.59 ns |
Utility Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(1)+subscribe_on(immediate)+subscribe | 3314.83 ns | 11.11 ns | 11.10 ns | 1.00 | 11.11 ns |
Combining Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe | 5214.14 ns | 195.86 ns | 203.87 ns | 0.96 | 216.99 ns |
immediate_just(1) + merge_with(immediate_just(2)) + subscribe | 5895.57 ns | 185.29 ns | 194.07 ns | 0.95 | 205.36 ns |
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe | - | 186.60 ns | 195.08 ns | 0.96 | 193.13 ns |
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe | 6211.29 ns | 434.78 ns | 438.11 ns | 0.99 | 455.22 ns |
immediate_just(1) + zip(immediate_just(2)) + subscribe | 3850.90 ns | 504.71 ns | 522.22 ns | 0.97 | 517.95 ns |
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe | 4957.08 ns | 322.71 ns | 328.46 ns | 0.98 | 337.17 ns |
Subjects
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
publish_subject with 1 observer - on_next | 36.90 ns | 29.29 ns | 19.94 ns | 1.47 | 29.62 ns |
subscribe 100 observers to publish_subject | 261000.00 ns | 26292.50 ns | 27814.29 ns | 0.95 | 26834.09 ns |
100 on_next to 100 observers to publish_subject | 54838.89 ns | 32840.00 ns | 33112.90 ns | 0.99 | 36386.21 ns |
Scenarios
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
basic sample | 1906.90 ns | 95.66 ns | 96.33 ns | 0.99 | 112.09 ns |
basic sample with immediate scheduler | 1931.99 ns | 68.20 ns | 68.64 ns | 0.99 | 82.08 ns |
mix operators with disposables and without disposables | 10195.05 ns | 1902.58 ns | 1892.18 ns | 1.01 | 2560.14 ns |
single disposable and looooooong indentity chain | 26923.81 ns | 1686.09 ns | 1699.22 ns | 0.99 | 6479.78 ns |
Aggregating Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
immediate_just+reduce(10, std::plus)+subscribe | 1468.62 ns | 19.42 ns | 19.43 ns | 1.00 | 22.81 ns |
Error Handling Operators
name | rxcpp | rpp | prev rpp | ratio | rpp no optimization |
---|---|---|---|---|---|
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe | 1916.63 ns | 353.46 ns | 349.92 ns | 1.01 | 364.91 ns |
create(on_error())+retry(1)+subscribe | 1553.78 ns | 137.89 ns | 142.97 ns | 0.96 | 139.91 ns |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## v2 #697 +/- ##
==========================================
+ Coverage 98.61% 98.62% +0.01%
==========================================
Files 156 156
Lines 9786 9800 +14
==========================================
+ Hits 9650 9665 +15
+ Misses 136 135 -1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/tests/rpp/test_subjects.cpp (1)
172-190
: Consider memory and performance implications of the test.While the test correctly verifies the subject's behavior with dynamic subscriptions, it creates 10,000 subscriptions (100 per on_next * 100 emissions) without cleanup. Consider:
- Adding explicit cleanup verification
- Reducing the number of iterations for faster test execution
- Testing unsubscribe scenarios
Apply this diff to improve the test:
TEST_CASE("subject handles addition from inside on_next properly") { rpp::subjects::publish_subject<int> subject{}; SUBCASE("subscribe inside on_next") { int value = {}; + auto disposable = rpp::composite_disposable_wrapper::make(); subject.get_observable().subscribe([&subject, &value](int v) { - for (int i = 0; i < 100; ++i) - subject.get_observable().subscribe([](int) {}); + for (int i = 0; i < 10; ++i) + subject.get_observable().subscribe(disposable, [](int) {}); value = v; }); - for (int i = 0; i < 100; ++i) + for (int i = 0; i < 10; ++i) subject.get_observer().on_next(i); - REQUIRE(value == 99); + REQUIRE(value == 9); + + // Verify cleanup + disposable.dispose(); } + + SUBCASE("unsubscribe inside on_next") + { + int value = {}; + auto disposable = rpp::composite_disposable_wrapper::make(); + + subject.get_observable().subscribe(disposable, [&subject, &value](int v) { + value = v; + disposable.dispose(); // Unsubscribe during on_next + }); + + subject.get_observer().on_next(0); + subject.get_observer().on_next(1); + + REQUIRE(value == 0); // Only first value should be received + } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/tests/rpp/test_subjects.cpp
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (18)
- GitHub Check: benchmarks ci-windows Debug (Optimizations disabled)
- GitHub Check: benchmarks ci-windows Debug
- GitHub Check: benchmarks ci-windows Release (Optimizations disabled)
- GitHub Check: benchmarks ci-windows Release
- GitHub Check: tests ci-windows Debug (Optimizations disabled)
- GitHub Check: tests ci-windows Debug
- GitHub Check: tests ci-windows Release (Optimizations disabled)
- GitHub Check: tests ci-ubuntu-clang Debug (Optimizations disabled)
- GitHub Check: tests ci-ubuntu-clang Debug
- GitHub Check: tests ci-ubuntu-clang Release (Optimizations disabled)
- GitHub Check: tests ci-ubuntu-clang Release
- GitHub Check: tests ci-ubuntu-gcc Debug (Optimizations disabled)
- GitHub Check: tests ci-ubuntu-gcc Debug
- GitHub Check: Sanitize - ubsan
- GitHub Check: Sanitize - lsan
- GitHub Check: tests ci-ubuntu-gcc Release (Optimizations disabled)
- GitHub Check: Sanitize - asan
- GitHub Check: tests ci-ubuntu-gcc Release
@coderabbitai full review |
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
124-133
: Consider implementing a more robust subscription modification patternThe current implementation attempts to handle dynamic subscriptions during
on_next
, but this creates complex thread-safety concerns. Consider these architectural alternatives:
- Queue-based approach: Buffer subscription modifications during emission
- Copy-on-Write pattern: Use a snapshot of observers for the entire emission
- Double-buffering: Maintain separate lists for current and pending modifications
Example implementation of the Copy-on-Write pattern:
void on_next(const Type& v) { std::shared_ptr<observers> current_observers; { std::lock_guard observers_lock{m_mutex}; if (!std::holds_alternative<shared_observers>(m_state)) return; current_observers = std::get<shared_observers>(m_state); if (!current_observers) return; } std::lock_guard serialized_lock{m_serialized_mutex}; for (const auto& observer : *current_observers) { observer->on_next(v); } }This approach:
- Provides cleaner separation between subscription management and emission
- Reduces lock contention
- Makes the code's intent clearer
- Maintains consistency during emission
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/rpp/rpp/subjects/details/subject_state.hpp
(1 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
auto itr = observers->cbegin(); | ||
const auto size = observers->size(); | ||
|
||
observers_lock.unlock(); | ||
|
||
std::lock_guard lock{m_serialized_mutex}; | ||
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); | ||
for (size_t i = 0; i < size; ++i) | ||
{ | ||
(itr++)->on_next(v); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Fix potential race conditions and iterator invalidation
The current implementation has several thread-safety issues:
- Race Condition: Capturing
size
and releasing the lock creates a time window where the deque could be modified, making the size inconsistent with the actual number of observers. - Iterator Invalidation: The iterator might be invalidated if the deque is modified after unlocking.
- Memory Safety: The shared_ptr is released while keeping its iterator, which could lead to undefined behavior.
Consider this safer implementation:
- auto itr = observers->cbegin();
- const auto size = observers->size();
-
- observers_lock.unlock();
-
- std::lock_guard lock{m_serialized_mutex};
- for (size_t i = 0; i < size; ++i)
- {
- (itr++)->on_next(v);
- }
+ std::lock_guard serialized_lock{m_serialized_mutex};
+ // Keep shared_ptr alive during iteration
+ for (const auto& observer : *observers)
+ {
+ observer->on_next(v);
+ }
This solution:
- Maintains the shared_ptr throughout iteration
- Prevents iterator invalidation
- Eliminates the race condition window
- Simplifies the code
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
auto itr = observers->cbegin(); | |
const auto size = observers->size(); | |
observers_lock.unlock(); | |
std::lock_guard lock{m_serialized_mutex}; | |
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); | |
for (size_t i = 0; i < size; ++i) | |
{ | |
(itr++)->on_next(v); | |
} | |
std::lock_guard serialized_lock{m_serialized_mutex}; | |
// Keep shared_ptr alive during iteration | |
for (const auto& observer : *observers) | |
{ | |
observer->on_next(v); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/tests/rpp/test_subjects.cpp (1)
172-190
: Enhance test coverage with edge casesWhile the test verifies basic functionality, consider adding test cases for:
- Concurrent subscribe/unsubscribe operations
- Error handling during subscription
- Memory pressure scenarios
Would you like me to generate additional test cases to cover these scenarios?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/rpp/rpp/subjects/details/subject_state.hpp
(1 hunks)src/tests/rpp/test_subjects.cpp
(1 hunks)
🧰 Additional context used
🪛 GitHub Check: benchmarks ci-macos Release (Optimizations disabled)
src/rpp/rpp/subjects/details/subject_state.hpp
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
🪛 GitHub Check: benchmarks ci-ubuntu-clang Release
src/rpp/rpp/subjects/details/subject_state.hpp
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
🪛 GitHub Check: Sanitize - ubsan
src/rpp/rpp/subjects/details/subject_state.hpp
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
🪛 GitHub Check: Sanitize - lsan
src/rpp/rpp/subjects/details/subject_state.hpp
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
🪛 GitHub Check: Sanitize - asan
src/rpp/rpp/subjects/details/subject_state.hpp
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
🪛 GitHub Actions: CI v2
src/rpp/rpp/subjects/details/subject_state.hpp
[error] 132-132: no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
🔇 Additional comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
124-133
:⚠️ Potential issueCritical: Fix multiple thread-safety and memory safety issues
The current implementation has several critical issues:
- Race Condition: Unlocking after capturing size and iterator creates a window where the deque can be modified
- Iterator Invalidation: The iterator might be invalidated after unlocking
- Memory Safety: The shared_ptr (observers) might be released while keeping its iterator
- Method Access: Pipeline failures indicate issues with accessing 'on_next'
Apply this safer implementation:
- auto itr = observers->cbegin(); - const auto size = observers->size(); - - observers_lock.unlock(); - - std::lock_guard lock{m_serialized_mutex}; - for (size_t i = 0; i < size; ++i) - { - (itr++)->on_next(v); - } + std::lock_guard serialized_lock{m_serialized_mutex}; + for (const auto& observer : *observers) + { + observer->on_next(v); + }This solution:
- Maintains the shared_ptr throughout iteration
- Prevents iterator invalidation
- Eliminates the race condition window
- Uses proper member access syntax
Likely invalid or redundant comment.
🧰 Tools
🪛 GitHub Check: benchmarks ci-macos Release (Optimizations disabled)
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'🪛 GitHub Check: benchmarks ci-ubuntu-clang Release
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'🪛 GitHub Check: Sanitize - ubsan
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'🪛 GitHub Check: Sanitize - lsan
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'🪛 GitHub Check: Sanitize - asan
[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'🪛 GitHub Actions: CI v2
[error] 132-132: no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'
subject.get_observable().subscribe([&subject, &value](int v) { | ||
for (int i = 0; i < 100; ++i) | ||
subject.get_observable().subscribe([](int) {}); | ||
value = v; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve subscription management in test
The test creates 100 subscriptions without proper cleanup, which could lead to resource leaks.
Apply this diff to properly manage subscriptions:
- for (int i = 0; i < 100; ++i)
- subject.get_observable().subscribe([](int) {});
+ auto d = rpp::composite_disposable_wrapper::make();
+ for (int i = 0; i < 100; ++i)
+ subject.get_observable().subscribe(d, [](int) {});
Committable suggestion skipped: line range outside the PR's diff.
|
Summary by CodeRabbit
publish_subject
.