From 003b3645ab3cc238a73b62b97b92866a0c355cff Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 25 Nov 2024 11:48:22 +0800 Subject: [PATCH] fix(query): fix lost data in custom exchange --- .../pipeline/core/src/processors/shuffle_processor.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/query/pipeline/core/src/processors/shuffle_processor.rs b/src/query/pipeline/core/src/processors/shuffle_processor.rs index 724e76bc3839b..5b1308113f7db 100644 --- a/src/query/pipeline/core/src/processors/shuffle_processor.rs +++ b/src/query/pipeline/core/src/processors/shuffle_processor.rs @@ -340,7 +340,10 @@ impl Processor for MergePartitionProcessor { input.set_need_data(); } - if all_inputs_finished { + if all_inputs_finished + && (!matches!(T::STRATEGY, MultiwayStrategy::Custom) + || self.inputs_data.iter().all(Option::is_none)) + { self.output.finish(); return Ok(Event::Finished); } @@ -352,6 +355,11 @@ impl Processor for MergePartitionProcessor { self.output.push_data(Ok(block)); return Ok(Event::NeedConsume); } + + if all_inputs_finished && self.inputs_data.iter().all(Option::is_none) { + self.output.finish(); + return Ok(Event::Finished); + } } Ok(Event::NeedData)