Skip to content

Commit

Permalink
Stylistic removal of certain line continuations
Browse files Browse the repository at this point in the history
  Improves at-a-glance readability
  ^.+\n +\)$
  • Loading branch information
mikeshardmind committed Feb 5, 2025
1 parent 47b77cd commit aa4fde9
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 75 deletions.
12 changes: 4 additions & 8 deletions _misc/_ensure_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import ast
import inspect
import sys
from functools import partial
from types import FunctionType
from typing import Any

_cycle_blocked = False

rec = partial(compile, filename="<string>", mode="exec", flags=0, dont_inherit=True)


#: PYUPDATE: py3.14, annotationslib based check as well.
#: TODO: This runs into issues with indentation, dedent source?
Expand All @@ -38,14 +41,7 @@ def ensure_annotations[T: type | FunctionType](f: T) -> T:
if _cycle_blocked:
return f

new_ast = compile(
ast.parse(inspect.getsource(f)),
"<string>",
"exec",
flags=0,
dont_inherit=True,
optimize=1,
)
new_ast = rec(ast.parse(inspect.getsource(f)), optimize=1)

env = sys.modules[f.__module__].__dict__

Expand Down
42 changes: 18 additions & 24 deletions src/async_utils/_qs.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,12 @@ def set(self, /) -> bool:


class _BaseQueue[T]:
__slots__ = (
"__get_waiters",
"__put_waiters",
"__unlocked",
"__waiters",
"maxsize",
)
__slots__ = ("__get_ws", "__put_ws", "__unlocked", "__unlocked", "__ws", "maxsize")

def __init__(self, /, maxsize: int | None = None) -> None:
self.__waiters: deque[AsyncEvent | ThreadingEvent] = deque()
self.__get_waiters: deque[AsyncEvent | ThreadingEvent] = deque()
self.__put_waiters: deque[AsyncEvent | ThreadingEvent] = deque()
self.__ws: deque[AsyncEvent | ThreadingEvent] = deque()
self.__get_ws: deque[AsyncEvent | ThreadingEvent] = deque()
self.__put_ws: deque[AsyncEvent | ThreadingEvent] = deque()
self.__unlocked: list[bool] = [True]
self.maxsize: int = maxsize if maxsize is not None else 0

Expand Down Expand Up @@ -285,11 +279,11 @@ def _release(self, /) -> None:
size = self._qsize()

if not size:
actual_waiters = self.__put_waiters
actual_waiters = self.__put_ws
elif size >= maxsize > 0:
actual_waiters = self.__get_waiters
actual_waiters = self.__get_ws
else:
actual_waiters = self.__waiters
actual_waiters = self.__ws

while actual_waiters:
try:
Expand All @@ -308,8 +302,8 @@ def _release(self, /) -> None:
break

async def async_put(self, item: T, /) -> None:
waiters = self.__waiters
put_waiters = self.__put_waiters
waiters = self.__ws
put_waiters = self.__put_ws
success = self._acquire_nowait_put()

try:
Expand Down Expand Up @@ -345,8 +339,8 @@ async def async_put(self, item: T, /) -> None:
def sync_put(
self, item: T, /, *, blocking: bool = True, timeout: float | None = None
) -> None:
waiters = self.__waiters
put_waiters = self.__put_waiters
waiters = self.__ws
put_waiters = self.__put_ws
success = self._acquire_nowait_put()

