Make upsert compaction task more robust to crc mismatch #13489

Merged
ServerSegmentMetadataReader.java
@@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
@@ -215,14 +216,30 @@ public List<String> getSegmentMetadataFromServer(String tableNameWithType,

/**
* This method is called when the API request is to fetch validDocId metadata for a list of segments of the given table.
* This method will pick a server that hosts the target segment and fetch the segment metadata result.
* This method will randomly pick one server that hosts the target segment and fetch the segment metadata result.
*
* @return segment metadata as a JSON string
* @return list of valid doc id metadata, one per segment processed.
*/
public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest) {
return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints,
segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest).values().stream()
.filter(list -> list != null && !list.isEmpty()).map(list -> list.get(0)).collect(Collectors.toList());
}

/**
* This method is called when the API request is to fetch validDocId metadata for a list of segments of the given table.
* This method will query every server that hosts the target segments, fetch the segment metadata result from each,
* and return them as a list.
*
* @return map of segment name to a list of valid doc id metadata, with one entry per server hosting the segment.
*/
public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
@@ -256,7 +273,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, true, requestHeaders,
timeoutMs, null);

Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new HashMap<>();
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfos = new HashMap<>();
int failedParses = 0;
int returnedServerRequestsCount = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
@@ -266,7 +283,8 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : validDocIdsMetadataInfoList) {
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo);
validDocIdsMetadataInfos.computeIfAbsent(validDocIdsMetadataInfo.getSegmentName(), k -> new ArrayList<>())
.add(validDocIdsMetadataInfo);
}
returnedServerRequestsCount++;
} catch (Exception e) {
@@ -292,7 +310,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab

LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.",
validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
return new ArrayList<>(validDocIdsMetadataInfos.values());
return validDocIdsMetadataInfos;
}

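For orientation (not part of the diff): a minimal, self-contained sketch of how the new map-valued API relates to the old list-valued one. The compatibility wrapper above simply keeps one arbitrary replica's metadata per segment. Meta is a hypothetical stand-in for ValidDocIdsMetadataInfo; none of the names below are Pinot APIs.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollapsePerSegmentMetadataSketch {
  // Stand-in for ValidDocIdsMetadataInfo, with only the fields the sketch needs.
  record Meta(String segmentName, String crc) { }

  // Mirrors the compatibility wrapper: keep the first replica's metadata per segment.
  static List<Meta> onePerSegment(Map<String, List<Meta>> segmentToReplicaMetadata) {
    return segmentToReplicaMetadata.values().stream()
        .filter(list -> list != null && !list.isEmpty())
        .map(list -> list.get(0))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<Meta>> bySegment = new HashMap<>();
    // Two replicas reported metadata for the same segment, grouped as in the new code via computeIfAbsent.
    bySegment.computeIfAbsent("seg_0", k -> new ArrayList<>()).add(new Meta("seg_0", "111"));
    bySegment.computeIfAbsent("seg_0", k -> new ArrayList<>()).add(new Meta("seg_0", "222"));
    // Prints a single entry for seg_0 (the first replica's metadata).
    System.out.println(onePerSegment(bySegment));
  }
}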
MinionTaskUtils.java
@@ -23,10 +23,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
@@ -42,6 +44,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -143,32 +146,6 @@ public static String normalizeDirectoryURI(String dirInStr) {
return dirInStr;
}

public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String tableNameWithType, String segmentName,
String validDocIdsType, MinionContext minionContext) {
HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
String clusterName = minionContext.getHelixManager().getClusterName();

List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
for (String server : servers) {
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

// We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
try {
return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
LOGGER.warn(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
endpoint), e);
}
}
throw new IllegalStateException(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from servers: %s", segmentName, servers));
}

public static List<String> getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
String clusterName) {
ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
@@ -206,4 +183,56 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
}
return defaultValue;
}

/**
* Returns the validDocIds bitmap from a server whose local segment crc matches both the crc in ZK metadata and
* the deepstore copy (expectedCrc).
*/
@Nullable
public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
String validDocIdsType, MinionContext minionContext, String expectedCrc) {
String clusterName = minionContext.getHelixManager().getClusterName();
HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
RoaringBitmap validDocIds = null;
List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
for (String server : servers) {
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

// We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
try {
validDocIdsBitmapResponse =
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
LOGGER.warn(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
endpoint), e);
continue;
}

// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
// In this scenario, we are hitting a replica of the segment whose local copy is not the one committed to ZK or
// deepstore. Skip this bitmap and query the other servers to check whether one of them has a matching CRC.
String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, "
+ "actual crc from validDocIdsBitmapResponse from endpoint %s: %s", segmentName, expectedCrc, endpoint,
crcFromValidDocIdsBitmap);
LOGGER.warn(message);
continue;
}
validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
break;
}
return validDocIds;
}
}
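For orientation (not part of the diff): a minimal sketch of the replica-scan pattern that getValidDocIdFromServerMatchingCrc follows. It tries each hosting server, skips on fetch failure or CRC mismatch, and returns the first matching bitmap, or null if none matches. Response and fetch are hypothetical stand-ins for ValidDocIdsBitmapResponse and the server call; this is not Pinot code.

import java.util.List;
import java.util.function.Function;

public class CrcMatchingScanSketch {
  // Stand-in for ValidDocIdsBitmapResponse: the server-reported CRC plus the serialized bitmap.
  record Response(String segmentCrc, byte[] bitmap) { }

  // Returns the bitmap from the first server whose CRC matches expectedCrc, or null if no server qualifies.
  static byte[] firstBitmapMatchingCrc(List<String> servers, String expectedCrc,
      Function<String, Response> fetch) {
    for (String server : servers) {
      Response response;
      try {
        response = fetch.apply(server); // may fail if the server is unreachable
      } catch (Exception e) {
        continue; // try the next replica
      }
      if (!expectedCrc.equals(response.segmentCrc())) {
        continue; // this replica does not hold the expected segment version yet
      }
      return response.bitmap(); // first CRC-matching replica wins
    }
    return null; // caller decides whether a missing bitmap should fail the task
  }
}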
UpsertCompactionTaskExecutor.java
@@ -23,9 +23,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
@@ -60,31 +58,28 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File

String validDocIdsTypeStr =
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, validDocIdsType.toString(),
MINION_CONTEXT);

// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
|| !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName,
crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc());
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.build();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s",
originalSegmentCrcFromTaskGenerator);
LOGGER.error(message);
throw new IllegalStateException(message);
}

RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());

if (validDocIds.isEmpty()) {
// prevents empty segment generation
LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName);
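For orientation (not part of the diff): a minimal sketch of the fail-fast order the reworked executor follows. It first requires that the task generator's CRC matches the deepstore copy, then requires that some server produced a CRC-matching validDocIds bitmap; both failures throw IllegalStateException so they surface in task-error metrics. The class and method names here are illustrative, not Pinot APIs.

public class UpsertCompactionPreChecksSketch {
  // Abort if the CRC recorded by the task generator (from ZK metadata) differs from the deepstore copy.
  static void requireCrcMatch(String segmentName, String expectedCrc, String deepStoreCrc) {
    if (!expectedCrc.equals(deepStoreCrc)) {
      throw new IllegalStateException(String.format(
          "CRC mismatch for segment %s: expected %s from ZK, got %s from deepstore",
          segmentName, expectedCrc, deepStoreCrc));
    }
  }

  // Abort if no server produced a validDocIds bitmap with a matching CRC (null result from the replica scan).
  static <T> T requireValidDocIds(String segmentName, T validDocIds) {
    if (validDocIds == null) {
      throw new IllegalStateException(
          "No CRC-matching validDocIds bitmap found for segment " + segmentName);
    }
    return validDocIds;
  }
}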
UpsertCompactionTaskGenerator.java
@@ -165,8 +165,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
validDocIdsType));
}

List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest);

Map<String, SegmentZKMetadata> completedSegmentsMap =
@@ -209,7 +209,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap, List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList) {
Map<String, SegmentZKMetadata> completedSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -218,30 +219,31 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
String segmentName = validDocIdsMetadata.getSegmentName();

// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
if (segment == null) {
for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
// check if segment is part of completed segments
if (!completedSegmentsMap.containsKey(segmentName)) {
LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName);
continue;
}
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();

if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn(
"CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdsMetadata={})",
segmentName, segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName,
segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
}
break;
}
}
segmentsForCompaction.sort((o1, o2) -> {
@@ -254,8 +256,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
});

return new SegmentSelectionResult(
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
segmentsForDeletion);
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()), segmentsForDeletion);
}

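For orientation (not part of the diff): a minimal sketch of the per-segment selection rule the generator now applies over per-server metadata. The first replica whose CRC matches the ZK metadata decides: a fully invalid segment is slated for deletion, a segment over both thresholds for compaction, and anything else is skipped. Meta and Decision are illustrative stand-ins, not Pinot types.

import java.util.List;

public class CompactionSelectionSketch {
  // Stand-in for ValidDocIdsMetadataInfo with the fields the selection rule needs.
  record Meta(long segmentCrc, long totalDocs, long totalInvalidDocs) { }

  enum Decision { DELETE, COMPACT, SKIP }

  static Decision decide(long zkCrc, List<Meta> replicaMetadata,
      double invalidRecordsThresholdPercent, long invalidRecordsThresholdCount) {
    for (Meta meta : replicaMetadata) {
      if (zkCrc != meta.segmentCrc()) {
        continue; // stale replica (e.g. still reloading); try the next server's metadata
      }
      if (meta.totalInvalidDocs() == meta.totalDocs()) {
        return Decision.DELETE; // every record is invalid: drop the segment outright
      }
      double invalidPercent = 100.0 * meta.totalInvalidDocs() / meta.totalDocs();
      if (invalidPercent >= invalidRecordsThresholdPercent
          && meta.totalInvalidDocs() >= invalidRecordsThresholdCount) {
        return Decision.COMPACT;
      }
      return Decision.SKIP; // the first CRC-matching replica decides; stop scanning
    }
    return Decision.SKIP; // no replica matched the ZK CRC
  }
}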