Skip to content

Commit

Permalink
chore: merge main (#2045)
Browse files Browse the repository at this point in the history
* chore: merge main

* tests: new Listener tests

* chore: refactor CincurrentBetwwenPartitionsSubscriber

* chore: refactor CincurrentBetwwenPartitionsSubscriber

* fix: correct ConcurrentManualAckSubscriber behavior

* tests: increase timeout in kafka concurrent tests

* refactor: change ListenerProxy logic

* tests: increase workers number for kafka
  • Loading branch information
Lancetnik authored Jan 19, 2025
1 parent 57c0633 commit 2e9929e
Show file tree
Hide file tree
Showing 35 changed files with 732 additions and 428 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ venv*
htmlcov
token
.DS_Store
*.egg-info

docs/site/
docs/site_build/
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1934,
"line_number": 1958,
"is_secret": false
}
],
Expand All @@ -178,5 +178,5 @@
}
]
},
"generated_at": "2024-12-05T15:39:13Z"
"generated_at": "2025-01-10T14:53:01Z"
}

This file was deleted.

11 changes: 0 additions & 11 deletions docs/docs/en/api/faststream/rabbit/QueueType.md

This file was deleted.

11 changes: 0 additions & 11 deletions docs/docs/en/api/faststream/rabbit/schemas/QueueType.md

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

11 changes: 0 additions & 11 deletions docs/docs/en/api/faststream/rabbit/schemas/queue/QueueType.md

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

11 changes: 0 additions & 11 deletions docs/docs/en/api/faststream/rabbit/utils/build_virtual_host.md

This file was deleted.

