Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: nested FastAPI routers duplicate subscribers' middlewares #1742

Open
ALittleMoron opened this issue Aug 30, 2024 · 4 comments · May be fixed by #1779
Open

Bug: nested FastAPI routers duplicate subscribers' middlewares #1742

ALittleMoron opened this issue Aug 30, 2024 · 4 comments · May be fixed by #1779
Labels
bug Something isn't working
Milestone

Comments

@ALittleMoron
Copy link

ALittleMoron commented Aug 30, 2024

Describe the bug
According to your documentation https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#routers-nesting, we wanted to create fastapi-like routers structure to separate subscribers for different topics:

├── kafka
├── __init__.py
├── topic1
│  ├── __init__.py
│  └── subscribers.py
├── topic2
│  ├── __init__.py
│  └── publishers.py
├── topic3
│  ├── __init__.py
│  └── subscribers.py
└── core_router.py

But we faced with problem, that nested routers duplicates consumer instances, which cause multiple reading and processing one message from topic:

test-faststream-py3.11➜  test-faststream uvicorn main:app
INFO:     Started server process [53140]
INFO:     Waiting for application startup.
2024-08-30 11:48:03,740 INFO     - test |            - `Hello` waiting for messages
2024-08-30 11:48:03,749 INFO     - test |            - `Hello` waiting for messages
2024-08-30 11:48:03,749 INFO     - test |            - `Hello` waiting for messages

and cause aiokafka error on shutdown:

Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7b93128ff590>

How to reproduce
Include source code:

from fastapi import FastAPI
from faststream.kafka.fastapi import KafkaRouter, Logger
from pydantic import BaseModel

kafka_router = KafkaRouter("localhost:9092")


class Incoming(BaseModel):
    m: dict


inner_kafka = KafkaRouter()


@inner_kafka.subscriber("test")
@inner_kafka.publisher("response")
async def hello(m: Incoming, logger: Logger):
    logger.info(m)
    return {"response": "Hello, Kafka!"}


app = FastAPI()
kafka_router.include_router(inner_kafka)
app.include_router(kafka_router)

And/Or steps to reproduce the behavior:

  1. Start application
  2. Create message for topic "test"
  3. See logs

Expected behavior

test-faststream-py3.11➜  test-faststream uvicorn main:app
INFO:     Started server process [56711]
INFO:     Waiting for application startup.
2024-08-30 12:07:48,342 INFO     - test |            - `Hello` waiting for messages
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
2024-08-30 12:07:55,098 INFO     - test | 0-17250088 - Received
2024-08-30 12:07:55,101 INFO     - test | 0-17250088 - m={'abc': 25}
2024-08-30 12:07:55,101 INFO     - test | 0-17250088 - Processed

Observed behavior

test-faststream-py3.11➜  test-faststream uvicorn main:app
INFO:     Started server process [56100]
INFO:     Waiting for application startup.
2024-08-30 12:06:14,196 INFO     - test |            - `Hello` waiting for messages
2024-08-30 12:06:14,204 INFO     - test |            - `Hello` waiting for messages
2024-08-30 12:06:14,204 INFO     - test |            - `Hello` waiting for messages
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x778c46fbf650>
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,154 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,154 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed

Environment
Running FastStream 0.5.19 with CPython 3.12.5 on Linux

[tool.poetry.dependencies]
python = "^3.11"
faststream = {extras = ["kafka"], version = "^0.5.19"}
fastapi = "^0.112.2"
uvicorn = "^0.30.6"
@ALittleMoron ALittleMoron added the bug Something isn't working label Aug 30, 2024
@Lancetnik
Copy link
Member

Hi! Thank you for the report
The problem is related to latest FastAPI 0.112.2 release
I fixes it already in latest commit, please wait for 0.5.20 FastStream release and check it again (I plan to release it this evening)

@Lancetnik
Copy link
Member

The problem was fixed in 0.5.20 (now nested routers doesn't run inner broker), but the problem with Middlewares duplication still exists.

I schedule solution on 0.6.0

Since 0.5.21 release, please use regular BrokerRouter as nested for FastAPI integration one

@Lancetnik Lancetnik moved this to Backlog in FastStream Aug 30, 2024
@Lancetnik Lancetnik added this to the 0.6.0 milestone Aug 30, 2024
@Lancetnik Lancetnik changed the title Bug: nested routers duplicate consumers Bug: nested FastAPI routers duplicate subscribers' middlewares Sep 1, 2024
@Lancetnik Lancetnik linked a pull request Sep 10, 2024 that will close this issue
@and-sm
Copy link

and-sm commented Dec 16, 2024

The problem was fixed in 0.5.20 (now nested routers doesn't run inner broker), but the problem with Middlewares duplication still exists.

I schedule solution on 0.6.0

Since 0.5.21 release, please use regular BrokerRouter as nested for FastAPI integration one

Could you please provide a more detailed explanation of the solution? How can BrokerRouter be used instead of NatsRouter? (fastapi)

@Lancetnik
Copy link
Member

How can BrokerRouter be used instead of NatsRouter?

https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#multiple-routers

@Lancetnik Lancetnik moved this from Backlog to Waiting for merge in FastStream Jan 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
Status: Waiting for merge
Development

Successfully merging a pull request may close this issue.

3 participants