Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate work for finished CurrentThreadExecutor to previous executor #494

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

andersk
Copy link
Contributor

@andersk andersk commented Feb 13, 2025

A CurrentThreadExecutor may terminate with work still remaining in its queue, or new work may be submitted later. We previously discarded remaining work, leading to deadlocks, and raised an error on submitting late work. But if there’s another CurrentThreadExecutor for the same thread below us on the stack, we should instead migrate the extra work there to allow it to eventually run.

Doing this in a thread-safe way requires replacing the queue.Queue abstraction with collections.deque and threading.ConditionVariable (the same primitives used to implement queue.Queue).

A CurrentThreadExecutor may terminate with work still remaining in its
queue, or new work may be submitted later.  We previously discarded
remaining work, leading to deadlocks, and raised an error on
submitting late work.  But if there’s another CurrentThreadExecutor
for the same thread below us on the stack, we should instead migrate
the extra work there to allow it to eventually run.

Doing this in a thread-safe way requires replacing the queue.Queue
abstraction with collections.deque and
threading.ConditionVariable (the same primitives used to implement
queue.Queue).

Fixes django#491; fixes django#492.

Signed-off-by: Anders Kaseorg <[email protected]>
@andersk andersk force-pushed the CurrentThreadExecutor-migrate branch from 1cd49b6 to 8a2717c Compare February 14, 2025 22:48
Comment on lines +97 to +107
# Resubmit remaining work to the closest running CurrentThreadExecutor
# on the stack, if any.
if self._work_items:
executor = self._old_executor
while executor is not None:
with executor._work_ready:
if not executor._broken:
executor._work_items.extend(self._work_items)
executor._work_ready.notify()
break
executor = executor._old_executor
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we could execute the remaining work immediately. This would block the async_to_sync caller for longer, but would avoid the possibility of silently discarding the work if we don’t find a running CurrentThreadExecutor:

        while self._work_items:
            work_item = self._work_items.popleft()
            work_item.run()

Which strategy makes more sense?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant