From f01b4f2c030097a20763c28a58511c69476f13c0 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 12 Jun 2024 12:21:27 +0530 Subject: [PATCH] chore: support custom step interval (#5186) * chore: support custom step interval * chore: add comments --- pkg/query-service/app/metrics/v3/query_builder.go | 12 +++--------- pkg/query-service/app/metrics/v4/query_builder.go | 2 +- .../app/metrics/v4/query_builder_test.go | 4 ++-- pkg/query-service/app/querier/helper.go | 8 +++++--- pkg/query-service/app/querier/v2/helper.go | 4 ++-- pkg/query-service/common/query_range.go | 10 ++++++++-- pkg/query-service/postprocess/gaps.go | 4 ++-- pkg/query-service/rules/thresholdRule.go | 6 ++++-- 8 files changed, 27 insertions(+), 23 deletions(-) diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index 9307363361..bb13937d4e 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -2,11 +2,11 @@ package v3 import ( "fmt" - "math" "strings" "time" "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -332,14 +332,8 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v // start and end are in milliseconds // step is in seconds func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) { - start = start - (start % (mq.StepInterval * 1000)) - // if the query is a rate query, we adjust the start time by one more step - // so that we can calculate the rate for the first data point - if mq.AggregateOperator.IsRateOperator() && mq.Temporality != v3.Delta { - start -= mq.StepInterval * 1000 - } - adjustStep := int64(math.Min(float64(mq.StepInterval), 60)) - end = end - (end % (adjustStep * 1000)) + + start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, *mq) // if the aggregate operator is a histogram quantile, and user has not forgotten // the le tag in the group by then add the le tag to the group by diff --git a/pkg/query-service/app/metrics/v4/query_builder.go b/pkg/query-service/app/metrics/v4/query_builder.go index bd8813dd3a..380fb44f22 100644 --- a/pkg/query-service/app/metrics/v4/query_builder.go +++ b/pkg/query-service/app/metrics/v4/query_builder.go @@ -19,7 +19,7 @@ import ( // step is in seconds func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options metricsV3.Options) (string, error) { - start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation) + start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, *mq) var quantile float64 diff --git a/pkg/query-service/app/metrics/v4/query_builder_test.go b/pkg/query-service/app/metrics/v4/query_builder_test.go index 790fa670c8..08d0f087ff 100644 --- a/pkg/query-service/app/metrics/v4/query_builder_test.go +++ b/pkg/query-service/app/metrics/v4/query_builder_test.go @@ -266,7 +266,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name", @@ -292,7 +292,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", }, } diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 71ee5da72d..d65627bd92 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -14,6 +14,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/cache/status" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" "go.uber.org/zap" ) @@ -121,7 +122,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, params.Step, cachedData) + misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -256,7 +257,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, params.Step, cachedData) + misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -358,7 +359,8 @@ func (q *querier) runBuilderExpression( cachedData = data } } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + step := postprocess.StepIntervalForFunction(params, queryName) + misses := q.findMissingTimeRanges(params.Start, params.End, step, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 130ba09fc9..04f798ad1b 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -123,7 +123,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, params.Step, cachedData) + misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -257,7 +257,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, params.Step, cachedData) + misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { diff --git a/pkg/query-service/common/query_range.go b/pkg/query-service/common/query_range.go index c3f95207d6..c119f2d25e 100644 --- a/pkg/query-service/common/query_range.go +++ b/pkg/query-service/common/query_range.go @@ -8,13 +8,16 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -func AdjustedMetricTimeRange(start, end, step int64, aggregaOperator v3.TimeAggregation) (int64, int64) { +func AdjustedMetricTimeRange(start, end, step int64, mq v3.BuilderQuery) (int64, int64) { + // align the start to the step interval start = start - (start % (step * 1000)) // if the query is a rate query, we adjust the start time by one more step // so that we can calculate the rate for the first data point - if aggregaOperator.IsRateOperator() { + if (mq.AggregateOperator.IsRateOperator() || mq.TimeAggregation.IsRateOperator()) && + mq.Temporality != v3.Delta { start -= step * 1000 } + // align the end to the nearest minute adjustStep := int64(math.Min(float64(step), 60)) end = end - (end % (adjustStep * 1000)) return start, end @@ -28,6 +31,9 @@ func PastDayRoundOff() int64 { // start and end are in milliseconds func MinAllowedStepInterval(start, end int64) int64 { step := (end - start) / constants.MaxAllowedPointsInTimeSeries / 1000 + if step < 60 { + return step + } // return the nearest lower multiple of 60 return step - step%60 } diff --git a/pkg/query-service/postprocess/gaps.go b/pkg/query-service/postprocess/gaps.go index ac80c4a180..170d77844d 100644 --- a/pkg/query-service/postprocess/gaps.go +++ b/pkg/query-service/postprocess/gaps.go @@ -6,7 +6,7 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -func stepIntervalForFunction(params *v3.QueryRangeParamsV3, query string) int64 { +func StepIntervalForFunction(params *v3.QueryRangeParamsV3, query string) int64 { q := params.CompositeQuery.BuilderQueries[query] if q.QueryName != q.Expression { expression, _ := govaluate.NewEvaluableExpressionWithFunctions(q.Expression, EvalFuncs()) @@ -61,7 +61,7 @@ func FillGaps(results []*v3.Result, params *v3.QueryRangeParamsV3) { builderQueries := params.CompositeQuery.BuilderQueries if builderQueries != nil { // The values should be added at the intervals of `step` - step := stepIntervalForFunction(params, result.QueryName) + step := StepIntervalForFunction(params, result.QueryName) for idx := range result.Series { result.Series[idx] = fillGap(result.Series[idx], params.Start, params.End, step) } diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index 640c7a5b4f..2af4ce647f 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -427,7 +427,6 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { // 60 seconds (SDK) + 10 seconds (batch) + rest for n/w + serialization + write to disk etc.. start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli() - 2*60*1000 end := ts.UnixMilli() - 2*60*1000 - // round to minute otherwise we could potentially miss data start = start - (start % (60 * 1000)) end = end - (end % (60 * 1000)) @@ -478,7 +477,10 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil { for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries { - q.StepInterval = int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)) + // If the step interval is less than the minimum allowed step interval, set it to the minimum allowed step interval + if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep { + q.StepInterval = minStep + } } }