Skip to content

Commit

Permalink
add more topics to sandbox script
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jan 17, 2025
1 parent ba9695a commit 2ba84da
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 22 deletions.
11 changes: 11 additions & 0 deletions kaskade/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
import functools
import struct
from io import BytesIO
from pathlib import Path
from types import MappingProxyType
from typing import Callable, Any

from confluent_kafka import KafkaException
from fastavro import schemaless_writer
from fastavro.schema import load_schema
from textual.app import App

from kaskade import logger
Expand Down Expand Up @@ -58,3 +62,10 @@ def load_properties(file_path: str, sep: str = "=", comment_char: str = "#") ->
props[key] = value

return props


def py_to_avro(schema_path: str, data: dict[str, Any] | MappingProxyType[str, Any]) -> bytes:
schema = load_schema(schema_path)
buffer_writer = BytesIO()
schemaless_writer(buffer_writer, schema, data)
return buffer_writer.getvalue()
50 changes: 44 additions & 6 deletions scripts/sandbox.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import time
from time import sleep

from confluent_kafka.cimpl import NewTopic
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
from typing import Callable, Any
import uuid
Expand All @@ -14,9 +17,13 @@
from confluent_kafka import Producer, KafkaError, KafkaException
from faker import Faker

from kaskade.utils import pack_bytes, file_to_str
from tests.protobuf.user_pb2 import User as ProtoUser
from kaskade.utils import pack_bytes, file_to_str, py_to_avro
from tests.protobuf.user_pb2 import User as ProtobufUser
from tests.avro.user import User as AvroUser
from tests.json.user import User as JsonUser

JSON_USER_SCHEMA = "tests/json/user.schema.json"
AVRO_USER_SCHEMA = "tests/avro/user.avsc"


class Populator:
Expand All @@ -42,6 +49,7 @@ def create_topic(self, topic: str) -> None:
for future in futures.values():
try:
future.result()
sleep(0.1)
except KafkaException as ke:
if (
len(ke.args) > 0
Expand Down Expand Up @@ -75,11 +83,20 @@ def populate(
show_default=True,
)
def main(messages: int, bootstrap_servers: str, registry: str) -> None:
registry_client = SchemaRegistryClient({"url": registry})
avro_serializer = AvroSerializer(
SchemaRegistryClient({"url": registry}),
file_to_str("tests/avro/user.avsc"),
registry_client,
file_to_str(AVRO_USER_SCHEMA),
lambda value, ctx: vars(value),
)
json_serializer = JSONSerializer(
file_to_str(JSON_USER_SCHEMA),
registry_client,
lambda value, ctx: vars(value),
)
protobuf_serializer = ProtobufSerializer(
ProtobufUser, registry_client, {"use.deprecated.format": False}
)
faker = Faker()
topics = [
(
Expand Down Expand Up @@ -122,15 +139,36 @@ def main(messages: int, bootstrap_servers: str, registry: str) -> None:
lambda: faker.json(),
lambda value: value.encode("utf-8"),
),
(
"json-schema",
lambda: JsonUser(name=faker.name()),
lambda value: json_serializer(
value, SerializationContext("json-schema", MessageField.VALUE)
),
),
(
"protobuf",
lambda: ProtoUser(name=faker.name()),
lambda: ProtobufUser(name=faker.name()),
lambda value: value.SerializeToString(),
),
(
"protobuf-schema",
lambda: ProtobufUser(name=faker.name()),
lambda value: protobuf_serializer(
value, SerializationContext("protobuf-schema", MessageField.VALUE)
),
),
(
"avro",
lambda: AvroUser(name=faker.name()),
lambda value: avro_serializer(value, SerializationContext("avro", MessageField.VALUE)),
lambda value: py_to_avro(AVRO_USER_SCHEMA, vars(value)),
),
(
"avro-schema",
lambda: AvroUser(name=faker.name()),
lambda value: avro_serializer(
value, SerializationContext("avro-schema", MessageField.VALUE)
),
),
]
populator = Populator({BOOTSTRAP_SERVERS: bootstrap_servers})
Expand Down
6 changes: 6 additions & 0 deletions tests/json/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class User:
def __init__(self, name: str):
self.name = name

def __str__(self) -> str:
return str(vars(self))
11 changes: 11 additions & 0 deletions tests/json/user.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "User",
"type": "object",
"properties": {
"name": {
"type": "string"
}
}
}

20 changes: 4 additions & 16 deletions tests/tests_deserializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

import struct
import unittest
from typing import Any

from fastavro import schemaless_writer
from fastavro.schema import load_schema
from io import BytesIO
from unittest.mock import patch

from confluent_kafka.serialization import MessageField
Expand All @@ -25,7 +21,7 @@
ProtobufDeserializer,
AvroDeserializer,
)
from kaskade.utils import file_to_str
from kaskade.utils import file_to_str, py_to_avro
from tests import faker
from tests.protobuf.user_pb2 import User

Expand All @@ -46,14 +42,6 @@
)


def py_to_avro(expected_value: dict[str, Any]):
schema = load_schema(AVRO_PATH)
buffer_writer = BytesIO()
schemaless_writer(buffer_writer, schema, expected_value)
encoded = buffer_writer.getvalue()
return encoded


class TestDeserializer(unittest.TestCase):

def test_string_deserialization(self):
Expand Down Expand Up @@ -138,7 +126,7 @@ def test_registry_deserialization_avro(self, mock_sr_client_class):
)
mock_sr_client_class.return_value.get_schema.return_value.schema_type = "AVRO"

encoded = py_to_avro(expected_value)
encoded = py_to_avro(AVRO_PATH, expected_value)

deserializer = RegistryDeserializer({})

Expand Down Expand Up @@ -185,7 +173,7 @@ def test_protobuf_deserialization_with_magic_byte(self):
def test_avro_deserialization(self):
expected_value = {"name": "Pedro Pascal"}
deserializer = AvroDeserializer({"value": AVRO_PATH})
encoded = py_to_avro(expected_value)
encoded = py_to_avro(AVRO_PATH, expected_value)

result = deserializer.deserialize(encoded, "", MessageField.VALUE)
print(encoded)
Expand All @@ -195,7 +183,7 @@ def test_avro_deserialization(self):
def test_avro_deserialization_with_magic_byte(self):
expected_value = {"name": "Pedro Pascal"}
deserializer = AvroDeserializer({"value": AVRO_PATH})
encoded = py_to_avro(expected_value)
encoded = py_to_avro(AVRO_PATH, expected_value)

result = deserializer.deserialize(b"\x00\x00\x00\x00\x00" + encoded, "", MessageField.VALUE)

Expand Down

0 comments on commit 2ba84da

Please sign in to comment.