Skip to content

Commit

Permalink
Re-add non-context version of sync_to_async_gen
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeshardmind committed Jan 18, 2025
1 parent 11d1e2c commit 36c3314
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/async_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
__author__ = "Michael Hall"
__license__ = "Apache-2.0"
__copyright__ = "Copyright 2020-Present Michael Hall"
__version__ = "2025.01.15"
__version__ = "2025.01.17"

import os
import sys
Expand Down
130 changes: 95 additions & 35 deletions src/async_utils/gen_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from collections.abc import AsyncGenerator, Callable, Generator
from threading import Event

from . import _typings as t
from ._qs import Queue

__all__ = ["sync_to_async_gen"]
__all__ = ["ACTX", "sync_to_async_gen", "sync_to_async_gen_noctx"]


def _consumer[**P, Y](
Expand Down Expand Up @@ -50,6 +51,17 @@ def _consumer[**P, Y](


class ACTX[Y]:
"""Context manager to forward exception context to generator in thread.
Not intended for public construction.
"""

__final__ = True

def __init_subclass__(cls) -> t.Never:
msg = "Don't subclass this."
raise RuntimeError(msg)

def __init__(self, g: AsyncGenerator[Y], f: cf.Future[None]) -> None:
self.g = g
self.f = f
Expand All @@ -70,42 +82,11 @@ async def __aexit__(
return False


def sync_to_async_gen[**P, Y](
def _sync_to_async_gen[**P, Y](
f: Callable[P, Generator[Y]],
*args: P.args,
**kwargs: P.kwargs,
) -> ACTX[Y]:
"""Asynchronously iterate over a synchronous generator.
The generator function and it's arguments must be threadsafe and will be
iterated lazily. Generators which perform cpu intensive work while holding
the GIL will likely not see a benefit.
Generators which rely on two-way communication (generators as coroutines)
are not appropriate for this function. similarly, generator return values
are completely swallowed.
If your generator is actually a synchronous coroutine, that's super cool,
but rewrite is as a native coroutine or use it directly then, you don't need
what this function does.
.. note::
Parameters
----------
f:
The synchronous generator function to wrap.
*args:
The positional args to pass to the generator construction.
**kwargs:
The keyword arguments to pass to the generator construction.
Returns
-------
ACTX
This is an async context manager,
that when entered, returns an async iterable.
"""
) -> tuple[AsyncGenerator[Y], cf.Future[None]]:
# TODO: consider shutdownable queue rather than the double event use
# needs to be a seperate queue if so
q: Queue[Y] = Queue(maxsize=1)
Expand Down Expand Up @@ -147,4 +128,83 @@ async def gen() -> AsyncGenerator[Y]:
# ensure errors in the generator propogate *after* the last values yielded
await background_task

return ACTX(gen(), cancel_future)
return gen(), cancel_future


def sync_to_async_gen[**P, Y](
f: Callable[P, Generator[Y]],
*args: P.args,
**kwargs: P.kwargs,
) -> ACTX[Y]:
"""Asynchronously iterate over a synchronous generator.
The generator function and it's arguments must be threadsafe and will be
iterated lazily. Generators which perform cpu intensive work while holding
the GIL will likely not see a benefit.
Generators which rely on two-way communication (generators as coroutines)
are not appropriate for this function. similarly, generator return values
are completely swallowed.
If your generator is actually a synchronous coroutine, that's super cool,
but rewrite is as a native coroutine or use it directly then, you don't need
what this function does.
.. note::
Parameters
----------
f:
The synchronous generator function to wrap.
*args:
The positional args to pass to the generator construction.
**kwargs:
The keyword arguments to pass to the generator construction.
Returns
-------
ACTX
This is an async context manager,
that when entered, returns an async iterable.
"""
return ACTX(*_sync_to_async_gen(f, *args, **kwargs))


def sync_to_async_gen_noctx[**P, Y](
f: Callable[P, Generator[Y]],
*args: P.args,
**kwargs: P.kwargs,
) -> AsyncGenerator[Y]:
"""Asynchronously iterate over a synchronous generator.
The generator function and it's arguments must be threadsafe and will be
iterated lazily. Generators which perform cpu intensive work while holding
the GIL will likely not see a benefit.
Generators which rely on two-way communication (generators as coroutines)
are not appropriate for this function. similarly, generator return values
are completely swallowed.
If your generator is actually a synchronous coroutine, that's super cool,
but rewrite is as a native coroutine or use it directly then, you don't need
what this function does.
This version does not forward exception context and is not a context manager.
.. note::
Parameters
----------
f:
The synchronous generator function to wrap.
*args:
The positional args to pass to the generator construction.
**kwargs:
The keyword arguments to pass to the generator construction.
Returns
-------
AsyncGenerator[Y]
"""
gen, _fut = _sync_to_async_gen(f, *args, **kwargs)
return gen

0 comments on commit 36c3314

Please sign in to comment.