try:
Expand Down Expand Up @@ -383,8 +377,8 @@ def sync_put(
self._release()

async def async_get(self, /) -> T:
waiters = self.__waiters
get_waiters = self.__get_waiters
waiters = self.__ws
get_waiters = self.__get_ws
success = self._acquire_nowait_get()

try:
Expand Down Expand Up @@ -420,8 +414,8 @@ async def async_get(self, /) -> T:
return item

def sync_get(self, /, *, blocking: bool = True, timeout: float | None = None) -> T:
waiters = self.__waiters
get_waiters = self.__get_waiters
waiters = self.__ws
get_waiters = self.__get_ws

success = self._acquire_nowait_get()

Expand Down Expand Up @@ -474,15 +468,15 @@ def _get(self, /) -> T:

@property
def waiting(self, /) -> int:
return len(self.__waiters)
return len(self.__ws)

@property
def putting(self, /) -> int:
return len(self.__put_waiters)
return len(self.__put_ws)

@property
def getting(self, /) -> int:
return len(self.__get_waiters)
return len(self.__get_ws)


class Queue[T](_BaseQueue[T]):
Expand Down
6 changes: 3 additions & 3 deletions src/async_utils/_typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
"""Shim for typing- and annotation-related symbols to avoid runtime dependencies on `typing` or `typing-extensions`.
A warning for annotation-related symbols: Do not directly import them from this module
(e.g. `from ._typing_compat import Any`)! Doing so will trigger the module-level `__getattr__`, causing `typing` to
(e.g. `from ._typings import Any`)! Doing so will trigger the module-level `__getattr__`, causing `typing` to
get imported. Instead, import the module and use symbols via attribute access as needed
(e.g. `from . import _typing_compat [as _t]`). To avoid those symbols being evaluated at runtime, which would also cause
(e.g. `from . import _typings [as t]`). To avoid those symbols being evaluated at runtime, which would also cause
`typing` to get imported, make sure to put `from __future__ import annotations` at the top of the module.
"""
# comment above
# taken verbatim from quote from https://github.com/Sachaa-Thanasius in discord
# taken nearly verbatim from quote from https://github.com/Sachaa-Thanasius in discord
# cause I forget otherwise too.

from __future__ import annotations
Expand Down
11 changes: 4 additions & 7 deletions src/async_utils/bg_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def run_forever(
loop: asyncio.AbstractEventLoop,
/,
*,
use_eager_task_factory: bool = True,
eager: bool = True,
) -> None:
asyncio.set_event_loop(loop)
if use_eager_task_factory:
if eager:
loop.set_task_factory(asyncio.eager_task_factory)
try:
loop.run_forever()
Expand Down Expand Up @@ -152,11 +152,8 @@ def threaded_loop(
thread = None
wrapper = None
try:
thread = threading.Thread(
target=run_forever,
args=(loop,),
kwargs={"use_eager_task_factory": use_eager_task_factory},
)
args, kwargs = (loop,), {"eager": use_eager_task_factory}
thread = threading.Thread(target=run_forever, args=args, kwargs=kwargs)
thread.start()
wrapper = LoopWrapper(loop)
yield wrapper
Expand Down
32 changes: 14 additions & 18 deletions src/async_utils/gen_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import asyncio
import concurrent.futures as cf #: PYUPDATE: py3.14, check cf.Future use
from asyncio import FIRST_COMPLETED as FC
from collections.abc import AsyncGenerator, Callable, Generator
from threading import Event

Expand Down Expand Up @@ -92,45 +93,40 @@ def _sync_to_async_gen[**P, Y](
# TODO: consider shutdownable queue rather than the double event use
# needs to be a seperate queue if so
q: Queue[Y] = Queue(maxsize=1)
laziness_ev = Event() # used to preserve generator laziness
laziness_ev.set()
cancel_future: cf.Future[None] = cf.Future()
lazy_ev = Event() # used to preserve generator laziness
lazy_ev.set()
cancel_fut: cf.Future[None] = cf.Future()

background_coro = asyncio.to_thread(
_consumer, laziness_ev, q, cancel_future, f, *args, **kwargs
)
background_task = asyncio.create_task(background_coro)
bg_coro = asyncio.to_thread(_consumer, lazy_ev, q, cancel_fut, f, *args, **kwargs)
bg_task = asyncio.create_task(bg_coro)

async def gen() -> AsyncGenerator[Y]:
q_get = None
try:
while not background_task.done():
while not bg_task.done():
try:
q_get = asyncio.ensure_future(q.async_get())
done, _pending = await asyncio.wait(
(background_task, q_get),
return_when=asyncio.FIRST_COMPLETED,
)
done, _ = await asyncio.wait((bg_task, q_get), return_when=FC)
if q_get in done:
laziness_ev.clear()
lazy_ev.clear()
yield (await q_get)
laziness_ev.set()
lazy_ev.set()
finally:
if q_get is not None:
q_get.cancel()

finally:
try:
cancel_future.set_result(None)
cancel_fut.set_result(None)
except cf.InvalidStateError:
pass
laziness_ev.set()
lazy_ev.set()
while q:
yield (await q.async_get())
# ensure errors in the generator propogate *after* the last values yielded
await background_task
await bg_task

return gen(), cancel_future
return gen(), cancel_fut


def sync_to_async_gen[**P, Y](
Expand Down
16 changes: 4 additions & 12 deletions src/async_utils/lockout.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,8 @@ class Lockout:

def __repr__(self) -> str:
res = super().__repr__()
extra = (
"unlocked"
if not self._lockouts
else f"locked, timestamps={self._lockouts:!r}"
)
return f"<{res[1:-1]} [{extra}]>"
x = f"locked, timestamps={self._lockouts:!r}" if self._lockouts else "unlocked"
return f"<{res[1:-1]} [{x}]>"

def __init__(self) -> None:
self._lockouts: list[float] = []
Expand Down Expand Up @@ -111,12 +107,8 @@ def __init__(self) -> None:

def __repr__(self) -> str:
res = super().__repr__()
extra = (
"unlocked"
if not self._lockouts
else f"locked, timestamps={self._lockouts:!r}"
)
return f"<{res[1:-1]} [{extra}]>"
x = f"locked, timestamps={self._lockouts:!r}" if self._lockouts else "unlocked"
return f"<{res[1:-1]} [{x}]>"

def lockout_for(self, seconds: float, /) -> None:
"""Lock a resource for an amount of time."""
Expand Down
5 changes: 2 additions & 3 deletions src/async_utils/sig_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ def __init__(self, signals: SignalTuple = default_handled, /) -> None:
self.ss, self.cs = socket.socketpair()
self.ss.setblocking(False)
self.cs.setblocking(False)
self._signals = tuple(
e for name, e in signal.Signals.__members__.items() if name in signals
)
sig_members = signal.Signals.__members__.items()
self._signals = tuple(e for name, e in sig_members if name in signals)

def get_send_socket(self) -> socket.socket:
"""Get the send socket.
Expand Down

0 comments on commit aa4fde9

Please sign in to comment.