From ddd6e9a66b26f468e3a853152b72e53ecc39799b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 10 Jan 2025 10:03:27 +0300 Subject: [PATCH 01/10] try to catch issue --- src/tests/rpp/test_subjects.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 3293d46fe..f58a570cd 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -169,6 +169,25 @@ TEST_CASE("subject can be modified from on_next call") } } +TEST_CASE("subject handles addition from inside on_next properly") +{ + rpp::subjects::publish_subject subject{}; + + SUBCASE("subscribe inside on_next") + { + int value = {}; + subject.get_observable().subscribe([&subject, &value](int v) { + subject.get_observable().subscribe([](int){}); + value = v; + }); + + for (size_t i =0; i < 100; ++i) + subject.get_observer().on_next(i); + + REQUIRE(value == 100); + } +} + TEST_CASE("publish subject caches error/completed") { auto mock = mock_observer_strategy{}; From 15291162a42ed094c2c8b0774b40f7fdf4855c8d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 10 Jan 2025 07:03:45 +0000 Subject: [PATCH 02/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tests/rpp/test_subjects.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index f58a570cd..f312a93af 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -177,11 +177,11 @@ TEST_CASE("subject handles addition from inside on_next properly") { int value = {}; subject.get_observable().subscribe([&subject, &value](int v) { - subject.get_observable().subscribe([](int){}); + subject.get_observable().subscribe([](int) {}); value = v; }); - for (size_t i =0; i < 100; ++i) + for (size_t i = 0; i < 100; ++i) subject.get_observer().on_next(i); REQUIRE(value == 100); From a0691f6310ca80a0f67cea9747af8df570aec24c Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 10 Jan 2025 12:49:14 +0300 Subject: [PATCH 03/10] Update test_subjects.cpp --- src/tests/rpp/test_subjects.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index f312a93af..abc108030 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -184,7 +184,7 @@ TEST_CASE("subject handles addition from inside on_next properly") for (size_t i = 0; i < 100; ++i) subject.get_observer().on_next(i); - REQUIRE(value == 100); + REQUIRE(value == 99); } } From ae5cbc73b6ae9e8b3dee408eaa657768745f7590 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 10 Jan 2025 12:55:45 +0300 Subject: [PATCH 04/10] Update test_subjects.cpp --- src/tests/rpp/test_subjects.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index abc108030..f6e96b205 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -181,7 +181,7 @@ TEST_CASE("subject handles addition from inside on_next properly") value = v; }); - for (size_t i = 0; i < 100; ++i) + for (int i = 0; i < 100; ++i) subject.get_observer().on_next(i); REQUIRE(value == 99); From 7b0bc3c5e6229afc3f601a70b6c5bd6484f6f6b1 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 10 Jan 2025 14:37:49 +0300 Subject: [PATCH 05/10] Update test_subjects.cpp --- src/tests/rpp/test_subjects.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index f6e96b205..8a18a559d 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -177,7 +177,8 @@ TEST_CASE("subject handles addition from inside on_next properly") { int value = {}; subject.get_observable().subscribe([&subject, &value](int v) { - subject.get_observable().subscribe([](int) {}); + for (int i = 0; i < 100; ++i) + subject.get_observable().subscribe([](int) {}); value = v; }); From 73c834e9f4cfe3d1902d3fc742d5210f3009f1e6 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 10 Jan 2025 22:45:19 +0300 Subject: [PATCH 06/10] fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 78bfd6908..a51363773 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -121,13 +121,16 @@ namespace rpp::subjects::details if (!observers) return; - const auto begin = observers->cbegin(); - const auto end = observers->cend(); + auto itr = observers->cbegin(); + const auto size = observers->size(); observers_lock.unlock(); std::lock_guard lock{m_serialized_mutex}; - std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); + for (size_t i = 0; i < size; ++i) + { + (itr++)->on_next(v); + } } void on_error(const std::exception_ptr& err) From b2bd784952a46a38a24c24749a8cd9021348bb6b Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Fri, 10 Jan 2025 22:58:24 +0300 Subject: [PATCH 07/10] fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index a51363773..48a4c65f8 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -129,7 +129,7 @@ namespace rpp::subjects::details std::lock_guard lock{m_serialized_mutex}; for (size_t i = 0; i < size; ++i) { - (itr++)->on_next(v); + (*(itr++))->on_next(v); } } From 5dba2e2ad1c039bc39939e8ebd03c44e35798f57 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 11 Jan 2025 12:26:52 +0300 Subject: [PATCH 08/10] fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 48a4c65f8..bb5064aba 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -20,7 +20,7 @@ #include #include -#include +#include #include #include #include @@ -67,7 +67,7 @@ namespace rpp::subjects::details }; using observer = std::shared_ptr>; - using observers = std::deque; + using observers = std::list; using shared_observers = std::shared_ptr; using state_t = std::variant; From f180ef36e0fdb5e9aee485ece02cdd4b159a4010 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 11 Jan 2025 13:26:50 +0300 Subject: [PATCH 09/10] speedup --- .../rpp/subjects/details/subject_state.hpp | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index bb5064aba..ce626d926 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -112,25 +112,21 @@ namespace rpp::subjects::details void on_next(const Type& v) { std::unique_lock observers_lock{m_mutex}; + process_state_unsafe(m_state, [&](shared_observers observers) { + if (!observers) + return; - if (!std::holds_alternative(m_state)) - return; + auto itr = observers->cbegin(); + const auto size = observers->size(); - // we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call - const auto observers = std::get(m_state); - if (!observers) - return; + observers_lock.unlock(); - auto itr = observers->cbegin(); - const auto size = observers->size(); - - observers_lock.unlock(); - - std::lock_guard lock{m_serialized_mutex}; - for (size_t i = 0; i < size; ++i) - { - (*(itr++))->on_next(v); - } + std::lock_guard lock{m_serialized_mutex}; + for (size_t i = 0; i < size; ++i) + { + (*(itr++))->on_next(v); + } + }); } void on_error(const std::exception_ptr& err) @@ -174,19 +170,18 @@ namespace rpp::subjects::details return subs; } - static void process_state_unsafe(const state_t& state, const auto&... actions) + static auto process_state_unsafe(const state_t& state, const auto&... actions) { - std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); + return std::visit(rpp::utils::overloaded{actions..., rpp::utils::empty_function_any_t{}}, state); } shared_observers exchange_observers_under_lock_if_there(state_t&& new_val) { std::lock_guard lock{m_mutex}; - if (!std::holds_alternative(m_state)) - return {}; - - return std::get(std::exchange(m_state, std::move(new_val))); + return process_state_unsafe(m_state, [&](shared_observers observers) { + m_state = std::move(new_val); + return std::move(observers); }, [](auto) { return shared_observers{}; }); } private: From 622be88b398e57ee1a2a29db34cbdc089f99128e Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 11 Jan 2025 13:39:27 +0300 Subject: [PATCH 10/10] fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index ce626d926..41e53a75a 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -181,7 +181,7 @@ namespace rpp::subjects::details return process_state_unsafe(m_state, [&](shared_observers observers) { m_state = std::move(new_val); - return std::move(observers); }, [](auto) { return shared_observers{}; }); + return observers; }, [](auto) { return shared_observers{}; }); } private: