Skip to content

Commit

Permalink
feat(dal-sink): support dynamic json handle mode (#20418)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Feb 12, 2025
1 parent a517cae commit 2b269bd
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions src/connector/src/sink/file_sink/opendal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<S: OpendalSinkBackend> Sink for FileSink<S> {
&self.path,
self.schema.clone(),
writer_param.executor_id,
self.format_desc.encode.clone(),
&self.format_desc,
self.engine_type.clone(),
self.batching_strategy.clone(),
)?;
Expand Down Expand Up @@ -350,28 +350,28 @@ impl OpenDalSinkWriter {
write_path: &str,
rw_schema: Schema,
executor_id: u64,
encode_type: SinkEncode,
format_desc: &SinkFormatDesc,
engine_type: EngineType,
batching_strategy: BatchingStrategy,
) -> Result<Self> {
let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
let row_encoder = JsonEncoder::new(
rw_schema,
None,
crate::sink::encoder::DateHandlingMode::String,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::String,
JsonbHandlingMode::String,
jsonb_handling_mode,
);
Ok(Self {
schema: Arc::new(arrow_schema),
write_path: write_path.to_owned(),
operator,
sink_writer: None,
executor_id,

encode_type,
encode_type: format_desc.encode.clone(),
row_encoder,
engine_type,
batching_strategy,
Expand Down

0 comments on commit 2b269bd

Please sign in to comment.