diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 9c2ffa61962a..b3fd851ff41d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.MediaType; @@ -215,14 +216,30 @@ public List getSegmentMetadataFromServer(String tableNameWithType, /** * This method is called when the API request is to fetch validDocId metadata for a list segments of the given table. - * This method will pick a server that hosts the target segment and fetch the segment metadata result. + * This method will pick one server randomly that hosts the target segment and fetch the segment metadata result. * - * @return segment metadata as a JSON string + * @return list of valid doc id metadata, one per segment processed. */ public List getValidDocIdsMetadataFromServer(String tableNameWithType, Map> serverToSegmentsMap, BiMap serverToEndpoints, @Nullable List segmentNames, int timeoutMs, String validDocIdsType, int numSegmentsBatchPerServerRequest) { + return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints, + segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest).values().stream() + .filter(list -> list != null && !list.isEmpty()).map(list -> list.get(0)).collect(Collectors.toList()); + } + + /** + * This method is called when the API request is to fetch validDocId metadata for a list segments of the given table. + * This method will pick all servers that hosts the target segment and fetch the segment metadata result and + * return as a list. + * + * @return map of segment name to list of valid doc id metadata where each element is every server's metadata. + */ + public Map> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType, + Map> serverToSegmentsMap, BiMap serverToEndpoints, + @Nullable List segmentNames, int timeoutMs, String validDocIdsType, + int numSegmentsBatchPerServerRequest) { List> serverURLsAndBodies = new ArrayList<>(); for (Map.Entry> serverToSegments : serverToSegmentsMap.entrySet()) { List segmentsForServer = serverToSegments.getValue(); @@ -256,7 +273,7 @@ public List getValidDocIdsMetadataFromServer(String tab completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, true, requestHeaders, timeoutMs, null); - Map validDocIdsMetadataInfos = new HashMap<>(); + Map> validDocIdsMetadataInfos = new HashMap<>(); int failedParses = 0; int returnedServerRequestsCount = 0; for (Map.Entry streamResponse : serviceResponse._httpResponses.entrySet()) { @@ -266,7 +283,8 @@ public List getValidDocIdsMetadataFromServer(String tab JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference>() { }); for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : validDocIdsMetadataInfoList) { - validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo); + validDocIdsMetadataInfos.computeIfAbsent(validDocIdsMetadataInfo.getSegmentName(), k -> new ArrayList<>()) + .add(validDocIdsMetadataInfo); } returnedServerRequestsCount++; } catch (Exception e) { @@ -292,7 +310,7 @@ public List getValidDocIdsMetadataFromServer(String tab LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.", validDocIdsMetadataInfos.size(), returnedServerRequestsCount); - return new ArrayList<>(validDocIdsMetadataInfos.values()); + return validDocIdsMetadataInfos; } /** diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 9b7dad195510..55dfb97f981e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -23,10 +23,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.helix.HelixAdmin; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse; +import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.controller.util.ServerSegmentMetadataReader; @@ -42,6 +44,7 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,32 +146,6 @@ public static String normalizeDirectoryURI(String dirInStr) { return dirInStr; } - public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String tableNameWithType, String segmentName, - String validDocIdsType, MinionContext minionContext) { - HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool(); - String clusterName = minionContext.getHelixManager().getClusterName(); - - List servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName); - for (String server : servers) { - InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server); - String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig); - - // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by - // passing an empty list. - ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader(); - try { - return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint, - validDocIdsType, 60_000); - } catch (Exception e) { - LOGGER.warn( - String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName, - endpoint), e); - } - } - throw new IllegalStateException( - String.format("Unable to retrieve validDocIds bitmap for segment: %s from servers: %s", segmentName, servers)); - } - public static List getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin, String clusterName) { ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType); @@ -206,4 +183,56 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf } return defaultValue; } + + /** + * Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and + * deepstore copy (expectedCrc). + */ + @Nullable + public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName, + String validDocIdsType, MinionContext minionContext, String expectedCrc) { + String clusterName = minionContext.getHelixManager().getClusterName(); + HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool(); + RoaringBitmap validDocIds = null; + List servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName); + for (String server : servers) { + InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server); + String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig); + + // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by + // passing an empty list. + ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader(); + ValidDocIdsBitmapResponse validDocIdsBitmapResponse; + try { + validDocIdsBitmapResponse = + serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint, + validDocIdsType, 60_000); + } catch (Exception e) { + LOGGER.warn( + String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName, + endpoint), e); + continue; + } + + // Check crc from the downloaded segment against the crc returned from the server along with the valid doc id + // bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been + // uploaded successfully while the server is still reloading the segment. Reloading can take a while when the + // offheap upsert is used because we will need to delete & add all primary keys. + // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator + // against the crc from the current segment zk metadata, so we don't need to check that here. + String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc(); + if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) { + // In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore. + // We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC. + String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, " + + "actual crc from validDocIdsBitmapResponse from endpoint %s: %s", segmentName, expectedCrc, endpoint, + crcFromValidDocIdsBitmap); + LOGGER.warn(message); + continue; + } + validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap()); + break; + } + return validDocIds; + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java index ec5cc127d9f2..e683214f4352 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java @@ -23,9 +23,7 @@ import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; -import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse; import org.apache.pinot.common.restlet.resources.ValidDocIdsType; -import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; @@ -60,31 +58,28 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File String validDocIdsTypeStr = configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name()); - ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase()); - ValidDocIdsBitmapResponse validDocIdsBitmapResponse = - MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, validDocIdsType.toString(), - MINION_CONTEXT); - - // Check crc from the downloaded segment against the crc returned from the server along with the valid doc id - // bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been - // uploaded successfully while the server is still reloading the segment. Reloading can take a while when the - // offheap upsert is used because we will need to delete & add all primary keys. - // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator - // against the crc from the current segment zk metadata, so we don't need to check that here. SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY); String crcFromDeepStorageSegment = segmentMetadata.getCrc(); - String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc(); - if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment) - || !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) { - LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName, - crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc()); - return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName) - .build(); + if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) { + String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc " + + "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator, + crcFromDeepStorageSegment); + LOGGER.error(message); + throw new IllegalStateException(message); + } + RoaringBitmap validDocIds = + MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr, + MINION_CONTEXT, originalSegmentCrcFromTaskGenerator); + if (validDocIds == null) { + // no valid crc match found or no validDocIds obtained from all servers + // error out the task instead of silently failing so that we can track it via task-error metrics + String message = String.format("No validDocIds found from all servers. They either failed to download " + + "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s", + originalSegmentCrcFromTaskGenerator); + LOGGER.error(message); + throw new IllegalStateException(message); } - - RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap()); - if (validDocIds.isEmpty()) { // prevents empty segment generation LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 0357bca6fe12..64cbe03fe818 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -165,8 +165,8 @@ public List generateTasks(List tableConfigs) { validDocIdsType)); } - List validDocIdsMetadataList = - serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, + Map> validDocIdsMetadataList = + serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest); Map completedSegmentsMap = @@ -209,7 +209,8 @@ public List generateTasks(List tableConfigs) { @VisibleForTesting public static SegmentSelectionResult processValidDocIdsMetadata(Map taskConfigs, - Map completedSegmentsMap, List validDocIdsMetadataInfoList) { + Map completedSegmentsMap, + Map> validDocIdsMetadataInfoMap) { double invalidRecordsThresholdPercent = Double.parseDouble( taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT))); @@ -218,30 +219,31 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map> segmentsForCompaction = new ArrayList<>(); List segmentsForDeletion = new ArrayList<>(); - for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) { - long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs(); - String segmentName = validDocIdsMetadata.getSegmentName(); - - // Skip segments if the crc from zk metadata and server does not match. They may be being reloaded. - SegmentZKMetadata segment = completedSegmentsMap.get(segmentName); - if (segment == null) { + for (String segmentName : validDocIdsMetadataInfoMap.keySet()) { + // check if segment is part of completed segments + if (!completedSegmentsMap.containsKey(segmentName)) { LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName); continue; } + SegmentZKMetadata segment = completedSegmentsMap.get(segmentName); + for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) { + long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs(); - if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) { - LOGGER.warn( - "CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdsMetadata={})", - segmentName, segment.getCrc(), validDocIdsMetadata.getSegmentCrc()); - continue; - } - long totalDocs = validDocIdsMetadata.getTotalDocs(); - double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; - if (totalInvalidDocs == totalDocs) { - segmentsForDeletion.add(segment.getSegmentName()); - } else if (invalidRecordPercent >= invalidRecordsThresholdPercent - && totalInvalidDocs >= invalidRecordsThresholdCount) { - segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs)); + // Skip segments if the crc from zk metadata and server does not match. They may be being reloaded. + if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) { + LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName, + segment.getCrc(), validDocIdsMetadata.getSegmentCrc()); + continue; + } + long totalDocs = validDocIdsMetadata.getTotalDocs(); + double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; + if (totalInvalidDocs == totalDocs) { + segmentsForDeletion.add(segment.getSegmentName()); + } else if (invalidRecordPercent >= invalidRecordsThresholdPercent + && totalInvalidDocs >= invalidRecordsThresholdCount) { + segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs)); + } + break; } } segmentsForCompaction.sort((o1, o2) -> { @@ -254,8 +256,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map(); _completedSegmentsMap.put(_completedSegment.getSegmentName(), _completedSegment); @@ -231,24 +233,27 @@ public void testGetCompletedSegments() { public void testProcessValidDocIdsMetadata() throws IOException { Map compactionConfigs = getCompactionConfigs("1", "10"); - String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" - + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \"" - + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 0," + "\"totalInvalidDocs\" : 10," - + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \"" - + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]"; - - List validDocIdsMetadataInfo = - JsonUtils.stringToObject(json, new TypeReference>() { + String json = "{\"testTable__0\": [{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, " + + "\"segmentName\": \"testTable__0\", \"totalDocs\": 100, \"segmentCrc\": \"1000\"}], " + + "\"testTable__1\": [{\"totalValidDocs\": 0, " + + "\"totalInvalidDocs\": 10, \"segmentName\": \"testTable__1\", \"totalDocs\": 10, \"segmentCrc\": \"2000\"}]}"; + + Map> validDocIdsMetadataInfo = + JsonUtils.stringToObject(json, new TypeReference<>() { }); + // no completed segments scenario, there shouldn't be any segment selected for compaction UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new HashMap<>(), validDocIdsMetadataInfo); assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0); + // test with valid crc and thresholds segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, validDocIdsMetadataInfo); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); @@ -259,6 +264,7 @@ public void testProcessValidDocIdsMetadata() UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, validDocIdsMetadataInfo); assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty()); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); // test without an invalidRecordsThresholdPercent @@ -266,6 +272,8 @@ public void testProcessValidDocIdsMetadata() segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, validDocIdsMetadataInfo); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); @@ -275,18 +283,19 @@ public void testProcessValidDocIdsMetadata() segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, validDocIdsMetadataInfo); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1); assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), _completedSegment.getSegmentName()); assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); // Test the case where the completedSegment from api has different crc than segment from zk metadata. - json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" - + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \"" - + "1234567890" + "\"}," + "{" + "\"totalValidDocs\" : 0," + "\"totalInvalidDocs\" : 10," - + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \"" - + _completedSegment2.getCrc() + "\"," - + "\"totalDocs\" : 10" + "}]"; - validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference>() { + json = "{\"" + _completedSegment.getSegmentName() + "\": [{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, " + + "\"segmentName\": \"" + _completedSegment.getSegmentName() + "\", \"totalDocs\": 100, \"segmentCrc\": " + + "\"1234567890\"}], \"" + _completedSegment2.getSegmentName() + "\": [{\"totalValidDocs\": 0, " + + "\"totalInvalidDocs\": 10, \"segmentName\": \"" + _completedSegment2.getSegmentName() + "\", " + + "\"segmentCrc\": \"" + _completedSegment2.getCrc() + "\", \"totalDocs\": 10}]}"; + validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference<>() { }); segmentSelectionResult = UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap, @@ -301,12 +310,13 @@ public void testProcessValidDocIdsMetadata() _completedSegment2.getSegmentName()); // check if both the candidates for compaction are coming in sorted descending order - json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" - + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \"" - + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 10," + "\"totalInvalidDocs\" : 40," - + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \"" - + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 50" + "}]"; - validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference>() { + json = "{\"" + _completedSegment.getSegmentName() + "\": [{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, " + + "\"segmentName\": \"" + _completedSegment.getSegmentName() + "\", \"totalDocs\": 100, \"segmentCrc\": \"" + + _completedSegment.getCrc() + "\"}], \"" + _completedSegment2.getSegmentName() + "\": " + + "[{\"totalValidDocs\": 10, \"totalInvalidDocs\": 40, \"segmentName\": \"" + + _completedSegment2.getSegmentName() + "\", \"segmentCrc\": \"" + _completedSegment2.getCrc() + "\", " + + "\"totalDocs\": 50}]}"; + validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference<>() { }); compactionConfigs = getCompactionConfigs("30", "0"); segmentSelectionResult =