Skip to content

Commit

Permalink
feat: add SetupError in incorrect RabbitQueue params case
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 23, 2024
1 parent 0f5a406 commit 03b6bb1
Showing 1 changed file with 100 additions and 79 deletions.
179 changes: 100 additions & 79 deletions faststream/rabbit/schemas/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from typing import TYPE_CHECKING, Literal, Optional, TypedDict, Union, overload

from faststream.broker.schemas import NameRequired
from faststream.exceptions import SetupError
from faststream.types import EMPTY
from faststream.utils.path import compile_path

if TYPE_CHECKING:
Expand All @@ -12,80 +14,14 @@


class QueueType(str, Enum):
CLASSIC = "CLASSIC"
QUORUM = "QUORUM"
STREAM = "STREAM"


CommonQueueArgs = TypedDict(
"CommonQueueArgs",
{
"x-queue-leader-locator": Literal["client-local", "balanced"],
"x-max-length-bytes": int,
},
total=False,
)

SharedQueueClassicAndQuorumArgs = TypedDict(
"SharedQueueClassicAndQuorumArgs",
{
"x-expires": int,
"x-message-ttl": int,
"x-single-active-consumer": bool,
"x-dead-letter-exchange": str,
"x-dead-letter-routing-key": str,
"x-max-length": int,
"x-max-priority": int,
},
total=False,
)


QueueClassicTypeSpecificArgs = TypedDict(
"QueueClassicTypeSpecificArgs",
{"x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"]},
total=False,
)

QueueQuorumTypeSpecificArgs = TypedDict(
"QueueQuorumTypeSpecificArgs",
{
"x-overflow": Literal["drop-head", "reject-publish"],
"x-delivery-limit": int,
"x-quorum-initial-group-size": int,
"x-quorum-target-group-size": int,
"x-dead-letter-strategy": Literal["at-most-once", "at-least-once"],
},
total=False,
)


QueueStreamTypeSpecificArgs = TypedDict(
"QueueStreamTypeSpecificArgs",
{
"x-max-age": str,
"x-stream-max-segment-size-bytes": int,
"x-stream-filter-size-bytes": int,
"x-initial-cluster-size": int,
},
total=False,
)


class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs):
pass


class ClassicQueueArgs(
CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs
):
pass
"""Queue types for RabbitMQ.
Enum should be lowercase to match RabbitMQ API.
"""

class QuorumQueueArgs(
CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs
):
pass
CLASSIC = "classic"
QUORUM = "quorum"
STREAM = "stream"


class RabbitQueue(NameRequired):
Expand Down Expand Up @@ -130,11 +66,11 @@ def __init__(
self,
name: str,
queue_type: Literal[QueueType.CLASSIC] = QueueType.CLASSIC,
durable: bool = False,
durable: bool = EMPTY,
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[ClassicQueueArgs] = None,
arguments: Optional["ClassicQueueArgs"] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
Expand All @@ -150,7 +86,7 @@ def __init__(
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[QuorumQueueArgs] = None,
arguments: Optional["QuorumQueueArgs"] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
Expand All @@ -166,7 +102,7 @@ def __init__(
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Optional[StreamQueueArgs] = None,
arguments: Optional["StreamQueueArgs"] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
Expand All @@ -177,11 +113,17 @@ def __init__(
self,
name: str,
queue_type: QueueType = QueueType.CLASSIC,
durable: bool = False,
durable: bool = EMPTY,
exclusive: bool = False,
passive: bool = False,
auto_delete: bool = False,
arguments: Union[QuorumQueueArgs, ClassicQueueArgs, StreamQueueArgs, "AnyDict", None] = None,
arguments: Union[
"QuorumQueueArgs",
"ClassicQueueArgs",
"StreamQueueArgs",
"AnyDict",
None,
] = None,
timeout: "TimeoutType" = None,
robust: bool = True,
bind_arguments: Optional["AnyDict"] = None,
Expand All @@ -208,6 +150,14 @@ def __init__(
patch_regex=lambda x: x.replace(r"\#", ".+"),
)

if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM:
if durable is EMPTY:
durable = True
else:
raise SetupError("Quorum and Stream queues must be durable")
elif durable is EMPTY:
durable = False

super().__init__(name)

self.path_regex = re
Expand All @@ -218,7 +168,7 @@ def __init__(
self.robust = robust
self.passive = passive
self.auto_delete = auto_delete
self.arguments = {"x-queue-type": queue_type.value, **arguments}
self.arguments = {"x-queue-type": queue_type.value, **(arguments or {})}
self.timeout = timeout

def add_prefix(self, prefix: str) -> "RabbitQueue":
Expand All @@ -230,3 +180,74 @@ def add_prefix(self, prefix: str) -> "RabbitQueue":
new_q.routing_key = "".join((prefix, new_q.routing_key))

return new_q


CommonQueueArgs = TypedDict(
"CommonQueueArgs",
{
"x-queue-leader-locator": Literal["client-local", "balanced"],
"x-max-length-bytes": int,
},
total=False,
)

SharedQueueClassicAndQuorumArgs = TypedDict(
"SharedQueueClassicAndQuorumArgs",
{
"x-expires": int,
"x-message-ttl": int,
"x-single-active-consumer": bool,
"x-dead-letter-exchange": str,
"x-dead-letter-routing-key": str,
"x-max-length": int,
"x-max-priority": int,
},
total=False,
)


QueueClassicTypeSpecificArgs = TypedDict(
"QueueClassicTypeSpecificArgs",
{"x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"]},
total=False,
)

QueueQuorumTypeSpecificArgs = TypedDict(
"QueueQuorumTypeSpecificArgs",
{
"x-overflow": Literal["drop-head", "reject-publish"],
"x-delivery-limit": int,
"x-quorum-initial-group-size": int,
"x-quorum-target-group-size": int,
"x-dead-letter-strategy": Literal["at-most-once", "at-least-once"],
},
total=False,
)


QueueStreamTypeSpecificArgs = TypedDict(
"QueueStreamTypeSpecificArgs",
{
"x-max-age": str,
"x-stream-max-segment-size-bytes": int,
"x-stream-filter-size-bytes": int,
"x-initial-cluster-size": int,
},
total=False,
)


class StreamQueueArgs(CommonQueueArgs, QueueStreamTypeSpecificArgs):
pass


class ClassicQueueArgs(
CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueClassicTypeSpecificArgs
):
pass


class QuorumQueueArgs(
CommonQueueArgs, SharedQueueClassicAndQuorumArgs, QueueQuorumTypeSpecificArgs
):
pass

0 comments on commit 03b6bb1

Please sign in to comment.