From 1fc8d3b51955e4d4b36e2a4fea455a26be78a740 Mon Sep 17 00:00:00 2001 From: tibrewalpratik Date: Thu, 27 Jun 2024 02:35:36 +0530 Subject: [PATCH] Allow upsert compaction to work properly during schema / indexing updates --- .../pinot/core/common/MinionConstants.java | 6 +++ .../UpsertCompactionTaskExecutor.java | 37 ++++++++++++++++--- .../UpsertCompactionTaskGenerator.java | 4 ++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 9b1b89b4b72b..47c6fbdeb413 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -174,5 +174,11 @@ public static class UpsertCompactionTask { * number of segments to query in one batch to fetch valid doc id metadata, by default 500 */ public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; + + /** + * skip crc mismatch as deepstore copies are not updated during schema / index changes + * so it's natural for crc to mismatch. 
+ */ + public static final String SKIP_CRC_MISMATCH = "skipCrcMismatch"; } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java index ec5cc127d9f2..5952d2aa97af 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java @@ -75,12 +75,37 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY); String crcFromDeepStorageSegment = segmentMetadata.getCrc(); String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc(); - if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment) - || !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) { - LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName, - crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc()); - return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName) - .build(); + if (!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) { + // In this scenario, the segment is refreshed or reloaded (due to a schema or index change) between the + // task generation and task execution phases. It is safer to skip this segment for this execution cycle. 
+ // However, we prefer to return it as a task error state so that if this issue occurs in consecutive runs, + // we can identify and address such scenarios. + String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, " + + "actual crc from validDocIdsBitmapResponse: %s", segmentName, originalSegmentCrcFromTaskGenerator, + crcFromValidDocIdsBitmap); + LOGGER.error(message); + throw new IllegalStateException(message); + } + + if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) { + // We are introducing a skipCrcMismatch option because deepstore copies are not refreshed after schema or + // indexing changes, leading to natural CRC mismatches. If skipCrcMismatch is enabled, we can allow all + // segments to be processed once and then disable it again. However, we should develop an intelligent way + // to detect this situation in a self-serve manner. + // Additionally, if skipCrcMismatch is disabled and a CRC mismatch is found, we will mark it as a task-fail error + // instead of marking the task as a success. Otherwise the segment would continue to be picked up in subsequent + // compaction cycles without any execution being actually carried out. 
+ boolean skipCrcMismatch = + Boolean.parseBoolean(configs.get(MinionConstants.UpsertCompactionTask.SKIP_CRC_MISMATCH)); + String message = String.format("CRC mismatch for segment: %s, " + + "expected value based on task generator: %s, actual crc based on deepstore / server metadata copy: %s", + segmentName, originalSegmentCrcFromTaskGenerator, crcFromDeepStorageSegment); + if (skipCrcMismatch) { + LOGGER.warn(message); + } else { + LOGGER.error(message); + throw new IllegalStateException(message); + } } RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap()); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 0357bca6fe12..4ac06de24eab 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -59,6 +59,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0; private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1; private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500; + private static final String DEFAULT_SKIP_CRC_MISMATCH = "false"; public static class SegmentSelectionResult { @@ -164,6 +165,8 @@ public List generateTasks(List tableConfigs) { String.format("deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s", validDocIdsType)); } + String skipCrcMismatch = 
taskConfigs.getOrDefault(UpsertCompactionTask.SKIP_CRC_MISMATCH, + DEFAULT_SKIP_CRC_MISMATCH); List validDocIdsMetadataList = serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, @@ -199,6 +202,7 @@ public List generateTasks(List tableConfigs) { configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc())); configs.put(UpsertCompactionTask.VALID_DOC_IDS_TYPE, validDocIdsType.toString()); + configs.put(UpsertCompactionTask.SKIP_CRC_MISMATCH, String.valueOf(skipCrcMismatch)); pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs)); numTasks++; }