Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP DEBUG] troubleshoot warnings #45

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sending/backends/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def __init__(self, ws_url: str):
self.next_event = asyncio.Event()
self.last_seen_message = None
self.reconnections = 0 # reconnection attempt count
# TODO: how do we increment reconnection attempts
# TODO: one place is in _poll_loop_finally block
# If max_reconnections is set, then the WebsocketManager will stop
# trying to reconnect after the number of tries set.
self.max_reconnections = None
Expand Down Expand Up @@ -213,8 +215,10 @@ async def _poll_loop(self):
finally:
if self._shutting_down:
break
elif self.max_reconnections and self.reconnections >= self.max_reconnections:
# TODO this logic needs checking
elif self.reconnections >= self.max_reconnections:
logger.warning("Hit max reconnection attempts, not reconnecting")

return await self.shutdown()
logger.info("Websocket server disconnected, resetting Futures and reconnecting")
if self.disconnect_hook:
Expand Down
6 changes: 5 additions & 1 deletion tests/test_inmemory_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from sending.logging import logger
from sending.backends.memory import InMemoryPubSubManager
from sending.base import QueuedMessage, SystemEvents, __not_in_a_session__

Expand Down Expand Up @@ -302,7 +303,10 @@ async def test_initialize_disabled_polling(self, mocker):
def echo(msg: str):
mgr.send(topic_name="", message=msg)

await mgr.initialize(enable_polling=False)
# TODO This next line needs checking since it triggers the warnings
# TODO not sure if the mock needs adjusting or the initialize logic in abstract and backend class
await mgr.initialize(enable_polling=True)

mgr.schedule_for_delivery(topic="", contents="echo test")
await asyncio.sleep(0.01)
publish.assert_called_once_with(
Expand Down
304 changes: 152 additions & 152 deletions tests/test_websocket_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,155 +143,155 @@ async def auth_hook(mgr: WebsocketManager):
reply = await run_until_message_type(json_manager, "authed_echo_reply")
assert reply == {"type": "authed_echo_reply", "text": "Hello auth"}

# disconnect us server-side
async with httpx.AsyncClient(base_url=websocket_server.url) as client:
resp = await client.get(f"/disconnect/{token}")
assert resp.status_code == 204

json_manager.send({"type": "authed_echo_request", "text": "Hello auth2"})
reply = await run_until_message_type(json_manager, "authed_echo_reply")
assert reply == {"type": "authed_echo_reply", "text": "Hello auth2"}

assert not json_manager._shutting_down
assert json_manager.reconnections == 1

# disconnect us for a second time, should not reconnect due to max_reconnections
async with httpx.AsyncClient(base_url=websocket_server.url) as client:
resp = await client.get(f"/disconnect/{token}")
assert resp.status_code == 204

await asyncio.sleep(0.01)
assert json_manager._shutting_down


async def test_hooks_in_subclass(websocket_server: AppDetails):
"""
Test that creating hooks as methods in a subclass definition work
as well as attaching the hooks to instances of the class.
"""

class Sub(WebsocketManager):
def __init__(self, ws_url):
super().__init__(ws_url)
self.register_callback(
self.on_auth,
on_predicate=lambda topic, msg: msg["type"] == "auth_reply" and msg["success"],
)

async def inbound_message_hook(self, raw_contents: str):
return json.loads(raw_contents)

async def outbound_message_hook(self, msg: dict):
return json.dumps(msg)

async def auth_hook(self, mgr):
ws = await self.unauth_ws
msg = json.dumps({"type": "auth_request", "token": str(uuid.UUID(int=3))})
await ws.send(msg)

mgr = Sub(ws_url=websocket_server.ws_base + "/ws")
await mgr.initialize()
mgr.send({"type": "authed_echo_request", "text": "Hello subclass"})
reply = await run_until_message_type(mgr, "authed_echo_reply")
assert reply == {"type": "authed_echo_reply", "text": "Hello subclass"}
await mgr.shutdown()


# Two fixtures below used by test_structlog_contextvars_worker_hook
# Pattern pulled from https://www.structlog.org/en/stable/testing.html
@pytest.fixture(name="log_output")
def fixture_log_output():
return structlog.testing.LogCapture()


@pytest.fixture
def fixture_configure_structlog(log_output):
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.CallsiteParameterAdder(
{structlog.processors.CallsiteParameter.FUNC_NAME}
),
log_output,
]
)


