Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[timeseries] Add Support for limit and numGroupsLimit #14945

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
Long endTs = null;
String step = null;
String timeoutStr = null;
int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
for (NameValuePair nameValuePair : pairs) {
switch (nameValuePair.getName()) {
case "query":
Expand All @@ -167,6 +169,12 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
case "timeout":
timeoutStr = nameValuePair.getValue();
break;
case "limit":
limit = Integer.parseInt(nameValuePair.getValue());
break;
case "numGroupsLimit":
numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
break;
default:
/* Okay to ignore unknown parameters since the language implementor may be using them. */
break;
Expand All @@ -182,7 +190,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
timeout = HumanReadableDuration.from(timeoutStr);
}
// TODO: Pass full raw query param string to the request
return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, queryParamString);
return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, limit, numGroupsLimit,
queryParamString);
}

public static Long getStepSeconds(@Nullable String step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
if (groupByResultsBlock.getNumRows() == 0) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
}
if (groupByResultsBlock.isNumGroupsLimitReached()) {
throw new IllegalStateException(String.format("Series limit reached. Number of series: %s",
groupByResultsBlock.getNumRows()));
}
// TODO: Check isNumGroupsLimitReached, and propagate it somehow to the caller.
Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>(groupByResultsBlock.getNumRows());
List<String> tagNames = getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(),
"DataSchema is null in leaf stage of time-series query"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -78,7 +80,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
switch (command) {
case "fetch":
List<String> tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns);
currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the change for promQL to pass request param in the leaf stage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you just need to set the limit and numGroupsLimit in the LeafTimeSeriesPlanNode of the returned plan

request);
break;
case "sum":
case "min":
Expand Down Expand Up @@ -118,7 +121,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
}

public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens,
List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns) {
List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns,
RangeTimeSeriesRequest request) {
Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
String tableName = null;
String timeColumn = null;
Expand Down Expand Up @@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
Map<String, String> queryOptions = new HashMap<>();
if (request.getNumGroupsLimit() > 0) {
queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can put "numGroupsLimit" in a constant class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Can address in a follow-up. This module is mostly a throw away.. we'll open source the full M3 Plugin after time series support is marked as GA.

}
return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo,
groupByColumns);
groupByColumns, request.getLimit(), queryOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime.timeseries;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -111,14 +110,16 @@ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExec
leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds());
ExpressionContext aggregation = TimeSeriesAggregationFunction.create(context.getLanguage(),
leafNode.getValueExpression(), timeTransform, timeBuckets, leafNode.getAggInfo());
Map<String, String> queryOptions = new HashMap<>(leafNode.getQueryOptions());
queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L, context.getRemainingTimeMs())));
return new QueryContext.Builder()
.setTableName(leafNode.getTableName())
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(List.of(aggregation))
.setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getRemainingTimeMs())))
.setQueryOptions(queryOptions)
.setAliasList(Collections.emptyList())
.setLimit(Integer.MAX_VALUE)
.setLimit(leafNode.getLimit())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
*/
package org.apache.pinot.query.runtime.timeseries;

import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
Expand All @@ -43,6 +46,8 @@
public class PhysicalTimeSeriesServerPlanVisitorTest {
private static final String LANGUAGE = "m3ql";
private static final int DUMMY_DEADLINE_MS = 10_000;
private static final int SERIES_LIMIT = 1000;
private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();

@BeforeClass
public void setUp() {
Expand All @@ -65,20 +70,24 @@ public void testCompileQueryContext() {
DUMMY_DEADLINE_MS, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L,
filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"));
filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"), SERIES_LIMIT,
QUERY_OPTIONS);
QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= '1990')");
assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
assertEquals(queryContext.getLimit(), SERIES_LIMIT);
}
// Case-2: With offset, complex group-by expression, complex value, and non-empty filter
// Case-2: With offset, complex group-by expression, complex value, non-empty filter, 0 limit, query options.
{
Map<String, String> queryOptions = ImmutableMap.of("numGroupsLimit", "1000");
TimeSeriesExecutionContext context =
new TimeSeriesExecutionContext(LANGUAGE, TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
DUMMY_DEADLINE_MS, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 10L,
filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"));
filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"),
0 /* limit */, queryOptions);
QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
assertNotNull(queryContext);
assertNotNull(queryContext.getGroupByExpressions());
Expand All @@ -87,6 +96,8 @@ public void testCompileQueryContext() {
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= '1980')");
assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
assertEquals(queryContext.getLimit(), RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT);
assertEquals(queryContext.getQueryOptions().get("numGroupsLimit"), "1000");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public enum AggregationFunctionType {
PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR,
OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), i -> i == 2),
SqlTypeName.OTHER),
TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, SqlTypeName.VARCHAR);
TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, SqlTypeName.OTHER);

