chore: support custom step interval (#5186)

* chore: support custom step interval

* chore: add comments
This commit is contained in:
Srikanth Chekuri 2024-06-12 12:21:27 +05:30 committed by GitHub
parent a94231c00a
commit f01b4f2c03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 27 additions and 23 deletions

View File

@ -2,11 +2,11 @@ package v3
import (
"fmt"
"math"
"strings"
"time"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@ -332,14 +332,8 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v
// start and end are in milliseconds
// step is in seconds
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) {
start = start - (start % (mq.StepInterval * 1000))
// if the query is a rate query, we adjust the start time by one more step
// so that we can calculate the rate for the first data point
if mq.AggregateOperator.IsRateOperator() && mq.Temporality != v3.Delta {
start -= mq.StepInterval * 1000
}
adjustStep := int64(math.Min(float64(mq.StepInterval), 60))
end = end - (end % (adjustStep * 1000))
start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, *mq)
// if the aggregate operator is a histogram quantile, and user has not forgotten
// the le tag in the group by then add the le tag to the group by

View File

@ -19,7 +19,7 @@ import (
// step is in seconds
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options metricsV3.Options) (string, error) {
start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation)
start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, *mq)
var quantile float64

View File

@ -266,7 +266,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
@ -292,7 +292,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
}

View File

@ -14,6 +14,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/cache/status"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"go.uber.org/zap"
)
@ -121,7 +122,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
@ -256,7 +257,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
@ -358,7 +359,8 @@ func (q *querier) runBuilderExpression(
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
step := postprocess.StepIntervalForFunction(params, queryName)
misses := q.findMissingTimeRanges(params.Start, params.End, step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {

View File

@ -123,7 +123,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
@ -257,7 +257,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {

View File

@ -8,13 +8,16 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func AdjustedMetricTimeRange(start, end, step int64, aggregaOperator v3.TimeAggregation) (int64, int64) {
func AdjustedMetricTimeRange(start, end, step int64, mq v3.BuilderQuery) (int64, int64) {
// align the start to the step interval
start = start - (start % (step * 1000))
// if the query is a rate query, we adjust the start time by one more step
// so that we can calculate the rate for the first data point
if aggregaOperator.IsRateOperator() {
if (mq.AggregateOperator.IsRateOperator() || mq.TimeAggregation.IsRateOperator()) &&
mq.Temporality != v3.Delta {
start -= step * 1000
}
// align the end to the nearest minute
adjustStep := int64(math.Min(float64(step), 60))
end = end - (end % (adjustStep * 1000))
return start, end
@ -28,6 +31,9 @@ func PastDayRoundOff() int64 {
// start and end are in milliseconds
func MinAllowedStepInterval(start, end int64) int64 {
step := (end - start) / constants.MaxAllowedPointsInTimeSeries / 1000
if step < 60 {
return step
}
// return the nearest lower multiple of 60
return step - step%60
}

View File

@ -6,7 +6,7 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func stepIntervalForFunction(params *v3.QueryRangeParamsV3, query string) int64 {
func StepIntervalForFunction(params *v3.QueryRangeParamsV3, query string) int64 {
q := params.CompositeQuery.BuilderQueries[query]
if q.QueryName != q.Expression {
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(q.Expression, EvalFuncs())
@ -61,7 +61,7 @@ func FillGaps(results []*v3.Result, params *v3.QueryRangeParamsV3) {
builderQueries := params.CompositeQuery.BuilderQueries
if builderQueries != nil {
// The values should be added at the intervals of `step`
step := stepIntervalForFunction(params, result.QueryName)
step := StepIntervalForFunction(params, result.QueryName)
for idx := range result.Series {
result.Series[idx] = fillGap(result.Series[idx], params.Start, params.End, step)
}

View File

@ -427,7 +427,6 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
// 60 seconds (SDK) + 10 seconds (batch) + rest for n/w + serialization + write to disk etc..
start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli() - 2*60*1000
end := ts.UnixMilli() - 2*60*1000
// round to minute otherwise we could potentially miss data
start = start - (start % (60 * 1000))
end = end - (end % (60 * 1000))
@ -478,7 +477,10 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil {
for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries {
q.StepInterval = int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60))
// If the step interval is less than the minimum allowed step interval, set it to the minimum allowed step interval
if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep {
q.StepInterval = minStep
}
}
}