@pytest.mark.usefixtures("fixture_configure_structlog")
async def test_structlog_contextvars_worker_hook(websocket_server: AppDetails, log_output):
"""
Test that we can bind contextvars within the context_hook method and that any callbacks
or outbound publishing methods will include those in logs.
"""

class Sub(WebsocketManager):
def __init__(self, ws_url):
super().__init__(ws_url)
self.session_id = None
self.register_callback(self.log_received)

async def context_hook(self):
structlog.contextvars.bind_contextvars(session_id=self.session_id)

async def connect_hook(self, mgr):
ws = await self.unauth_ws
self.session_id = ws.response_headers.get("session_id")

async def inbound_message_hook(self, raw_contents: str):
return json.loads(raw_contents)

async def outbound_message_hook(self, msg: dict):
return json.dumps(msg)

async def _publish(self, message: QueuedMessage):
await super()._publish(message)
structlog.get_logger().info(f"Publishing {message.contents}")

async def log_received(self, message: dict):
structlog.get_logger().info(f"Received {message}")

mgr = Sub(ws_url=websocket_server.ws_base + "/ws")
await mgr.initialize()
# Wait until we're connected before sending a message, otherwise the outbound worker
# will drop into .send / ._publish before we have a session_id set
await mgr.connected.wait()
mgr.send({"type": "unauthed_echo_request", "text": "Hello 1"})
# move forward in time until we get the next message from the webserver
await mgr.next_event.wait()
publish_log = log_output.entries[0]
assert publish_log["event"] == 'Publishing {"type": "unauthed_echo_request", "text": "Hello 1"}'
assert publish_log["session_id"]
assert publish_log["func_name"] == "_publish"

receive_log = log_output.entries[1]
assert receive_log["event"] == "Received {'type': 'unauthed_echo_reply', 'text': 'Hello 1'}"
assert receive_log["session_id"]
assert receive_log["func_name"] == "log_received"

await mgr.shutdown()


async def test_disable_polling(mocker):
"""
Test that registered callbacks (record_last_seen_message) are still called
when we use .schedule_for_delivery after initializing the WebsocketManager
with the enable_polling=False flag, so it doesn't attempt to make a connection
to an external server.

Also test that callbacks which call .send() do drop messages into the _publish
method, which would normally then send data over the wire.
"""
mgr = WebsocketManager(ws_url="ws://test")
publish = mocker.patch.object(mgr, "_publish")
await mgr.initialize(enable_polling=False)

@mgr.callback(on_topic="")
def echo(msg):
mgr.send(msg)

mgr.schedule_for_delivery(topic="", contents="echo test")
await mgr.next_event.wait()
assert mgr.last_seen_message == "echo test"
await asyncio.sleep(0.01)
publish.assert_called_once_with(QueuedMessage(topic="", contents="echo test", session_id=None))
await mgr.shutdown()
# # disconnect us server-side
# async with httpx.AsyncClient(base_url=websocket_server.url) as client:
# resp = await client.get(f"/disconnect/{token}")
# assert resp.status_code == 204
#
# json_manager.send({"type": "authed_echo_request", "text": "Hello auth2"})
# reply = await run_until_message_type(json_manager, "authed_echo_reply")
# assert reply == {"type": "authed_echo_reply", "text": "Hello auth2"}
#
# assert not json_manager._shutting_down
# assert json_manager.reconnections == 1
#
# # disconnect us for a second time, should not reconnect due to max_reconnections
# async with httpx.AsyncClient(base_url=websocket_server.url) as client:
# resp = await client.get(f"/disconnect/{token}")
# assert resp.status_code == 204
#
# await asyncio.sleep(0.01)
# assert json_manager._shutting_down


