Skip to content

Commit

Permalink
Apply black formatter formatting on misformatted files
Browse files Browse the repository at this point in the history
  • Loading branch information
jimleroyer committed Feb 13, 2025
1 parent ae002d9 commit 91127f2
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 37 deletions.
4 changes: 1 addition & 3 deletions app/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ def wrapper(*args, **kwargs):
if param.annotation is SignedNotification:
verified_value = signer_notification.verify(signed)
elif param.annotation is SignedNotifications:
verified_value = [
signer_notification.verify(item) for item in signed
]
verified_value = [signer_notification.verify(item) for item in signed]

# Replace the signed value with the verified value
bound_args.arguments[param_name] = verified_value
Expand Down
8 changes: 2 additions & 6 deletions app/encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ def init_app(self, app: Any, secret_key: str | List[str], salt: str) -> None:
salt (str): The salt to use for signing.
"""
self.app = app
self.secret_key = cast(
List[str], [secret_key] if type(secret_key) is str else secret_key
)
self.secret_key = cast(List[str], [secret_key] if type(secret_key) is str else secret_key)
self.serializer = URLSafeSerializer(secret_key)
self.salt = salt

Expand All @@ -56,9 +54,7 @@ def sign(self, to_sign: str | NotificationDictToSign) -> str | bytes:
"""
return self.serializer.dumps(to_sign, salt=self.salt)

def sign_with_all_keys(
self, to_sign: str | NotificationDictToSign
) -> List[str | bytes]:
def sign_with_all_keys(self, to_sign: str | NotificationDictToSign) -> List[str | bytes]:
"""Sign a string or dict with all the individual keys in the class secret key list, and the class salt.
Args:
Expand Down
24 changes: 6 additions & 18 deletions app/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ def inbox_name(self, suffix=None, process_type=None):
return f"{self.value}::{str(process_type)}"
return self.value

def inflight_prefix(
self, suffix: Optional[str] = None, process_type: Optional[str] = None
) -> str:
def inflight_prefix(self, suffix: Optional[str] = None, process_type: Optional[str] = None) -> str:
if process_type and suffix:
return f"{Buffer.IN_FLIGHT.value}:{str(suffix)}:{str(process_type)}"
if suffix:
Expand Down Expand Up @@ -122,9 +120,7 @@ class RedisQueue(Queue):

scripts: Dict[str, Any] = {}

def __init__(
self, suffix=None, expire_inflight_after_seconds=300, process_type=None
) -> None:
def __init__(self, suffix=None, expire_inflight_after_seconds=300, process_type=None) -> None:
"""
Constructor for the Redis Queue
Expand Down Expand Up @@ -153,9 +149,7 @@ def init_app(self, redis: Redis, metrics_logger: MetricsLogger):

def poll(self, count=10) -> tuple[UUID, list[str]]:
receipt = uuid4()
in_flight_key = Buffer.IN_FLIGHT.inflight_name(
receipt, self._suffix, self._process_type
)
in_flight_key = Buffer.IN_FLIGHT.inflight_name(receipt, self._suffix, self._process_type)
results = self.__move_to_inflight(in_flight_key, count)
if results:
current_app.logger.info(f"Inflight created: {in_flight_key}")
Expand All @@ -178,9 +172,7 @@ def expire_inflights(self):
expired = self.scripts[self.LUA_EXPIRE_INFLIGHTS](args=args)
if expired:
put_batch_saving_expiry_metric(self.__metrics_logger, self, len(expired))
current_app.logger.warning(
f"Moved inflights {expired} back to inbox {self._inbox}"
)
current_app.logger.warning(f"Moved inflights {expired} back to inbox {self._inbox}")

def acknowledge(self, receipt: UUID) -> bool:
"""
Expand All @@ -192,9 +184,7 @@ def acknowledge(self, receipt: UUID) -> bool:
Returns: True if the inflight was found in that queue and removed, False otherwise
"""
inflight_name = Buffer.IN_FLIGHT.inflight_name(
receipt, self._suffix, self._process_type
)
inflight_name = Buffer.IN_FLIGHT.inflight_name(receipt, self._suffix, self._process_type)
if not self._redis_client.exists(inflight_name):
current_app.logger.warning(f"Inflight to delete not found: {inflight_name}")
return False
Expand All @@ -208,9 +198,7 @@ def publish(self, message: str):
put_batch_saving_metric(self.__metrics_logger, self, 1)

def __move_to_inflight(self, in_flight_key: str, count: int) -> list[str]:
results = self.scripts[self.LUA_MOVE_TO_INFLIGHT](
args=[self._inbox, in_flight_key, count]
)
results = self.scripts[self.LUA_MOVE_TO_INFLIGHT](args=[self._inbox, in_flight_key, count])
decoded = [result.decode("utf-8") for result in results]
return decoded

Expand Down
12 changes: 5 additions & 7 deletions tests/app/test_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ def func_with_list_of_signed_notifications(
):
return signed_notifications

signed = [
signer_notification.sign(notification)
for notification in ["raw notification 1", "raw notification 2"]
]
signed = [signer_notification.sign(notification) for notification in ["raw notification 1", "raw notification 2"]]
unsigned = func_with_list_of_signed_notifications(signed)
assert unsigned == ["raw notification 1", "raw notification 2"]

Expand Down Expand Up @@ -89,9 +86,10 @@ def func_to_sign_return():
return ["raw notification 1", "raw notification 2"]

signed = func_to_sign_return()
assert [
signer_notification.verify(notification) for notification in signed
] == ["raw notification 1", "raw notification 2"]
assert [signer_notification.verify(notification) for notification in signed] == [
"raw notification 1",
"raw notification 2",
]

def test_sign_return_with_empty_list(self):
@sign_return
Expand Down
4 changes: 1 addition & 3 deletions tests/app/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def test_should_verify_content_signed_with_an_old_secret(self, notify_api):
signer2.init_app(notify_api, ["s2", "s3"], "salt")
assert signer2.verify(signer1.sign("this")) == "this"

def test_should_unsafe_verify_content_signed_with_different_secrets(
self, notify_api
):
def test_should_unsafe_verify_content_signed_with_different_secrets(self, notify_api):
signer1 = CryptoSigner()
signer2 = CryptoSigner()
signer1.init_app(notify_api, "secret1", "salt")
Expand Down

0 comments on commit 91127f2

Please sign in to comment.