Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make upsert compaction task more robust to crc mismatch #13489

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -215,14 +216,30 @@ public List<String> getSegmentMetadataFromServer(String tableNameWithType,

/**
 * Fetches validDocIds metadata for the given list of segments of a table. For each segment, the metadata reported
 * by one (effectively random) hosting server is kept.
 *
 * @return list of valid doc id metadata, one entry per segment processed
 */
public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tableNameWithType,
    Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
    @Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
    int numSegmentsBatchPerServerRequest) {
  // Delegate to the per-server variant, then keep only the first reported entry for every segment.
  Map<String, List<ValidDocIdsMetadataInfo>> segmentToMetadata =
      getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegmentsMap, serverToEndpoints,
          segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest);
  List<ValidDocIdsMetadataInfo> result = new ArrayList<>();
  for (List<ValidDocIdsMetadataInfo> perServerEntries : segmentToMetadata.values()) {
    if (perServerEntries != null && !perServerEntries.isEmpty()) {
      result.add(perServerEntries.get(0));
    }
  }
  return result;
}

/**
 * This method is called when the API request is to fetch validDocId metadata for a list of segments of the given
 * table. This method will query every server that hosts each target segment, fetch the segment metadata results,
 * and return them as a list.
*
* @return map of segment name to list of valid doc id metadata where each element is every server's metadata.
*/
public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType,
int numSegmentsBatchPerServerRequest) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
Expand All @@ -240,9 +257,10 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab

// Number of segments to query per server request. If a table has a lot of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch,
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()))));
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(
segmentsToQueryBatch -> serverURLsAndBodies.add(
generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch, validDocIdsType,
serverToEndpoints.get(serverToSegments.getKey()))));
}

BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
Expand All @@ -256,7 +274,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, true, requestHeaders,
timeoutMs, null);

Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new HashMap<>();
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfos = new HashMap<>();
int failedParses = 0;
int returnedServerRequestsCount = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
Expand All @@ -266,7 +284,8 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : validDocIdsMetadataInfoList) {
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo);
validDocIdsMetadataInfos.computeIfAbsent(validDocIdsMetadataInfo.getSegmentName(), k -> new ArrayList<>())
.add(validDocIdsMetadataInfo);
}
returnedServerRequestsCount++;
} catch (Exception e) {
Expand All @@ -292,7 +311,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab

LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.",
validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
return new ArrayList<>(validDocIdsMetadataInfos.values());
return validDocIdsMetadataInfos;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
Expand All @@ -42,6 +44,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -143,32 +146,6 @@ public static String normalizeDirectoryURI(String dirInStr) {
return dirInStr;
}

/**
 * Fetches the validDocIds bitmap for the given segment by querying, in order, the servers currently hosting it.
 * The first successful response is returned; a per-server failure is logged and the next hosting server is tried.
 *
 * @param tableNameWithType table name with type suffix
 * @param segmentName name of the segment whose validDocIds bitmap is requested
 * @param validDocIdsType which validDocIds source the server should read
 * @param minionContext minion context used to reach the Helix cluster
 * @return the validDocIds bitmap response from the first server that answered successfully
 * @throws IllegalStateException if no hosting server returned a bitmap
 */
public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String tableNameWithType, String segmentName,
    String validDocIdsType, MinionContext minionContext) {
  HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
  String clusterName = minionContext.getHelixManager().getClusterName();

  // The reader holds no per-request state, so create it once instead of once per server iteration.
  ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
  List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
  for (String server : servers) {
    InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
    try {
      // 60s timeout per server; on failure fall through and try the next replica.
      return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
          validDocIdsType, 60_000);
    } catch (Exception e) {
      LOGGER.warn(
          String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
              endpoint), e);
    }
  }
  throw new IllegalStateException(
      String.format("Unable to retrieve validDocIds bitmap for segment: %s from servers: %s", segmentName, servers));
}

public static List<String> getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
String clusterName) {
ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
Expand Down Expand Up @@ -206,4 +183,66 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
}
return defaultValue;
}

