-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix auto-cancelled Sinks.many() accepting emissions #3725
Fix auto-cancelled Sinks.many() accepting emissions #3725
Conversation
@kaqqao Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
1 similar comment
@kaqqao Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@kaqqao Thank you for signing the Contributor License Agreement! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also run spotlessApply
task for this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much for the effort so far! 🎉
I think the proposed implementation which checks for cancellation upon emission attempts looks simple and elegant. Unfortunately, I'm not sure it's correct. It would definitely improve the situation. However, I am worried about potential races. As I understand this improvement would prevent unnoticed leaks of values that end up in the queue despite having the sink already unusable. In concurrent scenarios, with the proposed implementation I think there is a risk that the cancellation is about to be triggered while an item is offered.
T1
: cancelling thread
T2
: emitting thread
Timeline:
T1: ---------------------------- L607:terminate() -- L610:q.clear() ------------------->
T2: L253:isCancelled()-> false -------------------------------------- L279:q.offer(t) ->
Now t
is in the queue and nothing will remove the reference.
As far as I understand, the access to this Sink
implementation is serialized however only from the producer perspective, so this race is possible. Probably you can confirm that using a JCStress test (it would be neat to add one for this case).
Do you agree about the risk? Would you be willing to verify and improve the implementation? Thanks in advance for your effort!
@kaqqao are you still interested in this? |
Closing due to lack of activity. Please reopen if you'd like to follow-up. |
Fixes #3715
I don't know if this is indeed the correct thing to do, but it does help demonstrate the situation and a possible solution.