diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs index 50c950484b443..3c535a8501680 100644 --- a/src/connector/src/sink/file_sink/opendal_sink.rs +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -152,7 +152,7 @@ impl Sink for FileSink { &self.path, self.schema.clone(), writer_param.executor_id, - self.format_desc.encode.clone(), + &self.format_desc, self.engine_type.clone(), self.batching_strategy.clone(), )?; @@ -350,11 +350,12 @@ impl OpenDalSinkWriter { write_path: &str, rw_schema: Schema, executor_id: u64, - encode_type: SinkEncode, + format_desc: &SinkFormatDesc, engine_type: EngineType, batching_strategy: BatchingStrategy, ) -> Result { let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?; + let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?; let row_encoder = JsonEncoder::new( rw_schema, None, @@ -362,7 +363,7 @@ impl OpenDalSinkWriter { TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, TimeHandlingMode::String, - JsonbHandlingMode::String, + jsonb_handling_mode, ); Ok(Self { schema: Arc::new(arrow_schema), @@ -370,8 +371,7 @@ impl OpenDalSinkWriter { operator, sink_writer: None, executor_id, - - encode_type, + encode_type: format_desc.encode.clone(), row_encoder, engine_type, batching_strategy,