Fix delete record sent due to changed partition path
geserdugarov committed Feb 11, 2025
Commit 096983a (1 parent: c00fec1)
Showing 2 changed files with 15 additions and 17 deletions.
File 1 of 2:

@@ -81,11 +81,11 @@ public void processElement(I record,
     // This is processing of delete records from `BucketAssignRowDataFunction::processHoodieFlinkRecord`
     HoodieRecordPayload payload;
     if (record.getOperationType().equals("D")) {
-      payload = payloadCreation.createDeletePayload((BaseAvroPayload) gr);
+      payload = payloadCreation.createDeletePayload((BaseAvroPayload) payloadCreation.createPayload(gr));
     } else {
       payload = payloadCreation.createPayload(gr);
     }
-    // [HUDI-8968] Use opetationType uniformly instead of instantTime
+    // [HUDI-8968] Use operationType uniformly instead of instantTime
     HoodieOperation operation = HoodieOperation.fromName(record.getInstantTime());
     HoodieRecord hoodieRecord = new HoodieAvroRecord<>(hoodieKey, payload, operation);
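Context for the hunk above: the old code cast the Avro record `gr` straight to BaseAvroPayload, but `gr` is a plain Avro record rather than a payload, so the cast could only fail at runtime for delete records; the fix builds a payload from `gr` first and derives the delete payload from that. A minimal self-contained sketch of the same pattern follows; Record, Payload, PayloadFactory, and CastFixSketch are hypothetical stand-ins, not the real Hudi or Avro types.

// Record and Payload are hypothetical stand-ins for GenericRecord and BaseAvroPayload.
class Record {}
class Payload extends Record {}

class PayloadFactory {
  Payload createPayload(Record r) {        // wraps a record into a payload
    return new Payload();
  }
  Payload createDeletePayload(Payload p) { // derives a delete payload from an existing payload
    return new Payload();
  }
}

public class CastFixSketch {
  public static void main(String[] args) {
    PayloadFactory creation = new PayloadFactory();
    Record gr = new Record();
    // Old shape: (Payload) gr compiles but throws ClassCastException, since gr was never a Payload.
    // Fixed shape: convert first, then derive the delete payload.
    Payload deletePayload = creation.createDeletePayload(creation.createPayload(gr));
    System.out.println("created: " + deletePayload);
  }
}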
File 2 of 2:

@@ -34,7 +34,7 @@
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -52,23 +52,22 @@
 public class BucketAssignRowDataFunction<K, I extends HoodieFlinkRecord, O extends HoodieFlinkRecord> extends BucketAssignFunction<K, I, O> {

   /**
-   * State for known record keys, which structured as Tuple(partition, fileId, instantTime).
+   * State for known record keys, which structured as Tuple(partition, fileId).
    * If record key is in the state, then update location from the state.
    * Otherwise, use the {@link BucketAssigner} to generate a new bucket ID.
    */
-  private ValueState<Tuple3<StringData, StringData, StringData>> indexState;
+  private ValueState<Tuple2<StringData, StringData>> indexState;

   public BucketAssignRowDataFunction(Configuration config) {
     super(config);
   }

   @Override
   public void initializeState(FunctionInitializationContext context) {
-    ValueStateDescriptor indexStateDesc =
+    ValueStateDescriptor<Tuple2<StringData, StringData>> indexStateDesc =
         new ValueStateDescriptor<>(
             "indexState",
             new TupleTypeInfo<>(
                 StringDataTypeInfo.INSTANCE,
-                StringDataTypeInfo.INSTANCE,
                 StringDataTypeInfo.INSTANCE));
     double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
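A side note on the descriptor hunk above: besides dropping the third StringDataTypeInfo to match the new Tuple2-shaped state, the fix replaces a raw ValueStateDescriptor with a properly parameterized one. A minimal compilable sketch of the same declaration against vanilla Flink, using Flink's built-in Types.STRING in place of Hudi's StringDataTypeInfo; DescriptorSketch is a hypothetical class name.

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class DescriptorSketch {
  public static void main(String[] args) {
    // Typed descriptor mirroring the Tuple2-shaped index state above;
    // Types.STRING stands in for Hudi's StringDataTypeInfo.
    ValueStateDescriptor<Tuple2<String, String>> indexStateDesc =
        new ValueStateDescriptor<>(
            "indexState",
            new TupleTypeInfo<>(Types.STRING, Types.STRING));
    System.out.println("declared state: " + indexStateDesc.getName());
  }
}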
@@ -99,10 +98,9 @@ public void open(Configuration parameters) throws Exception {
   public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
     if (value.isIndexRecord()) {
       this.indexState.update(
-          new Tuple3<>(
+          new Tuple2<>(
               StringData.fromString(value.getPartitionPath()),
-              StringData.fromString(value.getFileId()),
-              StringData.fromString(value.getInstantTime())));
+              StringData.fromString(value.getFileId())));
     } else {
       processHoodieFlinkRecord(value, out);
     }
@@ -118,33 +116,33 @@ private void processHoodieFlinkRecord(HoodieFlinkRecord record, Collector<O> out
       // Only changing records need looking up the index for the location,
       // append only records are always recognized as INSERT.
       // Structured as Tuple(partition, fileId, instantTime).
-      Tuple3<StringData, StringData, StringData> indexStateValue = indexState.value();
+      Tuple2<StringData, StringData> indexStateValue = indexState.value();
       if (indexStateValue != null) {
         // Set up the instant time as "U" to mark the bucket as an update bucket.
         String partitionFromState = indexStateValue.getField(0).toString();
+        String fileIdFromState = indexStateValue.getField(1).toString();
         if (!Objects.equals(partitionFromState, partition)) {
           // [HUDI-8996] No delete records for Flink upsert if partition path changed
           if (globalIndex) {
             // if partition path changes, emit a delete record for old partition path,
             // then update the index state using location with new partition path.
-            HoodieFlinkRecord deleteRecord = new HoodieFlinkRecord(recordKey, partitionFromState, row);
-            deleteRecord.setOperationType("D");
+            HoodieFlinkRecord deleteRecord =
+                new HoodieFlinkRecord(recordKey, partitionFromState, fileIdFromState, "U", "D", false, row);
             out.collect((O) deleteRecord);
           }
           location = getNewRecordLocation(partition);
         } else {
-          location = new HoodieRecordLocation("U", indexStateValue.getField(1).toString(), HoodieRecordLocation.INVALID_POSITION);
+          location = new HoodieRecordLocation("U", fileIdFromState, HoodieRecordLocation.INVALID_POSITION);
           this.bucketAssigner.addUpdate(partition, location.getFileId());
         }
       } else {
         location = getNewRecordLocation(partition);
       }
       // always refresh the index
       this.indexState.update(
-          new Tuple3<>(
+          new Tuple2<>(
               StringData.fromString(partition),
-              StringData.fromString(location.getFileId()),
-              StringData.fromString(location.getInstantTime())));
+              StringData.fromString(location.getFileId())));
     } else {
       location = getNewRecordLocation(partition);
     }
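Restating the second file's change outside Flink: the per-key index state now stores only (partition, fileId), and when a record arrives under a global index with a partition path that differs from the stored one, a delete record is emitted for the old partition, now carrying the old fileId and the "U" instant marker, before the state is refreshed with the new location. A self-contained sketch of that control flow follows; the HashMap stands in for Flink's keyed ValueState, and Loc, BucketAssignSketch, and process are simplified hypothetical types, not the real Hudi classes.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BucketAssignSketch {
  // Simplified stand-in for HoodieRecordLocation: just (partition, fileId).
  record Loc(String partition, String fileId) {}

  // HashMap standing in for Flink's keyed ValueState<Tuple2<StringData, StringData>>.
  private final Map<String, Loc> indexState = new HashMap<>();
  private final boolean globalIndex = true;
  private int nextFileId = 0;

  private Loc newLocation(String partition) {
    return new Loc(partition, "file-" + nextFileId++); // fresh bucket for an insert
  }

  void process(String recordKey, String partition) {
    Loc known = indexState.get(recordKey);
    Loc location;
    if (known != null && !Objects.equals(known.partition(), partition)) {
      if (globalIndex) {
        // Partition path changed: emit a delete for the old partition,
        // carrying the old fileId so it lands in the existing update bucket.
        System.out.printf("DELETE key=%s partition=%s fileId=%s%n",
            recordKey, known.partition(), known.fileId());
      }
      location = newLocation(partition);
    } else if (known != null) {
      location = new Loc(partition, known.fileId()); // plain update of a known bucket
    } else {
      location = newLocation(partition);             // first sighting: insert
    }
    indexState.put(recordKey, location);             // always refresh the index
    System.out.printf("UPSERT key=%s partition=%s fileId=%s%n",
        recordKey, partition, location.fileId());
  }

  public static void main(String[] args) {
    BucketAssignSketch fn = new BucketAssignSketch();
    fn.process("k1", "par1");  // insert into par1
    fn.process("k1", "par2");  // partition changed: delete from par1, insert into par2
  }
}

The design point, as far as the diff shows: the old two-argument delete record carried no file location, so it appears it could not be routed to the bucket holding the stale row; passing fileIdFromState together with the "U" marker addresses the delete to that bucket, which is what the commit title refers to.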
