chore: add experimental rate/increase calc (#6432)

This commit is contained in:
Srikanth Chekuri 2024-11-13 11:47:56 +05:30 committed by GitHub
parent 01fda51959
commit 323da3494b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,6 +2,7 @@ package cumulative
import (
"fmt"
"os"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/constants"
@ -40,6 +41,9 @@ import (
const (
rateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))`
increaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window)))`
experimentalRateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
experimentalIncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window))`
)
// prepareTimeAggregationSubQueryTimeSeries prepares the sub-query to be used for temporal aggregation
@ -151,14 +155,22 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate:
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
rateExp := rateWithoutNegative
if _, ok := os.LookupEnv("EXPERIMENTAL_RATE_WITHOUT_NEGATIVE"); ok {
rateExp = fmt.Sprintf(experimentalRateWithoutNegative, start)
}
rateQueryTmpl :=
"SELECT %s ts, " + rateWithoutNegative +
"SELECT %s ts, " + rateExp +
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
case v3.TimeAggregationIncrease:
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
increaseExp := increaseWithoutNegative
if _, ok := os.LookupEnv("EXPERIMENTAL_INCREASE_WITHOUT_NEGATIVE"); ok {
increaseExp = experimentalIncreaseWithoutNegative
}
rateQueryTmpl :=
"SELECT %s ts, " + increaseWithoutNegative +
"SELECT %s ts, " + increaseExp +
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
}