Skip to content

Commit

Permalink
address comments -- undo some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Jul 9, 2024
1 parent 69985a8 commit 0115cac
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,9 @@ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadat

// Number of segments to query per server request. If a table has a lot of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(
segmentsToQueryBatch -> serverURLsAndBodies.add(
generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch, validDocIdsType,
serverToEndpoints.get(serverToSegments.getKey()))));
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch,
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()))));
}

BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
continue;
}
validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
break;
}
return validDocIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,6 @@
public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);

/**
 * Creates the {@link SegmentGeneratorConfig} for the segment that will be written under {@code workingDir}.
 *
 * <p>The output directory, segment name and index creation time are always populated. The time column name,
 * start/end time and time unit are populated only when the segment metadata reports a time interval.
 *
 * @param workingDir directory whose path becomes the generator's output directory
 * @param tableConfig table config used to construct the generator config and to look up the time column name
 * @param segmentMetadata metadata of the original segment (creation time and time range are read from it)
 * @param segmentName name assigned to the generated segment
 * @param schema schema used to construct the generator config
 * @return the populated generator config
 */
private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
    SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
  SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
  generatorConfig.setOutDir(workingDir.getPath());
  generatorConfig.setSegmentName(segmentName);

  // Both segments are built from the same raw data, so the original index creation time is carried over.
  // For the REFRESH case this lets the controller compare creation times to tell whether a newly pushed
  // segment holds newer data than the existing one.
  generatorConfig.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));

  // Without a time interval there is no time information to carry over.
  if (segmentMetadata.getTimeInterval() == null) {
    return generatorConfig;
  }

  // Segment metadata does not store the time column type, so the start/end time are copied explicitly
  // to correctly handle time column types other than EPOCH (e.g. SIMPLE_FORMAT).
  generatorConfig.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
  generatorConfig.setStartTime(Long.toString(segmentMetadata.getStartTime()));
  generatorConfig.setEndTime(Long.toString(segmentMetadata.getEndTime()));
  generatorConfig.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
  return generatorConfig;
}

@Override
protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
throws Exception {
Expand All @@ -82,16 +61,22 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
RoaringBitmap validDocIds = originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
? MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator) : null;
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match crc fromsegment copy obtained from deepstore / servers. "
+ "OriginalSegmentCrcFromTaskGenerator: %s, crcFromDeepStorageSegment: %s",
originalSegmentCrcFromTaskGenerator, crcFromDeepStorageSegment);
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s",
originalSegmentCrcFromTaskGenerator);
LOGGER.error(message);
throw new IllegalStateException(message);
}
Expand Down Expand Up @@ -128,6 +113,27 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
return result;
}

/**
 * Assembles a {@link SegmentGeneratorConfig} that writes a segment named {@code segmentName} into
 * {@code workingDir}, reusing the index creation time (and, when present, the time range) of the
 * original segment described by {@code segmentMetadata}.
 *
 * @param workingDir directory whose path is used as the output directory
 * @param tableConfig table config backing the generator config; also supplies the time column name
 * @param segmentMetadata metadata of the original segment
 * @param segmentName name to give the generated segment
 * @param schema schema backing the generator config
 * @return the configured {@link SegmentGeneratorConfig}
 */
private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
    SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
  SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);

  String outputDirPath = workingDir.getPath();
  segmentGeneratorConfig.setOutDir(outputDirPath);
  segmentGeneratorConfig.setSegmentName(segmentName);

  // The index creation time is kept identical to the original segment since both segments come from the
  // same raw data. On REFRESH, when the new segment is pushed to the controller, the creation time can
  // then be used to decide whether the pushed segment has newer data than the existing one.
  String originalCreationTime = String.valueOf(segmentMetadata.getIndexCreationTime());
  segmentGeneratorConfig.setCreationTime(originalCreationTime);

  // The time column type is not recorded in the segment metadata; carrying over the segment start/end
  // time keeps non-EPOCH time column types (e.g. SIMPLE_FORMAT) handled properly.
  boolean hasTimeInterval = segmentMetadata.getTimeInterval() != null;
  if (hasTimeInterval) {
    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
    segmentGeneratorConfig.setTimeColumnName(timeColumnName);
    String startTime = Long.toString(segmentMetadata.getStartTime());
    segmentGeneratorConfig.setStartTime(startTime);
    String endTime = Long.toString(segmentMetadata.getEndTime());
    segmentGeneratorConfig.setEndTime(endTime);
    segmentGeneratorConfig.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
  }
  return segmentGeneratorConfig;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,103 +60,24 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1;
private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;