2 changes: 1 addition & 1 deletion docs/docs/en/getting-started/asgi.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ app = AsgiFastStream(

!!! tip
You do not need to setup all routes using the `asgi_routes=[]` parameter.<br/>
You can use the `#!python app.mount("/healh", asgi_endpoint)` method also.
You can use the `#!python app.mount("/health", asgi_endpoint)` method also.

### AsyncAPI Documentation

Expand Down
7 changes: 7 additions & 0 deletions docs/docs/en/kafka/Subscriber/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,10 @@ async def base_handler(
):
...
```


## Concurrent processing

There are two possible modes of concurrent message processing:
- With `auto_commit=False` and `max_workers` > 1, a handler processes all messages concurrently in a at-most-once semantic.
- With `auto_commit=True` and `max_workers` > 1, processing is concurrent between topic partitions and sequential within a partition to ensure reliable at-least-once processing. Maximum concurrency is achieved when total number of workers across all application instances running workers in the same consumer group is equal to the number of partitions in the topic. Increasing worker count beyond that will result in idle workers as not more than one consumer from a consumer group can be consuming from the same partition.
24 changes: 24 additions & 0 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,30 @@ hide:
---

# Release Notes
## 0.5.34

### What's Changed

* fix: when / present in virtual host name and passing as uri by [@pepellsd](https://github.com/pepellsd){.external-link target="_blank"} in [#1979](https://github.com/airtai/faststream/pull/1979){.external-link target="_blank"}
* fix (#2013): allow to create publisher in already connected broker by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#2024](https://github.com/airtai/faststream/pull/2024){.external-link target="_blank"}
* feat: add BatchBufferOverflowException by [@spataphore1337](https://github.com/spataphore1337){.external-link target="_blank"} in [#1990](https://github.com/airtai/faststream/pull/1990){.external-link target="_blank"}
* feat: add static instrumentation info by [@draincoder](https://github.com/draincoder){.external-link target="_blank"} in [#1996](https://github.com/airtai/faststream/pull/1996){.external-link target="_blank"}
* docs: remove reference of "faststream.access" by [@rishabhc32](https://github.com/rishabhc32){.external-link target="_blank"} in [#1995](https://github.com/airtai/faststream/pull/1995){.external-link target="_blank"}
* docs: fixed typo in publishing/test.md by [@AlexPetul](https://github.com/AlexPetul){.external-link target="_blank"} in [#2009](https://github.com/airtai/faststream/pull/2009){.external-link target="_blank"}
* docs: ability to declare queue/exchange binding by [@MagicAbdel](https://github.com/MagicAbdel){.external-link target="_blank"} in [#2011](https://github.com/airtai/faststream/pull/2011){.external-link target="_blank"}
* docs: fix spelling mistake of `/health` by [@herotomg](https://github.com/herotomg){.external-link target="_blank"} in [#2023](https://github.com/airtai/faststream/pull/2023){.external-link target="_blank"}
* docs: update aio-pika external docs URL as it has been moved by [@HybridBit](https://github.com/HybridBit){.external-link target="_blank"} in [#1984](https://github.com/airtai/faststream/pull/1984){.external-link target="_blank"}
* refactor: add type annotations for RabbitQueue and enum for queue type by [@pepellsd](https://github.com/pepellsd){.external-link target="_blank"} in [#2002](https://github.com/airtai/faststream/pull/2002){.external-link target="_blank"}

### New Contributors
* [@HybridBit](https://github.com/HybridBit){.external-link target="_blank"} made their first contribution in [#1984](https://github.com/airtai/faststream/pull/1984){.external-link target="_blank"}
* [@rishabhc32](https://github.com/rishabhc32){.external-link target="_blank"} made their first contribution in [#1995](https://github.com/airtai/faststream/pull/1995){.external-link target="_blank"}
* [@AlexPetul](https://github.com/AlexPetul){.external-link target="_blank"} made their first contribution in [#2009](https://github.com/airtai/faststream/pull/2009){.external-link target="_blank"}
* [@MagicAbdel](https://github.com/MagicAbdel){.external-link target="_blank"} made their first contribution in [#2011](https://github.com/airtai/faststream/pull/2011){.external-link target="_blank"}
* [@herotomg](https://github.com/herotomg){.external-link target="_blank"} made their first contribution in [#2023](https://github.com/airtai/faststream/pull/2023){.external-link target="_blank"}

**Full Changelog**: [#0.5.33...0.5.34](https://github.com/airtai/faststream/compare/0.5.33...0.5.34){.external-link target="_blank"}

## 0.5.33

### What's Changed
Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.33"
__version__ = "0.5.34"

SERVICE_NAME = f"faststream-{__version__}"
41 changes: 21 additions & 20 deletions faststream/_internal/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Annotated,
Any,
Callable,
NamedTuple,
Optional,
Union,
)
Expand Down Expand Up @@ -54,26 +55,11 @@
from faststream.response import Response


class _CallOptions:
__slots__ = (
"decoder",
"dependencies",
"middlewares",
"parser",
)

def __init__(
self,
*,
parser: Optional["CustomCallable"],
decoder: Optional["CustomCallable"],
middlewares: Sequence["SubscriberMiddleware[Any]"],
dependencies: Iterable["Dependant"],
) -> None:
self.parser = parser
self.decoder = decoder
self.middlewares = middlewares
self.dependencies = dependencies
class _CallOptions(NamedTuple):
parser: Optional["CustomCallable"]
decoder: Optional["CustomCallable"]
middlewares: Sequence["SubscriberMiddleware[Any]"]
dependencies: Iterable["Dependant"]


class SubscriberUsecase(SubscriberProto[MsgType]):
Expand Down Expand Up @@ -445,3 +431,18 @@ def get_log_context(
return {
"message_id": getattr(message, "message_id", ""),
}

def _log(
self,
log_level: int,
message: str,
extra: Optional["AnyDict"] = None,
exc_info: Optional[Exception] = None,
) -> None:
state = self._state.get()
state.logger_state.logger.log(
log_level,
message,
extra=extra,
exc_info=exc_info,
)
31 changes: 24 additions & 7 deletions faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Iterable, Sequence
from collections.abc import Collection, Iterable, Sequence
from typing import (
TYPE_CHECKING,
Annotated,
Expand Down Expand Up @@ -39,6 +39,7 @@
)
from faststream.kafka.subscriber.specified import (
SpecificationBatchSubscriber,
SpecificationConcurrentBetweenPartitionsSubscriber,
SpecificationConcurrentDefaultSubscriber,
SpecificationDefaultSubscriber,
)
Expand All @@ -59,6 +60,7 @@ class KafkaRegistrator(
"SpecificationBatchSubscriber",
"SpecificationDefaultSubscriber",
"SpecificationConcurrentDefaultSubscriber",
"SpecificationConcurrentBetweenPartitionsSubscriber",
]
]
_publishers: list[
Expand Down Expand Up @@ -384,7 +386,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -765,7 +767,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -1146,7 +1148,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -1530,7 +1532,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -1561,7 +1563,14 @@ def subscriber(
] = (),
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False` processing is concurrent between partitions and "
"sequential within a partition. With `auto_commit=False` maximum "
"concurrency is achieved when total number of workers across all "
"application instances running workers in the same consumer group "
"is equal to the number of partitions in the topic."
),
] = 1,
no_ack: Annotated[
bool,
Expand Down Expand Up @@ -1598,6 +1607,7 @@ def subscriber(
"SpecificationDefaultSubscriber",
"SpecificationBatchSubscriber",
"SpecificationConcurrentDefaultSubscriber",
"SpecificationConcurrentBetweenPartitionsSubscriber",
]:
sub = create_subscriber(
*topics,
Expand Down Expand Up @@ -1648,7 +1658,14 @@ def subscriber(
if batch:
subscriber = cast("SpecificationBatchSubscriber", subscriber)
elif max_workers > 1:
subscriber = cast("SpecificationConcurrentDefaultSubscriber", subscriber)
if auto_commit:
subscriber = cast(
"SpecificationConcurrentDefaultSubscriber", subscriber
)
else:
subscriber = cast(
"SpecificationConcurrentBetweenPartitionsSubscriber", subscriber
)
else:
subscriber = cast("SpecificationDefaultSubscriber", subscriber)

Expand Down
Loading

0 comments on commit 2e9929e

Please sign in to comment.