From 39505564d4ec4dbae94311d5dcce2d6212c73953 Mon Sep 17 00:00:00 2001
From: Geser Dugarov
Date: Tue, 11 Feb 2025 17:12:31 +0700
Subject: [PATCH] Fix delete operation for non bucket index

---
 .../org/apache/hudi/client/model/HoodieFlinkRecord.java   | 8 ++------
 .../apache/hudi/sink/transform/RowDataEnrichFunction.java | 7 ++++++-
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index de4c1e148afc..aed9556b4cbe 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -55,12 +55,8 @@ public class HoodieFlinkRecord implements Serializable {
 
   private final RowData rowData;
 
-  public HoodieFlinkRecord(String recordKey, String partitionPath, RowData rowData) {
-    this(recordKey, partitionPath, "", "", "", false, rowData);
-  }
-
-  public HoodieFlinkRecord(String recordKey, String partitionPath, boolean isIndexRecord, RowData rowData) {
-    this(recordKey, partitionPath, "", "", "", isIndexRecord, rowData);
+  public HoodieFlinkRecord(String recordKey, String partitionPath, String operationType, RowData rowData) {
+    this(recordKey, partitionPath, "", "", operationType, false, rowData);
   }
 
   // constructor for index records without row data
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java
index 1138213d4357..1a321db3e95e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.client.model.HoodieFlinkRecord;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -61,6 +62,10 @@ public O map(I record) throws Exception {
     // [HUDI-8969] Analyze how to get rid of excessive conversions
     GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
     final HoodieKey hoodieKey = keyGenerator.getKey(gr);
-    return (O) new HoodieFlinkRecord(hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), record);
+    return (O) new HoodieFlinkRecord(
+        hoodieKey.getRecordKey(),
+        hoodieKey.getPartitionPath(),
+        HoodieOperation.fromValue(record.getRowKind().toByteValue()).getName(),
+        record);
   }
 }
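
Note (illustration, not part of the patch): the removed constructors passed an empty operation type, so the Flink RowKind of an incoming row was dropped and a DELETE row could no longer be recognized downstream when a non-bucket index is used. The patch carries the operation through by converting the RowKind byte value into a HoodieOperation name. The standalone sketch below only illustrates that conversion; the class name is hypothetical, and it assumes HoodieOperation's byte codes line up with Flink's RowKind byte values (INSERT=0, UPDATE_BEFORE=1, UPDATE_AFTER=2, DELETE=3), which is the premise of the fromValue() call in the patch.

// Sketch: maps each Flink RowKind to the HoodieOperation name that
// RowDataEnrichFunction now attaches to the emitted HoodieFlinkRecord.
// Assumes RowKind.toByteValue() and HoodieOperation.fromValue(byte) agree on byte codes.
import org.apache.flink.types.RowKind;
import org.apache.hudi.common.model.HoodieOperation;

public class RowKindToHoodieOperationSketch {
  public static void main(String[] args) {
    for (RowKind kind : RowKind.values()) {
      // Same conversion as in the patched map() method.
      HoodieOperation op = HoodieOperation.fromValue(kind.toByteValue());
      // e.g. DELETE maps to the delete operation name, so the write path
      // can treat the record as a delete instead of a plain upsert.
      System.out.println(kind + " -> " + op.getName());
    }
  }
}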