Skip to content

Commit

Permalink
Make upsert compaction task more robust to crc mismatch (#13489)
Browse files Browse the repository at this point in the history
* Allow upsert compaction to work properly during schema / indexing updates

* iterate through all servers

* improve upsert compaction task generator logic

* address comments - refactor

* address comments - more refactoring

* address comments -- undo some refactoring

* minor address
  • Loading branch information
tibrewalpratik17 authored Jul 12, 2024
1 parent dacc6d0 commit 1fad53a
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -215,14 +216,30 @@ public List<String> getSegmentMetadataFromServer(String tableNameWithType,

/**
 * This method is called when the API request is to fetch validDocId metadata for a list of segments of the given
 * table. It queries every hosting server, then keeps a single metadata entry per segment.
 *
 * @return list of valid doc id metadata, one per segment processed.
 */
public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tableNameWithType,
    Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
    @Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
    int numSegmentsBatchPerServerRequest) {
  // Fetch all per-server responses, then reduce each segment's list to its first entry.
  Map<String, List<ValidDocIdsMetadataInfo>> segmentToMetadata =
      getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints,
          segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest);
  List<ValidDocIdsMetadataInfo> result = new ArrayList<>();
  for (List<ValidDocIdsMetadataInfo> metadataForSegment : segmentToMetadata.values()) {
    if (metadataForSegment != null && !metadataForSegment.isEmpty()) {
      result.add(metadataForSegment.get(0));
    }
  }
  return result;
}

/**
 * This method is called when the API request is to fetch validDocId metadata for a list of segments of the given
 * table. This method will pick all servers that host the target segment, fetch the segment metadata result and
 * return it as a list.
*
* @return map of segment name to list of valid doc id metadata where each element is every server's metadata.
*/
public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
Expand Down Expand Up @@ -256,7 +273,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, true, requestHeaders,
timeoutMs, null);

Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new HashMap<>();
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfos = new HashMap<>();
int failedParses = 0;
int returnedServerRequestsCount = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
Expand All @@ -266,7 +283,8 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : validDocIdsMetadataInfoList) {
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo);
validDocIdsMetadataInfos.computeIfAbsent(validDocIdsMetadataInfo.getSegmentName(), k -> new ArrayList<>())
.add(validDocIdsMetadataInfo);
}
returnedServerRequestsCount++;
} catch (Exception e) {
Expand All @@ -292,7 +310,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab

LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.",
validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
return new ArrayList<>(validDocIdsMetadataInfos.values());
return validDocIdsMetadataInfos;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
Expand All @@ -42,6 +44,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -143,32 +146,6 @@ public static String normalizeDirectoryURI(String dirInStr) {
return dirInStr;
}

/**
 * Fetches the validDocIds bitmap for the given segment from one of the servers hosting it.
 * Tries each hosting server in order and returns the first successful response; failures on
 * individual servers are logged and the next replica is tried.
 *
 * @throws IllegalStateException if no hosting server could provide the bitmap
 */
public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String tableNameWithType, String segmentName,
String validDocIdsType, MinionContext minionContext) {
HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
String clusterName = minionContext.getHelixManager().getClusterName();

List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
for (String server : servers) {
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

// NOTE(review): the previous comment here ("aggregated table size ... empty list") looks copied
// from a table-size code path and does not describe this call — this only fetches the
// validDocIds bitmap for the segment. Hard-coded 60s timeout per server request.
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
try {
return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
// Best effort: log the failure and fall through to the next replica.
LOGGER.warn(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
endpoint), e);
}
}
throw new IllegalStateException(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from servers: %s", segmentName, servers));
}

public static List<String> getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
String clusterName) {
ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
Expand Down Expand Up @@ -206,4 +183,56 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
}
return defaultValue;
}

/**
 * Returns the validDocIds bitmap from the first server whose local segment crc matches both the crc in ZK
 * metadata and the crc of the deepstore copy (expectedCrc). Servers that fail the request or report a
 * different crc are skipped (a mismatch means the replica has not yet finished committing/reloading).
 *
 * @param tableNameWithType table name with type suffix
 * @param segmentName name of the segment whose validDocIds bitmap is requested
 * @param validDocIdsType type of validDocIds source to fetch (e.g. snapshot)
 * @param minionContext minion context used to reach the Helix cluster
 * @param expectedCrc crc from the task generator (already validated against ZK metadata and deepstore)
 * @return the deserialized validDocIds bitmap, or {@code null} if no server returned a matching-crc bitmap
 */
@Nullable
public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
    String validDocIdsType, MinionContext minionContext, String expectedCrc) {
  String clusterName = minionContext.getHelixManager().getClusterName();
  HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
  List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
  // The reader is loop-invariant: create it once instead of once per server.
  ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
  RoaringBitmap validDocIds = null;
  for (String server : servers) {
    InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

    // Fetch the validDocIds bitmap (plus the server's local segment crc) with a 60s timeout.
    ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
    try {
      validDocIdsBitmapResponse =
          serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
              validDocIdsType, 60_000);
    } catch (Exception e) {
      // Best effort: log and try the next replica.
      LOGGER.warn(
          String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
              endpoint), e);
      continue;
    }

    // Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
    // bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
    // uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
    // offheap upsert is used because we will need to delete & add all primary keys.
    // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
    // against the crc from the current segment zk metadata, so we don't need to check that here.
    String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
    if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
      // In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore.
      // We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC.
      String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, "
              + "actual crc from validDocIdsBitmapResponse from endpoint %s: %s", segmentName, expectedCrc, endpoint,
          crcFromValidDocIdsBitmap);
      LOGGER.warn(message);
      continue;
    }
    validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
    break;
  }
  return validDocIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
Expand Down Expand Up @@ -60,31 +58,28 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File

String validDocIdsTypeStr =
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, validDocIdsType.toString(),
MINION_CONTEXT);

// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
|| !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName,
crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc());
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.build();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s",
originalSegmentCrcFromTaskGenerator);
LOGGER.error(message);
throw new IllegalStateException(message);
}

RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());

if (validDocIds.isEmpty()) {
// prevents empty segment generation
LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
validDocIdsType));
}

List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest);

Map<String, SegmentZKMetadata> completedSegmentsMap =
Expand Down Expand Up @@ -209,7 +209,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap, List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList) {
Map<String, SegmentZKMetadata> completedSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
Expand All @@ -218,30 +219,31 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
String segmentName = validDocIdsMetadata.getSegmentName();

// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
if (segment == null) {
for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
// check if segment is part of completed segments
if (!completedSegmentsMap.containsKey(segmentName)) {
LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName);
continue;
}
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();

if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn(
"CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdsMetadata={})",
segmentName, segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName,
segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
}
break;
}
}
segmentsForCompaction.sort((o1, o2) -> {
Expand All @@ -254,8 +256,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
});

return new SegmentSelectionResult(
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
segmentsForDeletion);
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()), segmentsForDeletion);
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit 1fad53a

Please sign in to comment.