diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index d14f2860138a..ac6962c5929e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -150,6 +150,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
     Long endTs = null;
     String step = null;
     String timeoutStr = null;
+    int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
+    int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
     for (NameValuePair nameValuePair : pairs) {
       switch (nameValuePair.getName()) {
         case "query":
@@ -167,6 +169,12 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
         case "timeout":
           timeoutStr = nameValuePair.getValue();
          break;
+        case "limit":
+          limit = Integer.parseInt(nameValuePair.getValue());
+          break;
+        case "numGroupsLimit":
+          numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
+          break;
         default:
           /* Okay to ignore unknown parameters since the language implementor may be using them. */
           break;
@@ -182,7 +190,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
       timeout = HumanReadableDuration.from(timeoutStr);
     }
     // TODO: Pass full raw query param string to the request
-    return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, queryParamString);
+    return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, limit, numGroupsLimit,
+        queryParamString);
   }

   public static Long getStepSeconds(@Nullable String step) {
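Note for reviewers: both new query parameters are optional. Below is a minimal, self-contained sketch of the defaulting behavior added above, using plain JDK string parsing instead of the broker's NameValuePair loop; the parameter names and defaults come from this diff, while the class name and query string are made up.

import java.util.HashMap;
import java.util.Map;

public class TimeSeriesParamDefaults {
  public static void main(String[] args) {
    String rawParams = "query=fetch&start=100&end=200&limit=5000";  // made-up example input
    Map<String, String> params = new HashMap<>();
    for (String pair : rawParams.split("&")) {
      String[] kv = pair.split("=", 2);
      params.put(kv[0], kv.length > 1 ? kv[1] : "");
    }
    // Same fallbacks as RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT / DEFAULT_NUM_GROUPS_LIMIT.
    int limit = params.containsKey("limit") ? Integer.parseInt(params.get("limit")) : 100_000;
    int numGroupsLimit = params.containsKey("numGroupsLimit")
        ? Integer.parseInt(params.get("numGroupsLimit")) : -1;
    System.out.println(limit + " " + numGroupsLimit);  // prints "5000 -1"
  }
}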
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
index 236d4f858ff0..7f095c47d17a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
@@ -44,10 +44,7 @@ public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
     if (groupByResultsBlock.getNumRows() == 0) {
       return new TimeSeriesBlock(timeBuckets, new HashMap<>());
     }
-    if (groupByResultsBlock.isNumGroupsLimitReached()) {
-      throw new IllegalStateException(String.format("Series limit reached. Number of series: %s",
-          groupByResultsBlock.getNumRows()));
-    }
+    // TODO: Check isNumGroupsLimitReached, and propagate it somehow to the caller.
     Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>(groupByResultsBlock.getNumRows());
     List<String> tagNames = getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(),
         "DataSchema is null in leaf stage of time-series query"));
diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 42515083c0db..e38ae76d532c 100644
--- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -21,8 +21,10 @@
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -78,7 +80,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
       switch (command) {
         case "fetch":
           List<String> tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
-          currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns);
+          currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns,
+              request);
           break;
         case "sum":
         case "min":
@@ -118,7 +121,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
   }

   public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens,
-      List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns) {
+      List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns,
+      RangeTimeSeriesRequest request) {
     Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
     String tableName = null;
     String timeColumn = null;
@@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
     Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
     Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
     Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
+    Map<String, String> queryOptions = new HashMap<>();
+    if (request.getNumGroupsLimit() > 0) {
+      queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit()));
+    }
     return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo,
-        groupByColumns);
+        groupByColumns, request.getLimit(), queryOptions);
   }
 }
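Taken together with the broker change, the planner change boils down to the mapping sketched below. This is illustrative only; it reuses the constructor and getter shapes visible in this diff, while the plan id, table, columns, and filter are made up.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;

public class FetchNodeSketch {
  // Mirrors handleFetchNode(): only a positive numGroupsLimit becomes a query option,
  // while the series limit is always forwarded to the leaf node.
  static LeafTimeSeriesPlanNode toLeafNode(RangeTimeSeriesRequest request) {
    Map<String, String> queryOptions = new HashMap<>();
    if (request.getNumGroupsLimit() > 0) {
      queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit()));
    }
    return new LeafTimeSeriesPlanNode("leaf#0", Collections.emptyList(), "myTable", "orderTime", TimeUnit.SECONDS,
        0L, "cityName = 'Chicago'", "orderCount", new AggInfo("SUM", false, Collections.emptyMap()),
        Collections.singletonList("cityName"), request.getLimit(), queryOptions);
  }
}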
Set via value="); + Map queryOptions = new HashMap<>(); + if (request.getNumGroupsLimit() > 0) { + queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit())); + } return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo, - groupByColumns); + groupByColumns, request.getLimit(), queryOptions); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java index cfcd80a9e7ea..72857d403b99 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java @@ -19,7 +19,6 @@ package org.apache.pinot.query.runtime.timeseries; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -111,14 +110,16 @@ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExec leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds()); ExpressionContext aggregation = TimeSeriesAggregationFunction.create(context.getLanguage(), leafNode.getValueExpression(), timeTransform, timeBuckets, leafNode.getAggInfo()); + Map queryOptions = new HashMap<>(leafNode.getQueryOptions()); + queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L, context.getRemainingTimeMs()))); return new QueryContext.Builder() .setTableName(leafNode.getTableName()) .setFilter(filterContext) .setGroupByExpressions(groupByExpressions) .setSelectExpressions(List.of(aggregation)) - .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getRemainingTimeMs()))) + .setQueryOptions(queryOptions) .setAliasList(Collections.emptyList()) - .setLimit(Integer.MAX_VALUE) + .setLimit(leafNode.getLimit()) .build(); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java index f98e4228e09b..66654b011701 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.query.runtime.timeseries; +import com.google.common.collect.ImmutableMap; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metrics.ServerMetrics; @@ -27,6 +29,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; @@ -43,6 +46,8 @@ public class 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
index f98e4228e09b..66654b011701 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.query.runtime.timeseries;

+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -27,6 +29,7 @@
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
@@ -43,6 +46,8 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
   private static final String LANGUAGE = "m3ql";
   private static final int DUMMY_DEADLINE_MS = 10_000;
+  private static final int SERIES_LIMIT = 1000;
+  private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();

   @BeforeClass
   public void setUp() {
@@ -65,20 +70,24 @@ public void testCompileQueryContext() {
           DUMMY_DEADLINE_MS, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
       LeafTimeSeriesPlanNode leafNode =
           new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L,
-              filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"));
+              filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"), SERIES_LIMIT,
+              QUERY_OPTIONS);
       QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
       assertEquals(queryContext.getFilter().toString(),
           "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= '1990')");
       assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
+      assertEquals(queryContext.getLimit(), SERIES_LIMIT);
     }
-    // Case-2: With offset, complex group-by expression, complex value, and non-empty filter
+    // Case-2: With offset, complex group-by expression, complex value, non-empty filter, 0 limit, query options.
     {
+      Map<String, String> queryOptions = ImmutableMap.of("numGroupsLimit", "1000");
       TimeSeriesExecutionContext context =
           new TimeSeriesExecutionContext(LANGUAGE, TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
               DUMMY_DEADLINE_MS, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
       LeafTimeSeriesPlanNode leafNode =
           new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 10L,
-              filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"));
+              filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"),
+              0 /* limit */, queryOptions);
       QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
       assertNotNull(queryContext);
       assertNotNull(queryContext.getGroupByExpressions());
@@ -87,6 +96,8 @@ public void testCompileQueryContext() {
       assertEquals(queryContext.getFilter().toString(),
           "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= '1980')");
       assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
+      assertEquals(queryContext.getLimit(), RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT);
+      assertEquals(queryContext.getQueryOptions().get("numGroupsLimit"), "1000");
     }
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 9840f684c0a6..d32a41723adc 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -221,7 +221,7 @@ public enum AggregationFunctionType {
   PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR,
       OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), i -> i == 2),
       SqlTypeName.OTHER),
-  TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, SqlTypeName.VARCHAR);
+  TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, SqlTypeName.OTHER);

   private static final Set<String> NAMES = Arrays.stream(values())
       .flatMap(func -> Stream.of(func.name(), func.getName(), func.getName().toLowerCase()))
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
index 8727f64ddcc7..8ae40ebbbaab 100644
--- a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
+++ b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.ImmutableList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -31,6 +32,9 @@
 public class TimeSeriesPlanFragmenterTest {
+  private static final int SERIES_LIMIT = 1000;
+  private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();
+
   @Test
   public void testGetFragmentsWithMultipleLeafNodes() {
     /*
@@ -136,7 +140,8 @@ public void testGetFragmentsWithSinglePlanNode() {

   private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
     return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), "someTableName", "someTimeColumn",
-        TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
+        TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList(),
+        SERIES_LIMIT, QUERY_OPTIONS);
   }

   static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
index ecbc3b3f6bb6..f44f181ff984 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -48,6 +48,9 @@
  *
  */
 public class RangeTimeSeriesRequest {
+  // TODO: It's not ideal to have another default, that plays with numGroupsLimit, etc.
+  public static final int DEFAULT_SERIES_LIMIT = 100_000;
+  public static final int DEFAULT_NUM_GROUPS_LIMIT = -1;
   /** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
   private final String _language;
   /** Query is the raw query sent by the caller. */
@@ -63,11 +66,15 @@ public class RangeTimeSeriesRequest {
   private final long _stepSeconds;
   /** E2E timeout for the query. */
   private final Duration _timeout;
+  /** Series limit for the query */
+  private final int _limit;
+  /** The numGroupsLimit value used in Pinot's Grouping Algorithm. */
+  private final int _numGroupsLimit;
   /** Full query string to allow language implementations to pass custom parameters. */
   private final String _fullQueryString;

   public RangeTimeSeriesRequest(String language, String query, long startSeconds, long endSeconds, long stepSeconds,
-      Duration timeout, String fullQueryString) {
+      Duration timeout, int limit, int numGroupsLimit, String fullQueryString) {
     Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds "
         + "should be greater than or equal to endSeconds. Found startSeconds=%s and endSeconds=%s",
         startSeconds, endSeconds);
@@ -77,6 +84,8 @@ public RangeTimeSeriesRequest(String language, String query, long startSeconds,
     _endSeconds = endSeconds;
     _stepSeconds = stepSeconds;
     _timeout = timeout;
+    _limit = limit;
+    _numGroupsLimit = numGroupsLimit;
     _fullQueryString = fullQueryString;
   }

@@ -104,6 +113,14 @@ public Duration getTimeout() {
     return _timeout;
   }

+  public int getLimit() {
+    return _limit;
+  }
+
+  public int getNumGroupsLimit() {
+    return _numGroupsLimit;
+  }
+
   public String getFullQueryString() {
     return _fullQueryString;
   }
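Since the constructor gained two arguments, existing SPI callers need updating. A hedged example of the new call shape follows; only the signature and constants shown above are assumed, and the argument values (including the m3ql query text and raw parameter string) are made up.

import java.time.Duration;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;

public class RangeRequestSketch {
  public static void main(String[] args) {
    // New argument order: ..., timeout, limit, numGroupsLimit, fullQueryString.
    RangeTimeSeriesRequest request = new RangeTimeSeriesRequest("m3ql", "fetch table=myTable value=orderCount",
        100L, 200L, 10L, Duration.ofSeconds(10), RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT,
        RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT, "query=fetch+table%3DmyTable&start=100&end=200");
    System.out.println(request.getLimit() + " " + request.getNumGroupsLimit());  // prints "100000 -1"
  }
}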
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 3deb4c68e68d..b0c7046466eb 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -22,8 +22,10 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -44,6 +46,8 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
   private final String _valueExpression;
   private final AggInfo _aggInfo;
   private final List<String> _groupByExpressions;
+  private final Map<String, String> _queryOptions;
+  private final int _limit;

   @JsonCreator
   public LeafTimeSeriesPlanNode(
@@ -52,7 +56,8 @@ public LeafTimeSeriesPlanNode(
       @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offsetSeconds") Long offsetSeconds,
       @JsonProperty("filterExpression") String filterExpression,
       @JsonProperty("valueExpression") String valueExpression, @JsonProperty("aggInfo") AggInfo aggInfo,
-      @JsonProperty("groupByExpressions") List<String> groupByExpressions) {
+      @JsonProperty("groupByExpressions") List<String> groupByExpressions,
+      @JsonProperty("limit") int limit, @JsonProperty("queryOptions") Map<String, String> queryOptions) {
     super(id, inputs);
     _tableName = tableName;
     _timeColumn = timeColumn;
@@ -62,17 +67,19 @@ public LeafTimeSeriesPlanNode(
     _valueExpression = valueExpression;
     _aggInfo = aggInfo;
     _groupByExpressions = groupByExpressions;
+    _limit = limit <= 0 ? RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT : limit;
+    _queryOptions = queryOptions;
   }

   public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
     return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds,
-        _filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
+        _filterExpression, _valueExpression, newAggInfo, _groupByExpressions, _limit, _queryOptions);
   }

   @Override
   public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) {
     return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds,
-        _filterExpression, _valueExpression, _aggInfo, _groupByExpressions);
+        _filterExpression, _valueExpression, _aggInfo, _groupByExpressions, _limit, _queryOptions);
   }

   @Override
@@ -83,8 +90,8 @@ public String getKlass() {
   @Override
   public String getExplainName() {
     return String.format("LEAF_TIME_SERIES_PLAN_NODE(%s, table=%s, timeExpr=%s, valueExpr=%s, aggInfo=%s, "
-        + "groupBy=%s, filter=%s, offsetSeconds=%s)", _id, _tableName, _timeColumn, _valueExpression,
-        _aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, _offsetSeconds);
+        + "groupBy=%s, filter=%s, offsetSeconds=%s, limit=%s)", _id, _tableName, _timeColumn, _valueExpression,
+        _aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, _offsetSeconds, _limit);
   }

   @Override
@@ -124,6 +131,14 @@ public List<String> getGroupByExpressions() {
     return _groupByExpressions;
   }

+  public int getLimit() {
+    return _limit;
+  }
+
+  public Map<String, String> getQueryOptions() {
+    return _queryOptions;
+  }
+
   public String getEffectiveFilter(TimeBuckets timeBuckets) {
     String filter = _filterExpression == null ? "" : _filterExpression;
     long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() - _offsetSeconds));
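One behavioral detail from the constructor above: a non-positive limit falls back to RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT, which is what the 0-limit test case in PhysicalTimeSeriesServerPlanVisitorTest relies on. A small sketch of that fallback, with made-up values:

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;

public class LeafNodeLimitSketch {
  public static void main(String[] args) {
    LeafTimeSeriesPlanNode node = new LeafTimeSeriesPlanNode("leaf#0", Collections.emptyList(), "myTable",
        "orderTime", TimeUnit.SECONDS, 0L, "", "orderCount", new AggInfo("SUM", false, Collections.emptyMap()),
        Collections.emptyList(), 0 /* limit not set */, Collections.emptyMap());
    // limit <= 0 falls back to the default series limit; query options pass through unchanged.
    System.out.println(node.getLimit() == RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT);  // true
    System.out.println(node.getQueryOptions().isEmpty());                                // true
  }
}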
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index d326ed49b58f..bb65619ceb83 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.tsdb.spi.plan;

+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -33,6 +35,8 @@ public class LeafTimeSeriesPlanNodeTest {
   private static final String TABLE = "myTable";
   private static final String TIME_COLUMN = "orderTime";
   private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+  private static final int SERIES_LIMIT = 10;
+  private static final Map<String, String> QUERY_OPTIONS = ImmutableMap.of("numGroupsLimit", "100000");

   @Test
   public void testGetEffectiveFilter() {
@@ -44,7 +48,7 @@ public void testGetEffectiveFilter() {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
-              new AggInfo("SUM", false, null), Collections.singletonList("cityName"));
+              new AggInfo("SUM", false, null), Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter);
     }
@@ -52,7 +56,7 @@ public void testGetEffectiveFilter() {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
-              new AggInfo("SUM", false, null), Collections.singletonList("cityName"));
+              new AggInfo("SUM", false, null), Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123));
     }
@@ -60,7 +64,8 @@ public void testGetEffectiveFilter() {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
-              "value_col", new AggInfo("SUM", false, Collections.emptyMap()), Collections.singletonList("cityName"));
+              "value_col", new AggInfo("SUM", false, Collections.emptyMap()), Collections.singletonList("cityName"),
+              SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter,
               (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123)));
@@ -70,7 +75,7 @@ public void testGetEffectiveFilter() {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
               nonEmptyFilter, "value_col", new AggInfo("SUM", false, Collections.emptyMap()),
-              Collections.singletonList("cityName"));
+              Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter,
               (expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000)));
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index 71bf2323fdb4..a8eb68a5b9ba 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tsdb.spi.plan.serde;

+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,18 +35,24 @@
 public class TimeSeriesPlanSerdeTest {
+  private static final int SERIES_LIMIT = 1000;
+  private static final Map<String, String> QUERY_OPTIONS = ImmutableMap.of("numGroupsLimit", "1000");
+
   @Test
   public void testSerdeForScanFilterProjectNode() {
     Map<String, String> aggParams = new HashMap<>();
     aggParams.put("window", "5m");
-
+    // create leaf node
     LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
         new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L,
-            "myFilterExpression", "myValueExpression", new AggInfo("SUM", false, aggParams), new ArrayList<>());
+            "myFilterExpression", "myValueExpression", new AggInfo("SUM", false, aggParams), new ArrayList<>(),
+            SERIES_LIMIT, QUERY_OPTIONS);
+    // serialize and deserialize to re-create another node
     BaseTimeSeriesPlanNode planNode =
         TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
     assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
     LeafTimeSeriesPlanNode deserializedNode = (LeafTimeSeriesPlanNode) planNode;
+    // assert that deserialized node is same as serialized node
     assertEquals(deserializedNode.getTableName(), "myTable");
     assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
     assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);
@@ -57,5 +64,7 @@ public void testSerdeForScanFilterProjectNode() {
     assertNotNull(deserializedNode.getAggInfo().getParams());
     assertEquals(deserializedNode.getAggInfo().getParams().get("window"), "5m");
     assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
+    assertEquals(deserializedNode.getLimit(), SERIES_LIMIT);
+    assertEquals(deserializedNode.getQueryOptions(), QUERY_OPTIONS);
   }
 }