diff --git a/serializers/src/main/java/io/pravega/schemaregistry/schemas/JSONSchema.java b/serializers/src/main/java/io/pravega/schemaregistry/schemas/JSONSchema.java index b7a252a92..55a90cf80 100644 --- a/serializers/src/main/java/io/pravega/schemaregistry/schemas/JSONSchema.java +++ b/serializers/src/main/java/io/pravega/schemaregistry/schemas/JSONSchema.java @@ -10,6 +10,7 @@ package io.pravega.schemaregistry.schemas; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.jsonSchema.JsonSchema; import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator; @@ -105,18 +106,20 @@ public static JSONSchema of(String type, JsonSchema schema) { } /** - * Method to create a typed JSONSchema of type {@link Object} from the given schema string. + * Method to create a typed JSONSchema of type T from the given schema string. * * @param type type of object identified by {@link SchemaInfo#getType()}. * @param schemaString Schema string to use. + * @param tClass class for the type of object + * @param Type of object * @return Returns an JSONSchema with {@link Object} type. */ - public static JSONSchema of(String type, String schemaString) { + public static JSONSchema of(String type, String schemaString, Class tClass) { Preconditions.checkNotNull(type, "Type cannot be null."); Preconditions.checkArgument(!Strings.isNullOrEmpty(schemaString), "Schema String cannot be null or empty."); try { JsonSchema schema = OBJECT_MAPPER.readValue(schemaString, JsonSchema.class); - return new JSONSchema<>(schema, type, schemaString, Object.class); + return new JSONSchema<>(schema, type, schemaString, tClass); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Unable to parse schema string", e); } @@ -147,18 +150,18 @@ public static JSONSchema ofBaseType(Class tDerived, Class } /** - * Method to create a typed JSONSchema of type {@link Object} from the given schema. + * Method to create a typed JSONSchema of type {@link JsonNode} from the given schema. * * @param schemaInfo Schema info to translate into json schema. - * @return Returns an JSONSchema with {@link Object} type. + * @return Returns an JSONSchema with {@link JsonNode} type. */ - public static JSONSchema from(SchemaInfo schemaInfo) { + public static JSONSchema from(SchemaInfo schemaInfo) { Preconditions.checkNotNull(schemaInfo); try { String schemaString = new String(schemaInfo.getSchemaData().array(), Charsets.UTF_8); JsonSchema schema = OBJECT_MAPPER.readValue(schemaString, JsonSchema.class); - return new JSONSchema<>(schemaInfo, schema, schemaString, Object.class); + return new JSONSchema<>(schemaInfo, schema, schemaString, JsonNode.class); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Unable to get json schema from schema info", e); } diff --git a/serializers/src/main/java/io/pravega/schemaregistry/serializers/JsonGenericDeserializer.java b/serializers/src/main/java/io/pravega/schemaregistry/serializers/JsonGenericDeserializer.java index d1d955af7..29daaa05d 100644 --- a/serializers/src/main/java/io/pravega/schemaregistry/serializers/JsonGenericDeserializer.java +++ b/serializers/src/main/java/io/pravega/schemaregistry/serializers/JsonGenericDeserializer.java @@ -27,8 +27,6 @@ class JsonGenericDeserializer extends AbstractDeserializer> super(groupId, client, null, false, decoders, encodingCache, encodeHeader); this.objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); - objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - objectMapper.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.ANY); } @Override diff --git a/serializers/src/main/java/io/pravega/schemaregistry/serializers/MultiFormatSerializerFactory.java b/serializers/src/main/java/io/pravega/schemaregistry/serializers/MultiFormatSerializerFactory.java index abe8798b3..bee87cb01 100644 --- a/serializers/src/main/java/io/pravega/schemaregistry/serializers/MultiFormatSerializerFactory.java +++ b/serializers/src/main/java/io/pravega/schemaregistry/serializers/MultiFormatSerializerFactory.java @@ -167,12 +167,13 @@ private static AbstractSerializer getPravegaSerializer( return new AvroSerializer<>(groupId, schemaRegistryClient, AvroSchema.from(schemaInfo), config.getEncoder(), config.isRegisterSchema()); case Protobuf: - ProtobufSerializer m = new ProtobufSerializer<>(groupId, schemaRegistryClient, + ProtobufSerializer pSerializer = new ProtobufSerializer<>(groupId, schemaRegistryClient, ProtobufSchema.from(schemaInfo), config.getEncoder(), config.isRegisterSchema(), config.isWriteEncodingHeader()); - return (AbstractSerializer) m; + return (AbstractSerializer) pSerializer; case Json: - return new JsonSerializer<>(groupId, schemaRegistryClient, JSONSchema.from(schemaInfo), + JsonSerializer jsonSerializer = new JsonSerializer<>(groupId, schemaRegistryClient, JSONSchema.from(schemaInfo), config.getEncoder(), config.isRegisterSchema(), config.isWriteEncodingHeader()); + return (AbstractSerializer) jsonSerializer; case Custom: return getCustomSerializer(config, customSerializers, schemaRegistryClient, groupId, schemaInfo); default: diff --git a/serializers/src/main/java/io/pravega/schemaregistry/serializers/WithSchema.java b/serializers/src/main/java/io/pravega/schemaregistry/serializers/WithSchema.java index a28853f67..25b0b9d19 100644 --- a/serializers/src/main/java/io/pravega/schemaregistry/serializers/WithSchema.java +++ b/serializers/src/main/java/io/pravega/schemaregistry/serializers/WithSchema.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DynamicMessage; import com.google.protobuf.GeneratedMessageV3; @@ -212,6 +213,8 @@ private static String toJsonString(SerializationFormat format, Object deserializ * @return A WithSchema object which has Avro Schema and the corresponding object. */ public static WithSchema avro(T object, AvroSchema avroSchema) { + Preconditions.checkNotNull(object, "object cannot be null"); + Preconditions.checkNotNull(avroSchema, "schema cannot be null"); return new WithSchema<>(avroSchema.getSchemaInfo(), object, (x, y) -> object); } @@ -224,6 +227,8 @@ public static WithSchema avro(T object, AvroSchema avroSchema) { * @return A WithSchema object which has Protobuf Schema and the corresponding object. */ public static WithSchema proto(T object, ProtobufSchema protobufSchema) { + Preconditions.checkNotNull(object, "object cannot be null"); + Preconditions.checkNotNull(protobufSchema, "schema cannot be null"); return new WithSchema<>(protobufSchema.getSchemaInfo(), object, (x, y) -> object); } @@ -236,6 +241,8 @@ public static WithSchema proto(T object, Proto * @return A WithSchema object which has Json schema and the corresponding object. */ public static WithSchema json(T object, JSONSchema jsonSchema) { + Preconditions.checkNotNull(object, "object cannot be null"); + Preconditions.checkNotNull(jsonSchema, "schema cannot be null"); return new WithSchema<>(jsonSchema.getSchemaInfo(), object, (x, y) -> object); } } diff --git a/serializers/src/test/java/io/pravega/schemaregistry/schemas/SchemasTest.java b/serializers/src/test/java/io/pravega/schemaregistry/schemas/SchemasTest.java index 3fff1c381..f3a6cba01 100644 --- a/serializers/src/test/java/io/pravega/schemaregistry/schemas/SchemasTest.java +++ b/serializers/src/test/java/io/pravega/schemaregistry/schemas/SchemasTest.java @@ -9,7 +9,6 @@ */ package io.pravega.schemaregistry.schemas; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DynamicMessage; import com.google.protobuf.GeneratedMessageV3; @@ -99,12 +98,12 @@ public void testProtobufSchema() throws IOException { } @Test - public void testJsonSchema() throws JsonProcessingException { + public void testJsonSchema() { JSONSchema schema = JSONSchema.of(User.class); assertNotNull(schema.getSchema()); assertEquals(schema.getSchemaInfo().getSerializationFormat(), SerializationFormat.Json); - JSONSchema schema2 = JSONSchema.of("Person", JSON_SCHEMA_STRING); + JSONSchema schema2 = JSONSchema.of("Person", JSON_SCHEMA_STRING, Object.class); assertNotNull(schema2.getSchema()); assertEquals(schema2.getSchemaInfo().getSerializationFormat(), SerializationFormat.Json); diff --git a/serializers/src/test/java/io/pravega/schemaregistry/serializers/SerializerTest.java b/serializers/src/test/java/io/pravega/schemaregistry/serializers/SerializerTest.java index 9bbc0f0ce..8ee567058 100644 --- a/serializers/src/test/java/io/pravega/schemaregistry/serializers/SerializerTest.java +++ b/serializers/src/test/java/io/pravega/schemaregistry/serializers/SerializerTest.java @@ -279,14 +279,14 @@ public void testJsonSerializers() throws JsonProcessingException { String schemaString = "{\"type\": \"object\",\"title\": \"The external data schema\",\"properties\": {\"content\": {\"type\": \"string\"}}}"; - JSONSchema myData = JSONSchema.of("MyData", schemaString); + JSONSchema myData = JSONSchema.of("MyData", schemaString, HashMap.class); VersionInfo versionInfo3 = new VersionInfo("myData", 0, 2); doAnswer(x -> versionInfo3).when(client).getVersionForSchema(anyString(), eq(myData.getSchemaInfo())); doAnswer(x -> new EncodingId(2)).when(client).getEncodingId(anyString(), eq(versionInfo3), any()); doAnswer(x -> new EncodingInfo(versionInfo3, myData.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(2))); - Serializer serializer2 = SerializerFactory.jsonSerializer(config, myData); - Map jsonObject = new HashMap<>(); + Serializer serializer2 = SerializerFactory.jsonSerializer(config, myData); + HashMap jsonObject = new HashMap<>(); jsonObject.put("content", "mxx"); ByteBuffer s = serializer2.serialize(jsonObject); @@ -294,19 +294,19 @@ public void testJsonSerializers() throws JsonProcessingException { String stringSchema = new ObjectMapper().writeValueAsString(JsonSchema.minimalForFormat(JsonFormatTypes.STRING)); - JSONSchema strSchema = JSONSchema.of("string", stringSchema); + JSONSchema strSchema = JSONSchema.of("string", stringSchema, String.class); VersionInfo versionInfo4 = new VersionInfo("myData", 0, 3); doAnswer(x -> versionInfo4).when(client).getVersionForSchema(anyString(), eq(strSchema.getSchemaInfo())); doAnswer(x -> new EncodingId(3)).when(client).getEncodingId(anyString(), eq(versionInfo4), any()); doAnswer(x -> new EncodingInfo(versionInfo4, strSchema.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(3))); - Serializer serializer3 = SerializerFactory.jsonSerializer(config, strSchema); - Serializer deserializer3 = SerializerFactory.jsonDeserializer(config, strSchema); + Serializer serializer3 = SerializerFactory.jsonSerializer(config, strSchema); + Serializer deserializer3 = SerializerFactory.jsonDeserializer(config, strSchema); Serializer> generic3 = SerializerFactory.jsonGenericDeserializer(config); String string = "a"; s = serializer3.serialize(string); Object x = deserializer3.deserialize(s); - assertTrue(x instanceof String); + assertNotNull(x); assertEquals(x, string); s = serializer3.serialize(string); Object jsonNode = generic3.deserialize(s);