diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java index de4c1e148afc..aed9556b4cbe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java @@ -55,12 +55,8 @@ public class HoodieFlinkRecord implements Serializable { private final RowData rowData; - public HoodieFlinkRecord(String recordKey, String partitionPath, RowData rowData) { - this(recordKey, partitionPath, "", "", "", false, rowData); - } - - public HoodieFlinkRecord(String recordKey, String partitionPath, boolean isIndexRecord, RowData rowData) { - this(recordKey, partitionPath, "", "", "", isIndexRecord, rowData); + public HoodieFlinkRecord(String recordKey, String partitionPath, String operationType, RowData rowData) { + this(recordKey, partitionPath, "", "", operationType, false, rowData); } // constructor for index records without row data diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java index 1138213d4357..1a321db3e95e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataEnrichFunction.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.model.HoodieFlinkRecord; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory; @@ -61,6 +62,10 @@ public O map(I record) throws Exception { // [HUDI-8969] Analyze how to get rid of excessive conversions GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record); final HoodieKey hoodieKey = keyGenerator.getKey(gr); - return (O) new HoodieFlinkRecord(hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), record); + return (O) new HoodieFlinkRecord( + hoodieKey.getRecordKey(), + hoodieKey.getPartitionPath(), + HoodieOperation.fromValue(record.getRowKind().toByteValue()).getName(), + record); } }