Skip to content

Commit

Permalink
Allow upsert compaction to work properly during schema / indexing upd…
Browse files Browse the repository at this point in the history
…ates
  • Loading branch information
tibrewalpratik17 committed Jun 26, 2024
1 parent 7dbb05d commit 1fc8d3b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,11 @@ public static class UpsertCompactionTask {
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
* skip crc mismatch as deepstore copies are not updated during schema / index changes
* so it's natural for crc to mismatch.
*/
public static final String SKIP_CRC_MISMATCH = "skipCrcMismatch";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,37 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
|| !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName,
crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc());
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.build();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
// In this scenario, the segment is refreshed or reloaded (due to a schema or index change) between the
// task generation and task execution phases. It is safer to skip this segment for this execution cycle.
// However, we prefer to return it as a task error state so that if this issue occurs in consecutive runs,
// we can identify and address such scenarios.
String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, "
+ "actual crc from validDocIdsBitmapResponse: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromValidDocIdsBitmap);
LOGGER.error(message);
throw new IllegalStateException(message);
}

if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
// We are introducing a skipCrcMismatch option because deepstore copies are not refreshed after schema or
// indexing changes, leading to natural CRC mismatches. If skipCrcMismatch enabled, we can allow all
// segments to be processed once and then disable it again. However, we should develop an intelligent way
// to detect this situation in a self-serve manner.
// Additionally, if skipCrcMismatch is disabled and a CRC mismatch is found, we will mark it as a task-fail error
// instead of marking the task as a success. Otherwise the segment would continue to be picked up in subsequent
// compaction cycles without any execution being actually carried out.
boolean skipCrcMismatch =
Boolean.parseBoolean(configs.get(MinionConstants.UpsertCompactionTask.SKIP_CRC_MISMATCH));
String message = String.format("CRC mismatch for segment: %s, "
+ "expected value based on task generator: %s, actual crc based on deepstore / server metadata copy: %s",
segmentName, originalSegmentCrcFromTaskGenerator, crcFromDeepStorageSegment);
if (skipCrcMismatch) {
LOGGER.warn(message);
} else {
LOGGER.error(message);
throw new IllegalStateException(message);
}
}

RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1;
private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
private static final String DEFAULT_SKIP_CRC_MISMATCH = "false";

public static class SegmentSelectionResult {

Expand Down Expand Up @@ -164,6 +165,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
String.format("deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s",
validDocIdsType));
}
String skipCrcMismatch = taskConfigs.getOrDefault(UpsertCompactionTask.SKIP_CRC_MISMATCH,
DEFAULT_SKIP_CRC_MISMATCH);

List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
Expand Down Expand Up @@ -199,6 +202,7 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
configs.put(UpsertCompactionTask.VALID_DOC_IDS_TYPE, validDocIdsType.toString());
configs.put(UpsertCompactionTask.SKIP_CRC_MISMATCH, String.valueOf(skipCrcMismatch));
pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
numTasks++;
}
Expand Down

0 comments on commit 1fc8d3b

Please sign in to comment.