@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
long invalidRecordsThresholdCount = Long.parseLong(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
// check if segment is part of completed segments
if (!completedSegmentsMap.containsKey(segmentName)) {
LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName);
continue;
}
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
public static class SegmentSelectionResult {

// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName,
segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
}
break;
}
}
segmentsForCompaction.sort((o1, o2) -> {
if (o1.getValue() > o2.getValue()) {
return -1;
} else if (o1.getValue().equals(o2.getValue())) {
return 0;
}
return 1;
});
private final List<SegmentZKMetadata> _segmentsForCompaction;

return new SegmentSelectionResult(
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()), segmentsForDeletion);
}
private final List<String> _segmentsForDeletion;

@VisibleForTesting
public static List<SegmentZKMetadata> getCompletedSegments(Map<String, String> taskConfigs,
List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
List<SegmentZKMetadata> completedSegments = new ArrayList<>();
String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
for (SegmentZKMetadata segment : allSegments) {
CommonConstants.Segment.Realtime.Status status = segment.getStatus();
// initial segments selection based on status and age
if (status.isCompleted() && (segment.getEndTimeMs() <= (currentTimeInMillis - bufferMs))) {
completedSegments.add(segment);
}
SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
_segmentsForCompaction = segmentsForCompaction;
_segmentsForDeletion = segmentsForDeletion;
}
return completedSegments;
}

@VisibleForTesting
public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) {
int maxTasks = Integer.MAX_VALUE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
public List<SegmentZKMetadata> getSegmentsForCompaction() {
return _segmentsForCompaction;
}
return maxTasks;
}

@VisibleForTesting
static boolean validate(TableConfig tableConfig) {
String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
String tableNameWithType = tableConfig.getTableName();
if (tableConfig.getTableType() == TableType.OFFLINE) {
LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
tableNameWithType);
return false;
}
if (!tableConfig.isUpsertEnabled()) {
LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
tableNameWithType);
return false;
public List<String> getSegmentsForDeletion() {
return _segmentsForDeletion;
}
return true;
}

@Override
Expand Down Expand Up @@ -225,8 +146,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

// Number of segments to query per server request. If a table has a lot of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
int numSegmentsBatchPerServerRequest = Integer.parseInt(
taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
int numSegmentsBatchPerServerRequest =
Integer.parseInt(taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));

// Validate that the snapshot is enabled if validDocIdsType is validDocIdsSnapshot
Expand Down Expand Up @@ -272,7 +193,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
LOGGER.warn("Skipping segment {} for task {} as download url is empty", segment.getSegmentName(), taskType);
continue;
}
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segment.getSegmentName())));
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig,
List.of(segment.getSegmentName())));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, segment.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
Expand All @@ -285,23 +207,102 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
return pinotTaskConfigs;
}

public static class SegmentSelectionResult {
@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
long invalidRecordsThresholdCount = Long.parseLong(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
// check if segment is part of completed segments
if (!completedSegmentsMap.containsKey(segmentName)) {
LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName);
continue;
}
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();

private final List<SegmentZKMetadata> _segmentsForCompaction;
// Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName,
segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
continue;
}
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
}
break;
}
}
segmentsForCompaction.sort((o1, o2) -> {
if (o1.getValue() > o2.getValue()) {
return -1;
} else if (o1.getValue().equals(o2.getValue())) {
return 0;
}
return 1;
});

private final List<String> _segmentsForDeletion;
return new SegmentSelectionResult(
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()), segmentsForDeletion);
}

SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
_segmentsForCompaction = segmentsForCompaction;
_segmentsForDeletion = segmentsForDeletion;
@VisibleForTesting
public static List<SegmentZKMetadata> getCompletedSegments(Map<String, String> taskConfigs,
List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
List<SegmentZKMetadata> completedSegments = new ArrayList<>();
String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
for (SegmentZKMetadata segment : allSegments) {
CommonConstants.Segment.Realtime.Status status = segment.getStatus();
// initial segments selection based on status and age
if (status.isCompleted() && (segment.getEndTimeMs() <= (currentTimeInMillis - bufferMs))) {
completedSegments.add(segment);
}
}
return completedSegments;
}

public List<SegmentZKMetadata> getSegmentsForCompaction() {
return _segmentsForCompaction;
@VisibleForTesting
public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) {
int maxTasks = Integer.MAX_VALUE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
}
return maxTasks;
}

public List<String> getSegmentsForDeletion() {
return _segmentsForDeletion;
@VisibleForTesting
static boolean validate(TableConfig tableConfig) {
String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
String tableNameWithType = tableConfig.getTableName();
if (tableConfig.getTableType() == TableType.OFFLINE) {
LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
tableNameWithType);
return false;
}
if (!tableConfig.isUpsertEnabled()) {
LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
tableNameWithType);
return false;
}
return true;
}
}

0 comments on commit 0115cac

Please sign in to comment.