Skip to content

Commit

Permalink
address comments -- undo some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Jul 9, 2024
1 parent 69985a8 commit 6338fa2
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,9 @@ public Map<String, List<ValidDocIdsMetadataInfo>> getSegmentToValidDocIdsMetadat

// Number of segments to query per server request. If a table has a lot of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(
segmentsToQueryBatch -> serverURLsAndBodies.add(
generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch, validDocIdsType,
serverToEndpoints.get(serverToSegments.getKey()))));
Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch,
validDocIdsType, serverToEndpoints.get(serverToSegments.getKey()))));
}

BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
continue;
}
validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
break;
}
return validDocIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,6 @@
public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);

private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(workingDir.getPath());
config.setSegmentName(segmentName);
// Keep index creation time the same as original segment because both segments use the same raw data.
// This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
// identify if the new pushed segment has newer data than the existing one.
config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));

// The time column type info is not stored in the segment metadata.
// Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT).
if (segmentMetadata.getTimeInterval() != null) {
config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}
return config;
}

@Override
protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
throws Exception {
Expand All @@ -82,16 +61,22 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
RoaringBitmap validDocIds = originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
? MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator) : null;
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromDeepStorageSegment);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
String message = String.format("No validDocIds found from all servers. They either failed to download "
+ "or did not match crc fromsegment copy obtained from deepstore / servers. "
+ "OriginalSegmentCrcFromTaskGenerator: %s, crcFromDeepStorageSegment: %s",
originalSegmentCrcFromTaskGenerator, crcFromDeepStorageSegment);
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s",
originalSegmentCrcFromTaskGenerator);
LOGGER.error(message);
throw new IllegalStateException(message);
}
Expand Down Expand Up @@ -128,6 +113,27 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
return result;
}

private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(workingDir.getPath());
config.setSegmentName(segmentName);
// Keep index creation time the same as original segment because both segments use the same raw data.
// This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
// identify if the new pushed segment has newer data than the existing one.
config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));

// The time column type info is not stored in the segment metadata.
// Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT).
if (segmentMetadata.getTimeInterval() != null) {
config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}
return config;
}

@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1;
private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;

public static class SegmentSelectionResult {

private final List<SegmentZKMetadata> _segmentsForCompaction;

private final List<String> _segmentsForDeletion;

SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
_segmentsForCompaction = segmentsForCompaction;
_segmentsForDeletion = segmentsForDeletion;
}

public List<SegmentZKMetadata> getSegmentsForCompaction() {
return _segmentsForCompaction;
}

public List<String> getSegmentsForDeletion() {
return _segmentsForDeletion;
}
}

@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> completedSegmentsMap,
Expand Down Expand Up @@ -112,53 +132,6 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()), segmentsForDeletion);
}

@VisibleForTesting
public static List<SegmentZKMetadata> getCompletedSegments(Map<String, String> taskConfigs,
List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
List<SegmentZKMetadata> completedSegments = new ArrayList<>();
String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
for (SegmentZKMetadata segment : allSegments) {
CommonConstants.Segment.Realtime.Status status = segment.getStatus();
// initial segments selection based on status and age
if (status.isCompleted() && (segment.getEndTimeMs() <= (currentTimeInMillis - bufferMs))) {
completedSegments.add(segment);
}
}
return completedSegments;
}

@VisibleForTesting
public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) {
int maxTasks = Integer.MAX_VALUE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
}
return maxTasks;
}

@VisibleForTesting
static boolean validate(TableConfig tableConfig) {
String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
String tableNameWithType = tableConfig.getTableName();
if (tableConfig.getTableType() == TableType.OFFLINE) {
LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
tableNameWithType);
return false;
}
if (!tableConfig.isUpsertEnabled()) {
LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
tableNameWithType);
return false;
}
return true;
}

@Override
public String getTaskType() {
return MinionConstants.UpsertCompactionTask.TASK_TYPE;
Expand Down Expand Up @@ -225,8 +198,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

// Number of segments to query per server request. If a table has a lot of segments, then we might send a
// huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
int numSegmentsBatchPerServerRequest = Integer.parseInt(
taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
int numSegmentsBatchPerServerRequest =
Integer.parseInt(taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));

// Validate that the snapshot is enabled if validDocIdsType is validDocIdsSnapshot
Expand Down Expand Up @@ -272,7 +245,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
LOGGER.warn("Skipping segment {} for task {} as download url is empty", segment.getSegmentName(), taskType);
continue;
}
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segment.getSegmentName())));
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig,
List.of(segment.getSegmentName())));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, segment.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
Expand All @@ -285,23 +259,50 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
return pinotTaskConfigs;
}

public static class SegmentSelectionResult {

private final List<SegmentZKMetadata> _segmentsForCompaction;

private final List<String> _segmentsForDeletion;

SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
_segmentsForCompaction = segmentsForCompaction;
_segmentsForDeletion = segmentsForDeletion;
@VisibleForTesting
public static List<SegmentZKMetadata> getCompletedSegments(Map<String, String> taskConfigs,
List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
List<SegmentZKMetadata> completedSegments = new ArrayList<>();
String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
for (SegmentZKMetadata segment : allSegments) {
CommonConstants.Segment.Realtime.Status status = segment.getStatus();
// initial segments selection based on status and age
if (status.isCompleted() && (segment.getEndTimeMs() <= (currentTimeInMillis - bufferMs))) {
completedSegments.add(segment);
}
}
return completedSegments;
}

public List<SegmentZKMetadata> getSegmentsForCompaction() {
return _segmentsForCompaction;
@VisibleForTesting
public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) {
int maxTasks = Integer.MAX_VALUE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
}
return maxTasks;
}

public List<String> getSegmentsForDeletion() {
return _segmentsForDeletion;
@VisibleForTesting
static boolean validate(TableConfig tableConfig) {
String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
String tableNameWithType = tableConfig.getTableName();
if (tableConfig.getTableType() == TableType.OFFLINE) {
LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
tableNameWithType);
return false;
}
if (!tableConfig.isUpsertEnabled()) {
LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
tableNameWithType);
return false;
}
return true;
}
}

0 comments on commit 6338fa2

Please sign in to comment.