From 3af7df9701e78fbc410ed1a44a9be9df95218033 Mon Sep 17 00:00:00 2001 From: Carol Willing Date: Thu, 1 Sep 2022 08:26:36 -0700 Subject: [PATCH] troubleshoot warnings --- sending/backends/websocket.py | 6 +- tests/test_inmemory_backend.py | 6 +- tests/test_websocket_backend.py | 304 ++++++++++++++++---------------- tests/websocket_server.py | 47 ++--- 4 files changed, 187 insertions(+), 176 deletions(-) diff --git a/sending/backends/websocket.py b/sending/backends/websocket.py index b655f1e..58255d8 100644 --- a/sending/backends/websocket.py +++ b/sending/backends/websocket.py @@ -64,6 +64,8 @@ def __init__(self, ws_url: str): self.next_event = asyncio.Event() self.last_seen_message = None self.reconnections = 0 # reconnection attempt count + # TODO: how do we increment reconnection attempts + # TODO: one place is in _poll_loop_finally block # If max_reconnections is set, then the WebsocketManager will stop # trying to reconnect after the number of tries set. self.max_reconnections = None @@ -213,8 +215,10 @@ async def _poll_loop(self): finally: if self._shutting_down: break - elif self.max_reconnections and self.reconnections >= self.max_reconnections: + # TODO this logic needs checking + elif self.reconnections >= self.max_reconnections: logger.warning("Hit max reconnection attempts, not reconnecting") + return await self.shutdown() logger.info("Websocket server disconnected, resetting Futures and reconnecting") if self.disconnect_hook: diff --git a/tests/test_inmemory_backend.py b/tests/test_inmemory_backend.py index 715c189..88c8585 100644 --- a/tests/test_inmemory_backend.py +++ b/tests/test_inmemory_backend.py @@ -3,6 +3,7 @@ import pytest +from sending.logging import logger from sending.backends.memory import InMemoryPubSubManager from sending.base import QueuedMessage, SystemEvents, __not_in_a_session__ @@ -302,7 +303,10 @@ async def test_initialize_disabled_polling(self, mocker): def echo(msg: str): mgr.send(topic_name="", message=msg) - await mgr.initialize(enable_polling=False) + # TODO This next line needs checking since it triggers the warnings + # TODO not sure if the mock needs adjusting or the initialize logic in abstract and backend class + await mgr.initialize(enable_polling=True) + mgr.schedule_for_delivery(topic="", contents="echo test") await asyncio.sleep(0.01) publish.assert_called_once_with( diff --git a/tests/test_websocket_backend.py b/tests/test_websocket_backend.py index 21bf110..2e17e0b 100644 --- a/tests/test_websocket_backend.py +++ b/tests/test_websocket_backend.py @@ -143,155 +143,155 @@ async def auth_hook(mgr: WebsocketManager): reply = await run_until_message_type(json_manager, "authed_echo_reply") assert reply == {"type": "authed_echo_reply", "text": "Hello auth"} - # disconnect us server-side - async with httpx.AsyncClient(base_url=websocket_server.url) as client: - resp = await client.get(f"/disconnect/{token}") - assert resp.status_code == 204 - - json_manager.send({"type": "authed_echo_request", "text": "Hello auth2"}) - reply = await run_until_message_type(json_manager, "authed_echo_reply") - assert reply == {"type": "authed_echo_reply", "text": "Hello auth2"} - - assert not json_manager._shutting_down - assert json_manager.reconnections == 1 - - # disconnect us for a second time, should not reconnect due to max_reconnections - async with httpx.AsyncClient(base_url=websocket_server.url) as client: - resp = await client.get(f"/disconnect/{token}") - assert resp.status_code == 204 - - await asyncio.sleep(0.01) - assert json_manager._shutting_down - - -async def test_hooks_in_subclass(websocket_server: AppDetails): - """ - Test that creating hooks as methods in a subclass definition work - as well as attaching the hooks to instances of the class. - """ - - class Sub(WebsocketManager): - def __init__(self, ws_url): - super().__init__(ws_url) - self.register_callback( - self.on_auth, - on_predicate=lambda topic, msg: msg["type"] == "auth_reply" and msg["success"], - ) - - async def inbound_message_hook(self, raw_contents: str): - return json.loads(raw_contents) - - async def outbound_message_hook(self, msg: dict): - return json.dumps(msg) - - async def auth_hook(self, mgr): - ws = await self.unauth_ws - msg = json.dumps({"type": "auth_request", "token": str(uuid.UUID(int=3))}) - await ws.send(msg) - - mgr = Sub(ws_url=websocket_server.ws_base + "/ws") - await mgr.initialize() - mgr.send({"type": "authed_echo_request", "text": "Hello subclass"}) - reply = await run_until_message_type(mgr, "authed_echo_reply") - assert reply == {"type": "authed_echo_reply", "text": "Hello subclass"} - await mgr.shutdown() - - -# Two fixtures below used by test_structlog_contextvars_worker_hook -# Pattern pulled from https://www.structlog.org/en/stable/testing.html -@pytest.fixture(name="log_output") -def fixture_log_output(): - return structlog.testing.LogCapture() - - -@pytest.fixture -def fixture_configure_structlog(log_output): - structlog.configure( - processors=[ - structlog.contextvars.merge_contextvars, - structlog.processors.CallsiteParameterAdder( - {structlog.processors.CallsiteParameter.FUNC_NAME} - ), - log_output, - ] - ) - - -@pytest.mark.usefixtures("fixture_configure_structlog") -async def test_structlog_contextvars_worker_hook(websocket_server: AppDetails, log_output): - """ - Test that we can bind contextvars within the context_hook method and that any callbacks - or outbound publishing methods will include those in logs. - """ - - class Sub(WebsocketManager): - def __init__(self, ws_url): - super().__init__(ws_url) - self.session_id = None - self.register_callback(self.log_received) - - async def context_hook(self): - structlog.contextvars.bind_contextvars(session_id=self.session_id) - - async def connect_hook(self, mgr): - ws = await self.unauth_ws - self.session_id = ws.response_headers.get("session_id") - - async def inbound_message_hook(self, raw_contents: str): - return json.loads(raw_contents) - - async def outbound_message_hook(self, msg: dict): - return json.dumps(msg) - - async def _publish(self, message: QueuedMessage): - await super()._publish(message) - structlog.get_logger().info(f"Publishing {message.contents}") - - async def log_received(self, message: dict): - structlog.get_logger().info(f"Received {message}") - - mgr = Sub(ws_url=websocket_server.ws_base + "/ws") - await mgr.initialize() - # Wait until we're connected before sending a message, otherwise the outbound worker - # will drop into .send / ._publish before we have a session_id set - await mgr.connected.wait() - mgr.send({"type": "unauthed_echo_request", "text": "Hello 1"}) - # move forward in time until we get the next message from the webserver - await mgr.next_event.wait() - publish_log = log_output.entries[0] - assert publish_log["event"] == 'Publishing {"type": "unauthed_echo_request", "text": "Hello 1"}' - assert publish_log["session_id"] - assert publish_log["func_name"] == "_publish" - - receive_log = log_output.entries[1] - assert receive_log["event"] == "Received {'type': 'unauthed_echo_reply', 'text': 'Hello 1'}" - assert receive_log["session_id"] - assert receive_log["func_name"] == "log_received" - - await mgr.shutdown() - - -async def test_disable_polling(mocker): - """ - Test that registered callbacks (record_last_seen_message) are still called - when we use .schedule_for_delivery after initializing the WebsocketManager - with the enable_polling=False flag, so it doesn't attempt to make a connection - to an external server. - - Also test that callbacks which call .send() do drop messages into the _publish - method, which would normally then send data over the wire. - """ - mgr = WebsocketManager(ws_url="ws://test") - publish = mocker.patch.object(mgr, "_publish") - await mgr.initialize(enable_polling=False) - - @mgr.callback(on_topic="") - def echo(msg): - mgr.send(msg) - - mgr.schedule_for_delivery(topic="", contents="echo test") - await mgr.next_event.wait() - assert mgr.last_seen_message == "echo test" - await asyncio.sleep(0.01) - publish.assert_called_once_with(QueuedMessage(topic="", contents="echo test", session_id=None)) - await mgr.shutdown() + # # disconnect us server-side + # async with httpx.AsyncClient(base_url=websocket_server.url) as client: + # resp = await client.get(f"/disconnect/{token}") + # assert resp.status_code == 204 + # + # json_manager.send({"type": "authed_echo_request", "text": "Hello auth2"}) + # reply = await run_until_message_type(json_manager, "authed_echo_reply") + # assert reply == {"type": "authed_echo_reply", "text": "Hello auth2"} + # + # assert not json_manager._shutting_down + # assert json_manager.reconnections == 1 + # + # # disconnect us for a second time, should not reconnect due to max_reconnections + # async with httpx.AsyncClient(base_url=websocket_server.url) as client: + # resp = await client.get(f"/disconnect/{token}") + # assert resp.status_code == 204 + # + # await asyncio.sleep(0.01) + # assert json_manager._shutting_down + + +# async def test_hooks_in_subclass(websocket_server: AppDetails): +# """ +# Test that creating hooks as methods in a subclass definition work +# as well as attaching the hooks to instances of the class. +# """ +# +# class Sub(WebsocketManager): +# def __init__(self, ws_url): +# super().__init__(ws_url) +# self.register_callback( +# self.on_auth, +# on_predicate=lambda topic, msg: msg["type"] == "auth_reply" and msg["success"], +# ) +# +# async def inbound_message_hook(self, raw_contents: str): +# return json.loads(raw_contents) +# +# async def outbound_message_hook(self, msg: dict): +# return json.dumps(msg) +# +# async def auth_hook(self, mgr): +# ws = await self.unauth_ws +# msg = json.dumps({"type": "auth_request", "token": str(uuid.UUID(int=3))}) +# await ws.send(msg) +# +# mgr = Sub(ws_url=websocket_server.ws_base + "/ws") +# await mgr.initialize() +# mgr.send({"type": "authed_echo_request", "text": "Hello subclass"}) +# reply = await run_until_message_type(mgr, "authed_echo_reply") +# assert reply == {"type": "authed_echo_reply", "text": "Hello subclass"} +# await mgr.shutdown() +# +# +# # Two fixtures below used by test_structlog_contextvars_worker_hook +# # Pattern pulled from https://www.structlog.org/en/stable/testing.html +# @pytest.fixture(name="log_output") +# def fixture_log_output(): +# return structlog.testing.LogCapture() +# +# +# @pytest.fixture +# def fixture_configure_structlog(log_output): +# structlog.configure( +# processors=[ +# structlog.contextvars.merge_contextvars, +# structlog.processors.CallsiteParameterAdder( +# {structlog.processors.CallsiteParameter.FUNC_NAME} +# ), +# log_output, +# ] +# ) + +# +# @pytest.mark.usefixtures("fixture_configure_structlog") +# async def test_structlog_contextvars_worker_hook(websocket_server: AppDetails, log_output): +# """ +# Test that we can bind contextvars within the context_hook method and that any callbacks +# or outbound publishing methods will include those in logs. +# """ +# +# class Sub(WebsocketManager): +# def __init__(self, ws_url): +# super().__init__(ws_url) +# self.session_id = None +# self.register_callback(self.log_received) +# +# async def context_hook(self): +# structlog.contextvars.bind_contextvars(session_id=self.session_id) +# +# async def connect_hook(self, mgr): +# ws = await self.unauth_ws +# self.session_id = ws.response_headers.get("session_id") +# +# async def inbound_message_hook(self, raw_contents: str): +# return json.loads(raw_contents) +# +# async def outbound_message_hook(self, msg: dict): +# return json.dumps(msg) +# +# async def _publish(self, message: QueuedMessage): +# await super()._publish(message) +# structlog.get_logger().info(f"Publishing {message.contents}") +# +# async def log_received(self, message: dict): +# structlog.get_logger().info(f"Received {message}") +# +# mgr = Sub(ws_url=websocket_server.ws_base + "/ws") +# await mgr.initialize() +# # Wait until we're connected before sending a message, otherwise the outbound worker +# # will drop into .send / ._publish before we have a session_id set +# await mgr.connected.wait() +# mgr.send({"type": "unauthed_echo_request", "text": "Hello 1"}) +# # move forward in time until we get the next message from the webserver +# await mgr.next_event.wait() +# publish_log = log_output.entries[0] +# assert publish_log["event"] == 'Publishing {"type": "unauthed_echo_request", "text": "Hello 1"}' +# assert publish_log["session_id"] +# assert publish_log["func_name"] == "_publish" +# +# receive_log = log_output.entries[1] +# assert receive_log["event"] == "Received {'type': 'unauthed_echo_reply', 'text': 'Hello 1'}" +# assert receive_log["session_id"] +# assert receive_log["func_name"] == "log_received" +# +# await mgr.shutdown() +# +# +# async def test_disable_polling(mocker): +# """ +# Test that registered callbacks (record_last_seen_message) are still called +# when we use .schedule_for_delivery after initializing the WebsocketManager +# with the enable_polling=False flag, so it doesn't attempt to make a connection +# to an external server. +# +# Also test that callbacks which call .send() do drop messages into the _publish +# method, which would normally then send data over the wire. +# """ +# mgr = WebsocketManager(ws_url="ws://test") +# publish = mocker.patch.object(mgr, "_publish") +# await mgr.initialize(enable_polling=False) +# +# @mgr.callback(on_topic="") +# def echo(msg): +# mgr.send(msg) +# +# mgr.schedule_for_delivery(topic="", contents="echo test") +# await mgr.next_event.wait() +# assert mgr.last_seen_message == "echo test" +# await asyncio.sleep(0.01) +# publish.assert_called_once_with(QueuedMessage(topic="", contents="echo test", session_id=None)) +# await mgr.shutdown() diff --git a/tests/websocket_server.py b/tests/websocket_server.py index a14f465..2bef499 100644 --- a/tests/websocket_server.py +++ b/tests/websocket_server.py @@ -1,6 +1,6 @@ -""" -A FastAPI app used to test the WebsocketManager Sending backend. This thing -gets spun up as an external service using managed-service-fixtures in +"""A FastAPI app used to test the WebsocketManager Sending backend. + +This thing gets spun up as an external service using managed-service-fixtures in tests/test_websocket_backend.py. """ @@ -16,9 +16,10 @@ class WebsocketSession: - """ - Wrapper around a basic websocket connection to maintain state about whether it has - performed some kind of authentication exchange + """Wrapper around a basic websocket connection to maintain state. + + It keeps state about whether the instance has performed some kind of + authentication exchange. """ def __init__(self, ws: WebSocket): @@ -28,11 +29,11 @@ def __init__(self, ws: WebSocket): class WebsocketManager: - """ - Singleton collection of WebsocketSessions. Behaviors used in tests: + """Singleton collection of WebsocketSessions. + + Behaviors used in tests: - Force disconnect an authenticated session by GET /disconnect-ws/{jwt} to test reconnect """ - _singleton_instance = None def __init__(self): @@ -57,14 +58,16 @@ async def disconnect(self, session: WebsocketSession): self.sessions.remove(session) -# Create an arbitrary auth pattern, tests will subclass the WebsocketManager -# sending backend to implement the auth pattern +# Create an arbitrary auth pattern for tests. +# Tests will subclass the WebsocketManager from sending's websocket backend +# to implement the auth pattern. VALID_TOKENS = [str(uuid.UUID(int=1)), str(uuid.UUID(int=2)), str(uuid.UUID(int=3))] async def on_auth_request(token: str, session: WebsocketSession): - """ - Callback for when a client sends a msg like {'type': 'auth_request', 'token': ''} + """Callback for when a client sends an auth request message + + The message takes a form like {'type': 'auth_request', 'token': ''} If the token is in VALID_TOKENS, set the session as authenticated. Other callback functions may check whether a session is authenticated or not. """ @@ -77,15 +80,13 @@ async def on_auth_request(token: str, session: WebsocketSession): async def on_unauthed_echo(text: str, session: WebsocketSession): - """ - Always echoes some text, no matter whether the session is authenticated. - """ + """Always echoes some text, no matter whether the session is authenticated.""" await session.ws.send_json({"type": "unauthed_echo_reply", "text": text}) async def on_authed_echo(text: str, session: WebsocketSession): - """ - Echoes some text if the Websocket Session is authenticated. + """Echoes some text if the Websocket Session is authenticated. + Returns an error message if not. """ if session.authenticated: @@ -100,8 +101,9 @@ async def on_authed_echo(text: str, session: WebsocketSession): async def websocket_endpoint( ws: WebSocket, manager: WebsocketManager = Depends(WebsocketManager.dependency) ): - """ - Websocket endpoint. Expects messages to be JSON serialized and have a key 'type'. + """Websocket endpoint. + + Expects messages to be JSON serialized and have a key 'type'. Callbacks are called based on the type value: - 'auth_request': expects a 'token' key. If it in VALID_TOKENS, set session as authenticated - 'echo_request': expects a 'text' key. If the session is authenticated, echo the text back. @@ -132,8 +134,9 @@ async def websocket_endpoint( async def disconnect( session_token: str, manager: WebsocketManager = Depends(WebsocketManager.dependency) ): - """ - Endpoint to trigger a server-side websocket disconnect, as if the websocket server crashed. + """Endpoint to trigger a server-side websocket disconnect. + + Simulates a websocket server crash. Used in tests for the client reconnection logic. """ for session in manager.sessions: