Skip to content

Commit

Permalink
Add watermark as separate param
Browse files Browse the repository at this point in the history
  • Loading branch information
AdityaHegde committed Jan 14, 2025
1 parent 1fbb43a commit 43b7da2
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 44 deletions.
8 changes: 4 additions & 4 deletions runtime/compilers/rillv1/parse_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
// AlertYAML is the raw structure of an Alert resource defined in YAML (does not include common fields)
type AlertYAML struct {
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Refresh *ScheduleYAML `yaml:"refresh"`
Watermark string `yaml:"watermark"` // options: "trigger_time", "inherit"
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Refresh *ScheduleYAML `yaml:"refresh"`
Watermark string `yaml:"watermark"` // options: "trigger_time", "inherit"
Intervals struct {
Duration string `yaml:"duration"`
Limit uint `yaml:"limit"`
Expand Down
15 changes: 6 additions & 9 deletions runtime/compilers/rillv1/parse_canvas.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/google/uuid"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/rilltime"
"golang.org/x/exp/maps"
"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -72,18 +73,14 @@ func (p *Parser) parseCanvas(node *Node) error {
// Build and validate time ranges
var timeRanges []*runtimev1.ExploreTimeRange
for _, tr := range tmp.TimeRanges {
if err := validateISO8601(tr.Range, false, false); err != nil {
if _, err := rilltime.Parse(tr.Range); err != nil {
return fmt.Errorf("invalid time range %q: %w", tr.Range, err)
}
res := &runtimev1.ExploreTimeRange{Range: tr.Range}
for _, ctr := range tr.ComparisonTimeRanges {
if err := validateISO8601(ctr.Offset, false, false); err != nil {
return fmt.Errorf("invalid comparison offset %q: %w", ctr.Offset, err)
}
if ctr.Range != "" {
if err := validateISO8601(ctr.Range, false, false); err != nil {
return fmt.Errorf("invalid comparison range %q: %w", ctr.Range, err)
}
err = rilltime.ParseCompatibility(ctr.Range, ctr.Offset)
if err != nil {
return err
}
res.ComparisonTimeRanges = append(res.ComparisonTimeRanges, &runtimev1.ExploreComparisonTimeRange{
Offset: ctr.Offset,
Expand Down Expand Up @@ -148,7 +145,7 @@ func (p *Parser) parseCanvas(node *Node) error {
var defaultPreset *runtimev1.CanvasPreset
if tmp.Defaults != nil {
if tmp.Defaults.TimeRange != "" {
if err := validateISO8601(tmp.Defaults.TimeRange, false, false); err != nil {
if _, err := rilltime.Parse(tmp.Defaults.TimeRange); err != nil {
return fmt.Errorf("invalid time range %q: %w", tmp.Defaults.TimeRange, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/compilers/rillv1/parse_explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type ExploreYAML struct {
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Description string `yaml:"description"`
Expand Down
29 changes: 15 additions & 14 deletions runtime/compilers/rillv1/parse_metrics_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,31 @@ import (
"fmt"
"strings"
"time"
// Load IANA time zone data
_ "time/tzdata"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/rilltime"
"google.golang.org/protobuf/types/known/structpb"
"gopkg.in/yaml.v3"

// Load IANA time zone data
_ "time/tzdata"
)

// MetricsViewYAML is the raw structure of a MetricsView resource defined in YAML
type MetricsViewYAML struct {
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Description string `yaml:"description"`
Model string `yaml:"model"`
Database string `yaml:"database"`
DatabaseSchema string `yaml:"database_schema"`
Table string `yaml:"table"`
TimeDimension string `yaml:"timeseries"`
Watermark string `yaml:"watermark"`
SmallestTimeGrain string `yaml:"smallest_time_grain"`
FirstDayOfWeek uint32 `yaml:"first_day_of_week"`
FirstMonthOfYear uint32 `yaml:"first_month_of_year"`
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Description string `yaml:"description"`
Model string `yaml:"model"`
Database string `yaml:"database"`
DatabaseSchema string `yaml:"database_schema"`
Table string `yaml:"table"`
TimeDimension string `yaml:"timeseries"`
Watermark string `yaml:"watermark"`
SmallestTimeGrain string `yaml:"smallest_time_grain"`
FirstDayOfWeek uint32 `yaml:"first_day_of_week"`
FirstMonthOfYear uint32 `yaml:"first_month_of_year"`
Dimensions []*struct {
Name string
DisplayName string `yaml:"display_name"`
Expand Down
8 changes: 4 additions & 4 deletions runtime/compilers/rillv1/parse_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
// ReportYAML is the raw structure of a Report resource defined in YAML (does not include common fields)
type ReportYAML struct {
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Refresh *ScheduleYAML `yaml:"refresh"`
Watermark string `yaml:"watermark"` // options: "trigger_time", "inherit"
DisplayName string `yaml:"display_name"`
Title string `yaml:"title"` // Deprecated: use display_name
Refresh *ScheduleYAML `yaml:"refresh"`
Watermark string `yaml:"watermark"` // options: "trigger_time", "inherit"
Intervals struct {
Duration string `yaml:"duration"`
Limit uint `yaml:"limit"`
Expand Down
33 changes: 33 additions & 0 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (e *Executor) Watermark(ctx context.Context) (time.Time, error) {
return e.loadWatermark(ctx, nil)
}

// MinTime is a temporary function that fetches min time. Will be replaced with metrics_time_range resolver in a future PR.
func (e *Executor) MinTime(ctx context.Context, colName string) (time.Time, error) {
if colName == "" {
// we cannot get min time without a time dimension or a column name specified. return a 0 time
Expand Down Expand Up @@ -164,6 +165,38 @@ func (e *Executor) MinTime(ctx context.Context, colName string) (time.Time, erro
return t, nil
}

// MaxTime is a temporary function that fetches max time. Will be replaced with metrics_time_range resolver in a future PR.
func (e *Executor) MaxTime(ctx context.Context, colName string) (time.Time, error) {
if colName == "" {
// we cannot get min time without a time dimension or a column name specified. return a 0 time
return time.Time{}, nil
}

dialect := e.olap.Dialect()
sql := fmt.Sprintf("SELECT MAX(%s) FROM %s", dialect.EscapeIdentifier(colName), dialect.EscapeTable(e.metricsView.Database, e.metricsView.DatabaseSchema, e.metricsView.Table))

res, err := e.olap.Execute(ctx, &drivers.Statement{
Query: sql,
Priority: e.priority,
ExecutionTimeout: defaultInteractiveTimeout,
})
if err != nil {
return time.Time{}, err
}
defer res.Close()

var t time.Time
if res.Next() {
if err := res.Scan(&t); err != nil {
return time.Time{}, fmt.Errorf("failed to scan time anchor: %w", err)
}
}
if res.Err() != nil {
return time.Time{}, fmt.Errorf("failed to scan time anchor: %w", res.Err())
}
return t, nil
}

// Schema returns a schema for the metrics view's dimensions and measures.
func (e *Executor) Schema(ctx context.Context) (*runtimev1.StructType, error) {
if !e.security.CanAccess() {
Expand Down
8 changes: 7 additions & 1 deletion runtime/metricsview/executor_rewrite_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time
return err
}

maxTime, err := e.MaxTime(ctx, e.metricsView.TimeDimension)
if err != nil {
return err
}

tr.Start, tr.End, err = rillTime.Eval(rilltime.EvalOptions{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
MaxTime: maxTime,
Watermark: watermark,
FirstDay: int(e.metricsView.FirstDayOfWeek),
FirstMonth: int(e.metricsView.FirstMonthOfYear),
})
Expand Down
3 changes: 3 additions & 0 deletions runtime/pkg/metricssql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (c *Compiler) Rewrite(ctx context.Context, sql string) (*metricsview.Query,
if err := q.parseFrom(ctx, stmt.From); err != nil {
return nil, err
}
if q.executor != nil {
defer q.executor.Close()
}

// parse select fields
if err := q.parseSelect(stmt.Fields); err != nil {
Expand Down
14 changes: 12 additions & 2 deletions runtime/pkg/metricssql/time_range_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ func (q *query) parseTimeRangeStart(ctx context.Context, node *ast.FuncCallExpr)
if err != nil {
return nil, err
}
maxTime, err := q.executor.MaxTime(ctx, colName)
if err != nil {
return nil, err
}

watermark, _, err = rillTime.Eval(rilltime.EvalOptions{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
MaxTime: maxTime,
Watermark: watermark,
FirstDay: int(q.metricsViewSpec.FirstDayOfWeek),
FirstMonth: int(q.metricsViewSpec.FirstMonthOfYear),
})
Expand Down Expand Up @@ -65,11 +70,16 @@ func (q *query) parseTimeRangeEnd(ctx context.Context, node *ast.FuncCallExpr) (
if err != nil {
return nil, err
}
maxTime, err := q.executor.MaxTime(ctx, colName)
if err != nil {
return nil, err
}

_, watermark, err = rillTime.Eval(rilltime.EvalOptions{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
MaxTime: maxTime,
Watermark: watermark,
FirstDay: int(q.metricsViewSpec.FirstDayOfWeek),
FirstMonth: int(q.metricsViewSpec.FirstMonthOfYear),
})
Expand Down
12 changes: 7 additions & 5 deletions runtime/pkg/rilltime/rilltime.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func ParseISO(from string, strict bool) (*Expression, error) {

rt := &Expression{
Start: &TimeAnchor{},
End: &TimeAnchor{Now: true},
End: &TimeAnchor{Latest: true},
// mirrors old UI behaviour
isComplete: false,
}
Expand Down Expand Up @@ -268,10 +268,12 @@ func (e *Expression) Modify(evalOpts EvalOptions, ta *TimeAnchor, tm time.Time,

if ta.isoDuration != nil {
// handling for old iso format
return ta.isoDuration.Sub(evalOpts.MinTime.In(e.timeZone))
}

if ta.Now {
tm = ta.isoDuration.Sub(evalOpts.MaxTime.In(e.timeZone))
isTruncate = true
if e.grain != nil && e.grain.Grain != "" {
truncateGrain = grainMap[e.grain.Grain]
}
} else if ta.Now {
tm = evalOpts.Now.In(e.timeZone)
isTruncate = e.isComplete
isBoundary = true
Expand Down
8 changes: 7 additions & 1 deletion runtime/pkg/rilltime/rilltime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
func Test_Resolve(t *testing.T) {
now := parseTestTime(t, "2024-08-09T10:32:36Z")
maxTime := parseTestTime(t, "2024-08-06T06:32:36Z")
watermark := parseTestTime(t, "2024-08-05T06:32:36Z")
testCases := []struct {
timeRange string
start string
Expand All @@ -30,6 +31,8 @@ func Test_Resolve(t *testing.T) {
{`-2d, now/d : h @ -5d`, "2024-08-02T00:00:00Z", "2024-08-04T00:00:00Z"},
{`-2d, now/d @ -5d`, "2024-08-02T00:00:00Z", "2024-08-04T00:00:00Z"},

{`watermark-7D, watermark : h`, "2024-07-29T07:00:00Z", "2024-08-05T07:00:00Z"},

{`-7d, now/d : h @ {Asia/Kathmandu}`, "2024-08-01T18:15:00Z", "2024-08-08T18:15:00Z"},
{`-7d, now/d : |h| @ {Asia/Kathmandu}`, "2024-08-01T18:15:00Z", "2024-08-08T18:15:00Z"},
{`-7d, now/d : |h| @ -5d {Asia/Kathmandu}`, "2024-07-27T18:15:00Z", "2024-08-03T18:15:00Z"},
Expand All @@ -47,7 +50,9 @@ func Test_Resolve(t *testing.T) {
{`-7W+5d, latest : h`, "2024-06-17T00:00:00Z", "2024-08-06T07:00:00Z"},
{`-7W+8d, latest : h`, "2024-06-24T00:00:00Z", "2024-08-06T07:00:00Z"},

// TODO: add backwards compatibility tests
{"P2DT10H", "2024-08-03T20:00:00Z", "2024-08-06T07:32:36Z"},
{"rill-MTD", "2024-08-01T00:00:00Z", "2024-08-06T06:32:37Z"},
{"rill-PW", "2024-07-29T00:00:00Z", "2024-08-05T00:00:00Z"},
}

for _, tc := range testCases {
Expand All @@ -59,6 +64,7 @@ func Test_Resolve(t *testing.T) {
Now: now,
MinTime: now.AddDate(-1, 0, 0),
MaxTime: maxTime,
Watermark: watermark,
FirstDay: 1,
FirstMonth: 1,
})
Expand Down
4 changes: 2 additions & 2 deletions runtime/queries/metricsview_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (q *MetricsViewAggregation) rewriteToMetricsViewQuery(export bool) (*metric
if q.TimeRange.End != nil {
res.End = q.TimeRange.End.AsTime()
}
res.Expression = q.TimeRange.RillTime
res.Expression = q.TimeRange.Expression
res.IsoDuration = q.TimeRange.IsoDuration
res.IsoOffset = q.TimeRange.IsoOffset
res.RoundToGrain = metricsview.TimeGrainFromProto(q.TimeRange.RoundToGrain)
Expand All @@ -268,7 +268,7 @@ func (q *MetricsViewAggregation) rewriteToMetricsViewQuery(export bool) (*metric
if q.ComparisonTimeRange.End != nil {
res.End = q.ComparisonTimeRange.End.AsTime()
}
res.Expression = q.ComparisonTimeRange.RillTime
res.Expression = q.ComparisonTimeRange.Expression
res.IsoDuration = q.ComparisonTimeRange.IsoDuration
res.IsoOffset = q.ComparisonTimeRange.IsoOffset
res.RoundToGrain = metricsview.TimeGrainFromProto(q.ComparisonTimeRange.RoundToGrain)
Expand Down
4 changes: 3 additions & 1 deletion runtime/queries/metricsview_resolve_time_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type MetricsViewTimeRanges struct {
MetricsViewName string `json:"metrics_view_name,omitempty"`
MinTime time.Time `json:"min_time,omitempty"`
MaxTime time.Time `json:"max_time,omitempty"`
Expressions []string `json:"expressions,omitempty"`
SecurityClaims *runtime.SecurityClaims `json:"security_claims,omitempty"`

Expand Down Expand Up @@ -86,7 +87,8 @@ func (q *MetricsViewTimeRanges) Resolve(ctx context.Context, rt *runtime.Runtime
start, end, err := rillTime.Eval(rilltime.EvalOptions{
Now: now,
MinTime: q.MinTime,
MaxTime: watermark,
MaxTime: q.MaxTime,
Watermark: watermark,
FirstDay: int(mv.ValidSpec.FirstDayOfWeek),
FirstMonth: int(mv.ValidSpec.FirstMonthOfYear),
})
Expand Down
1 change: 1 addition & 0 deletions runtime/server/queries_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (s *Server) MetricsViewTimeRanges(ctx context.Context, req *runtimev1.Metri
q := &queries.MetricsViewTimeRanges{
MetricsViewName: req.MetricsViewName,
MinTime: timeRangeQuery.Result.TimeRangeSummary.Min.AsTime(),
MaxTime: timeRangeQuery.Result.TimeRangeSummary.Max.AsTime(),
Expressions: req.Expressions,
SecurityClaims: auth.GetClaims(ctx).SecurityClaims(),
}
Expand Down

0 comments on commit 43b7da2

Please sign in to comment.