/**
* Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and
* deepstore copy.
*/
@Nullable
public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
String validDocIdsType, MinionContext minionContext, String originalSegmentCrcFromTaskGenerator,
String crcFromDeepStorageSegment) {
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
String clusterName = minionContext.getHelixManager().getClusterName();
HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
RoaringBitmap validDocIds = null;
List<String> servers = MinionTaskUtils.getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
for (String server : servers) {
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

// We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
try {
validDocIdsBitmapResponse =
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
LOGGER.warn(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
endpoint), e);
continue;
}

// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
// In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore.
// We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC.
String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, "
+ "actual crc from validDocIdsBitmapResponse from endpoint %s: %s", segmentName,
originalSegmentCrcFromTaskGenerator, endpoint, crcFromValidDocIdsBitmap);
LOGGER.warn(message);
continue;
}
if (!crcFromValidDocIdsBitmap.equals(crcFromDeepStorageSegment)) {
// Deepstore copies might not always have the same CRC as that from the server we queried for ValidDocIdsBitmap
// It can happen due to CRC mismatch issues due to replicas diverging, lucene index issues.
String message = String.format(
"CRC mismatch for segment: %s, " + "expected crc from validDocIdsBitmapResponse from endpoint %s: %s, "
+ "actual crc based on deepstore / server metadata copy: %s", segmentName, endpoint,
crcFromValidDocIdsBitmap, crcFromDeepStorageSegment);
LOGGER.warn(message);
continue;
}
validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
}
return validDocIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
Expand All @@ -45,6 +43,27 @@
public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);

/**
 * Builds the {@link SegmentGeneratorConfig} used to regenerate the compacted segment, carrying over the original
 * segment's name, index creation time and time-column boundaries.
 */
private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
    SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
  SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
  generatorConfig.setOutDir(workingDir.getPath());
  generatorConfig.setSegmentName(segmentName);
  // Reuse the original index creation time: both segments are built from the same raw data, and for the REFRESH
  // case the controller compares creation times of a newly pushed segment against the existing one to decide
  // whether the push carries newer data.
  generatorConfig.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));

  // Segment metadata does not persist the time column's type info, so carry the start/end times over explicitly
  // to correctly handle time column types other than EPOCH (e.g. SIMPLE_FORMAT).
  if (segmentMetadata.getTimeInterval() != null) {
    generatorConfig.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
    generatorConfig.setStartTime(Long.toString(segmentMetadata.getStartTime()));
    generatorConfig.setEndTime(Long.toString(segmentMetadata.getEndTime()));
    generatorConfig.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
  }
  return generatorConfig;
}

@Override
protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
throws Exception {
Expand All @@ -60,31 +79,19 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File

String validDocIdsTypeStr =
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, validDocIdsType.toString(),
MINION_CONTEXT);

// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
|| !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName,
crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc());
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.build();
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator, crcFromDeepStorageSegment);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
LOGGER.error("No validDocIds found from all servers. They either failed to download or did not match crc from"
+ "segment copy obtained from deepstore / servers.");
throw new IllegalStateException("No valid validDocIds found from all servers.");
}

RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());

if (validDocIds.isEmpty()) {
// prevents empty segment generation
LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName);
Expand Down Expand Up @@ -118,27 +125,6 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
return result;
}

/**
 * Builds the segment generator config used to write the compacted segment.
 *
 * @param workingDir output directory for the generated segment
 * @param tableConfig table config supplying validation/time-column settings
 * @param segmentMetadata metadata of the original segment whose creation time and time range are preserved
 * @param segmentName name to assign to the generated segment (same as the original)
 * @param schema table schema used for segment generation
 * @return a configured {@code SegmentGeneratorConfig}
 */
private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(workingDir.getPath());
config.setSegmentName(segmentName);
// Keep index creation time the same as original segment because both segments use the same raw data.
// This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
// identify if the new pushed segment has newer data than the existing one.
config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));

// The time column type info is not stored in the segment metadata.
// Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT).
if (segmentMetadata.getTimeInterval() != null) {
config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}
return config;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Expand Down
Loading
Loading