private static final Set<String> NAMES = Arrays.stream(values())
.flatMap(func -> Stream.of(func.name(), func.getName(), func.getName().toLowerCase()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
Expand All @@ -31,6 +32,9 @@


public class TimeSeriesPlanFragmenterTest {
private static final int SERIES_LIMIT = 1000;
private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();

@Test
public void testGetFragmentsWithMultipleLeafNodes() {
/*
Expand Down Expand Up @@ -136,7 +140,8 @@ public void testGetFragmentsWithSinglePlanNode() {

private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), "someTableName", "someTimeColumn",
TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList(),
SERIES_LIMIT, QUERY_OPTIONS);
}

static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
* </ul>
*/
public class RangeTimeSeriesRequest {
// TODO: It's not ideal to have another default, that plays with numGroupsLimit, etc.
public static final int DEFAULT_SERIES_LIMIT = 100_000;
public static final int DEFAULT_NUM_GROUPS_LIMIT = -1;
/** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
private final String _language;
/** Query is the raw query sent by the caller. */
Expand All @@ -63,11 +66,15 @@ public class RangeTimeSeriesRequest {
private final long _stepSeconds;
/** E2E timeout for the query. */
private final Duration _timeout;
/** Series limit for the query */
private final int _limit;
/** The numGroupsLimit value used in Pinot's Grouping Algorithm. */
private final int _numGroupsLimit;
/** Full query string to allow language implementations to pass custom parameters. */
private final String _fullQueryString;

public RangeTimeSeriesRequest(String language, String query, long startSeconds, long endSeconds, long stepSeconds,
Duration timeout, String fullQueryString) {
Duration timeout, int limit, int numGroupsLimit, String fullQueryString) {
Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds "
+ "should be greater than or equal to endSeconds. Found startSeconds=%s and endSeconds=%s",
startSeconds, endSeconds);
Expand All @@ -77,6 +84,8 @@ public RangeTimeSeriesRequest(String language, String query, long startSeconds,
_endSeconds = endSeconds;
_stepSeconds = stepSeconds;
_timeout = timeout;
_limit = limit;
_numGroupsLimit = numGroupsLimit;
_fullQueryString = fullQueryString;
}

Expand Down Expand Up @@ -104,6 +113,14 @@ public Duration getTimeout() {
return _timeout;
}

public int getLimit() {
return _limit;
}

public int getNumGroupsLimit() {
return _numGroupsLimit;
}

public String getFullQueryString() {
return _fullQueryString;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
Expand All @@ -44,6 +46,8 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
private final String _valueExpression;
private final AggInfo _aggInfo;
private final List<String> _groupByExpressions;
private final Map<String, String> _queryOptions;
private final int _limit;

@JsonCreator
public LeafTimeSeriesPlanNode(
Expand All @@ -52,7 +56,8 @@ public LeafTimeSeriesPlanNode(
@JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offsetSeconds") Long offsetSeconds,
@JsonProperty("filterExpression") String filterExpression,
@JsonProperty("valueExpression") String valueExpression, @JsonProperty("aggInfo") AggInfo aggInfo,
@JsonProperty("groupByExpressions") List<String> groupByExpressions) {
@JsonProperty("groupByExpressions") List<String> groupByExpressions,
@JsonProperty("limit") int limit, @JsonProperty("queryOptions") Map<String, String> queryOptions) {
super(id, inputs);
_tableName = tableName;
_timeColumn = timeColumn;
Expand All @@ -62,17 +67,19 @@ public LeafTimeSeriesPlanNode(
_valueExpression = valueExpression;
_aggInfo = aggInfo;
_groupByExpressions = groupByExpressions;
_limit = limit <= 0 ? RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT : limit;
_queryOptions = queryOptions;
}

public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds,
_filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
_filterExpression, _valueExpression, newAggInfo, _groupByExpressions, _limit, _queryOptions);
}

@Override
public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) {
return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds,
_filterExpression, _valueExpression, _aggInfo, _groupByExpressions);
_filterExpression, _valueExpression, _aggInfo, _groupByExpressions, _limit, _queryOptions);
}

@Override
Expand All @@ -83,8 +90,8 @@ public String getKlass() {
@Override
public String getExplainName() {
return String.format("LEAF_TIME_SERIES_PLAN_NODE(%s, table=%s, timeExpr=%s, valueExpr=%s, aggInfo=%s, "
+ "groupBy=%s, filter=%s, offsetSeconds=%s)", _id, _tableName, _timeColumn, _valueExpression,
_aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, _offsetSeconds);
+ "groupBy=%s, filter=%s, offsetSeconds=%s, limit=%s)", _id, _tableName, _timeColumn, _valueExpression,
_aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, _offsetSeconds, _limit);
}

@Override
Expand Down Expand Up @@ -124,6 +131,14 @@ public List<String> getGroupByExpressions() {
return _groupByExpressions;
}

public int getLimit() {
return _limit;
}

public Map<String, String> getQueryOptions() {
return _queryOptions;
}

public String getEffectiveFilter(TimeBuckets timeBuckets) {
String filter = _filterExpression == null ? "" : _filterExpression;
long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() - _offsetSeconds));
Expand Down
Loading
Loading