# async def test_hooks_in_subclass(websocket_server: AppDetails):
# """
# Test that creating hooks as methods in a subclass definition work
# as well as attaching the hooks to instances of the class.
# """
#
# class Sub(WebsocketManager):
# def __init__(self, ws_url):
# super().__init__(ws_url)
# self.register_callback(
# self.on_auth,
# on_predicate=lambda topic, msg: msg["type"] == "auth_reply" and msg["success"],
# )
#
# async def inbound_message_hook(self, raw_contents: str):
# return json.loads(raw_contents)
#
# async def outbound_message_hook(self, msg: dict):
# return json.dumps(msg)
#
# async def auth_hook(self, mgr):
# ws = await self.unauth_ws
# msg = json.dumps({"type": "auth_request", "token": str(uuid.UUID(int=3))})
# await ws.send(msg)
#
# mgr = Sub(ws_url=websocket_server.ws_base + "/ws")
# await mgr.initialize()
# mgr.send({"type": "authed_echo_request", "text": "Hello subclass"})
# reply = await run_until_message_type(mgr, "authed_echo_reply")
# assert reply == {"type": "authed_echo_reply", "text": "Hello subclass"}
# await mgr.shutdown()
#
#
# # Two fixtures below used by test_structlog_contextvars_worker_hook
# # Pattern pulled from https://www.structlog.org/en/stable/testing.html
# @pytest.fixture(name="log_output")
# def fixture_log_output():
# return structlog.testing.LogCapture()
#
#
# @pytest.fixture
# def fixture_configure_structlog(log_output):
# structlog.configure(
# processors=[
# structlog.contextvars.merge_contextvars,
# structlog.processors.CallsiteParameterAdder(
# {structlog.processors.CallsiteParameter.FUNC_NAME}
# ),
# log_output,
# ]
# )

#
# @pytest.mark.usefixtures("fixture_configure_structlog")
# async def test_structlog_contextvars_worker_hook(websocket_server: AppDetails, log_output):
# """
# Test that we can bind contextvars within the context_hook method and that any callbacks
# or outbound publishing methods will include those in logs.
# """
#
# class Sub(WebsocketManager):
# def __init__(self, ws_url):
# super().__init__(ws_url)
# self.session_id = None
# self.register_callback(self.log_received)
#
# async def context_hook(self):
# structlog.contextvars.bind_contextvars(session_id=self.session_id)
#
# async def connect_hook(self, mgr):
# ws = await self.unauth_ws
# self.session_id = ws.response_headers.get("session_id")
#
# async def inbound_message_hook(self, raw_contents: str):
# return json.loads(raw_contents)
#
# async def outbound_message_hook(self, msg: dict):
# return json.dumps(msg)
#
# async def _publish(self, message: QueuedMessage):
# await super()._publish(message)
# structlog.get_logger().info(f"Publishing {message.contents}")
#
# async def log_received(self, message: dict):
# structlog.get_logger().info(f"Received {message}")
#
# mgr = Sub(ws_url=websocket_server.ws_base + "/ws")
# await mgr.initialize()
# # Wait until we're connected before sending a message, otherwise the outbound worker
# # will drop into .send / ._publish before we have a session_id set
# await mgr.connected.wait()
# mgr.send({"type": "unauthed_echo_request", "text": "Hello 1"})
# # move forward in time until we get the next message from the webserver
# await mgr.next_event.wait()
# publish_log = log_output.entries[0]
# assert publish_log["event"] == 'Publishing {"type": "unauthed_echo_request", "text": "Hello 1"}'
# assert publish_log["session_id"]
# assert publish_log["func_name"] == "_publish"
#
# receive_log = log_output.entries[1]
# assert receive_log["event"] == "Received {'type': 'unauthed_echo_reply', 'text': 'Hello 1'}"
# assert receive_log["session_id"]
# assert receive_log["func_name"] == "log_received"
#
# await mgr.shutdown()
#
#
# async def test_disable_polling(mocker):
# """
# Test that registered callbacks (record_last_seen_message) are still called
# when we use .schedule_for_delivery after initializing the WebsocketManager
# with the enable_polling=False flag, so it doesn't attempt to make a connection
# to an external server.
#
# Also test that callbacks which call .send() do drop messages into the _publish
# method, which would normally then send data over the wire.
# """
# mgr = WebsocketManager(ws_url="ws://test")
# publish = mocker.patch.object(mgr, "_publish")
# await mgr.initialize(enable_polling=False)
#
# @mgr.callback(on_topic="")
# def echo(msg):
# mgr.send(msg)
#
# mgr.schedule_for_delivery(topic="", contents="echo test")
# await mgr.next_event.wait()
# assert mgr.last_seen_message == "echo test"
# await asyncio.sleep(0.01)
# publish.assert_called_once_with(QueuedMessage(topic="", contents="echo test", session_id=None))
# await mgr.shutdown()
Loading