diff --git a/faststream/rabbit/schemas/queue.py b/faststream/rabbit/schemas/queue.py index 5c22c70fec..04aa3d6490 100644 --- a/faststream/rabbit/schemas/queue.py +++ b/faststream/rabbit/schemas/queue.py @@ -3,6 +3,8 @@ from typing import TYPE_CHECKING, Literal, Optional, TypedDict, Union, overload from faststream.broker.schemas import NameRequired +from faststream.exceptions import SetupError +from faststream.types import EMPTY from faststream.utils.path import compile_path if TYPE_CHECKING: @@ -12,80 +14,14 @@ class QueueType(str, Enum): - CLASSIC = "CLASSIC" - QUORUM = "QUORUM" - STREAM = "STREAM" - - -CommonQueueArgs = TypedDict( - "CommonQueueArgs", - { - "x-queue-leader-locator": Literal["client-local", "balanced"], - "x-max-length-bytes": int, - }, - total=False, -) - -SharedQueueClassicAndQuorumArgs = TypedDict( - "SharedQueueClassicAndQuorumArgs", - { - "x-expires": int, - "x-message-ttl": int, - "x-single-active-consumer": bool, - "x-dead-letter-exchange": str, - "x-dead-letter-routing-key": str, - "x-max-length": int, - "x-max-priority": int, - }, - total=False, -) - - -QueueClassicTypeSpecificArgs = TypedDict( - "QueueClassicTypeSpecificArgs", - {"x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"]}, - total=False, -) - -QueueQuorumTypeSpecificArgs = TypedDict( - "QueueQuorumTypeSpecificArgs", - { - "x-overflow": Literal["drop-head", "reject-publish"], - "x-delivery-limit": int, - "x-quorum-initial-group-size": int, - "x-quorum-target-group-size": int, - "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"], - }, - total=False, -) - - -QueueStreamTypeSpecificArgs = TypedDict( - "QueueStreamTypeSpecificArgs", - { - "x-max-age": str, - "x-stream-max-segment-size-bytes": int, - "x-stream-filter-size-bytes": int, - "x-initial-cluster-size": int, - }, - total=False, -) - - -class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs): - pass - - -class ClassicQueueArgs( - CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs -): - pass + """Queue types for RabbitMQ. + Enum should be lowercase to match RabbitMQ API. + """ -class QuorumQueueArgs( - CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs -): - pass + CLASSIC = "classic" + QUORUM = "quorum" + STREAM = "stream" class RabbitQueue(NameRequired): @@ -130,11 +66,11 @@ def __init__( self, name: str, queue_type: Literal[QueueType.CLASSIC] = QueueType.CLASSIC, - durable: bool = False, + durable: bool = EMPTY, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, - arguments: Optional[ClassicQueueArgs] = None, + arguments: Optional["ClassicQueueArgs"] = None, timeout: "TimeoutType" = None, robust: bool = True, bind_arguments: Optional["AnyDict"] = None, @@ -150,7 +86,7 @@ def __init__( exclusive: bool = False, passive: bool = False, auto_delete: bool = False, - arguments: Optional[QuorumQueueArgs] = None, + arguments: Optional["QuorumQueueArgs"] = None, timeout: "TimeoutType" = None, robust: bool = True, bind_arguments: Optional["AnyDict"] = None, @@ -166,7 +102,7 @@ def __init__( exclusive: bool = False, passive: bool = False, auto_delete: bool = False, - arguments: Optional[StreamQueueArgs] = None, + arguments: Optional["StreamQueueArgs"] = None, timeout: "TimeoutType" = None, robust: bool = True, bind_arguments: Optional["AnyDict"] = None, @@ -177,11 +113,17 @@ def __init__( self, name: str, queue_type: QueueType = QueueType.CLASSIC, - durable: bool = False, + durable: bool = EMPTY, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, - arguments: Union[QuorumQueueArgs, ClassicQueueArgs, StreamQueueArgs, "AnyDict", None] = None, + arguments: Union[ + "QuorumQueueArgs", + "ClassicQueueArgs", + "StreamQueueArgs", + "AnyDict", + None, + ] = None, timeout: "TimeoutType" = None, robust: bool = True, bind_arguments: Optional["AnyDict"] = None, @@ -208,6 +150,14 @@ def __init__( patch_regex=lambda x: x.replace(r"\#", ".+"), ) + if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM: + if durable is EMPTY: + durable = True + else: + raise SetupError("Quorum and Stream queues must be durable") + elif durable is EMPTY: + durable = False + super().__init__(name) self.path_regex = re @@ -218,7 +168,7 @@ def __init__( self.robust = robust self.passive = passive self.auto_delete = auto_delete - self.arguments = {"x-queue-type": queue_type.value, **arguments} + self.arguments = {"x-queue-type": queue_type.value, **(arguments or {})} self.timeout = timeout def add_prefix(self, prefix: str) -> "RabbitQueue": @@ -230,3 +180,74 @@ def add_prefix(self, prefix: str) -> "RabbitQueue": new_q.routing_key = "".join((prefix, new_q.routing_key)) return new_q + + +CommonQueueArgs = TypedDict( + "CommonQueueArgs", + { + "x-queue-leader-locator": Literal["client-local", "balanced"], + "x-max-length-bytes": int, + }, + total=False, +) + +SharedQueueClassicAndQuorumArgs = TypedDict( + "SharedQueueClassicAndQuorumArgs", + { + "x-expires": int, + "x-message-ttl": int, + "x-single-active-consumer": bool, + "x-dead-letter-exchange": str, + "x-dead-letter-routing-key": str, + "x-max-length": int, + "x-max-priority": int, + }, + total=False, +) + + +QueueClassicTypeSpecificArgs = TypedDict( + "QueueClassicTypeSpecificArgs", + {"x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"]}, + total=False, +) + +QueueQuorumTypeSpecificArgs = TypedDict( + "QueueQuorumTypeSpecificArgs", + { + "x-overflow": Literal["drop-head", "reject-publish"], + "x-delivery-limit": int, + "x-quorum-initial-group-size": int, + "x-quorum-target-group-size": int, + "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"], + }, + total=False, +) + + +QueueStreamTypeSpecificArgs = TypedDict( + "QueueStreamTypeSpecificArgs", + { + "x-max-age": str, + "x-stream-max-segment-size-bytes": int, + "x-stream-filter-size-bytes": int, + "x-initial-cluster-size": int, + }, + total=False, +) + + +class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs): + pass + + +class ClassicQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs +): + pass + + +class QuorumQueueArgs( + CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs +): + pass