mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 15:26:00 +08:00
fix: use window function lagInFrame for rate calculation (#4068)
This commit is contained in:
parent
dae817640b
commit
b6a79ab22c
@ -3,6 +3,7 @@ package v3
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
@ -129,13 +130,18 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam
|
||||
rateGroupBy := "fingerprint, " + groupBy
|
||||
rateGroupTags := "fingerprint, " + groupTags
|
||||
rateOrderBy := "fingerprint, " + orderBy
|
||||
partitionBy := "fingerprint"
|
||||
if len(groupTags) != 0 {
|
||||
partitionBy += ", " + groupTags
|
||||
partitionBy = strings.Trim(partitionBy, ", ")
|
||||
}
|
||||
op := "max(value)"
|
||||
subQuery := fmt.Sprintf(
|
||||
queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
|
||||
) // labels will be same so any should be fine
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery)
|
||||
query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, %s(value)/%d as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], points, query, groupBy, orderBy)
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + `as rate_value FROM(%s) WINDOW rate_window as (PARTITION BY %s ORDER BY %s ts)`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery, partitionBy, rateOrderBy)
|
||||
query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, %s(rate_value)/%d as value FROM (%s) WHERE isNaN(rate_value) = 0 GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], points, query, groupBy, orderBy)
|
||||
return query, nil
|
||||
case
|
||||
v3.AggregateOperatorRateSum,
|
||||
@ -145,8 +151,13 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam
|
||||
step = ((end - start + 1) / 1000) / 2
|
||||
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
|
||||
subQuery := fmt.Sprintf(queryTmplCounterInner, groupTags, step, op, filterSubQuery, groupBy, orderBy)
|
||||
query := `SELECT %s toStartOfHour(now()) as ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery)
|
||||
partitionBy := ""
|
||||
if len(groupTags) != 0 {
|
||||
partitionBy = "PARTITION BY " + groupTags
|
||||
partitionBy = strings.Trim(partitionBy, ", ")
|
||||
}
|
||||
query := `SELECT %s toStartOfHour(now()) as ts, ` + rateWithoutNegative + `as value FROM(%s) WINDOW rate_window as (%s ORDER BY %s ts)`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery, partitionBy, groupTags)
|
||||
return query, nil
|
||||
case
|
||||
v3.AggregateOperatorP05,
|
||||
@ -165,13 +176,18 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam
|
||||
rateGroupBy := "fingerprint, " + groupBy
|
||||
rateGroupTags := "fingerprint, " + groupTags
|
||||
rateOrderBy := "fingerprint, " + orderBy
|
||||
partitionBy := "fingerprint"
|
||||
if len(groupTags) != 0 {
|
||||
partitionBy += ", " + groupTags
|
||||
partitionBy = strings.Trim(partitionBy, ", ")
|
||||
}
|
||||
op := "max(value)"
|
||||
subQuery := fmt.Sprintf(
|
||||
queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
|
||||
) // labels will be same so any should be fine
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery)
|
||||
query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, sum(value)/%d as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, points, query, groupBy, orderBy)
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as rate_value FROM(%s) WINDOW rate_window as (PARTITION BY %s ORDER BY %s ts)`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery, partitionBy, rateOrderBy)
|
||||
query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, sum(rate_value)/%d as value FROM (%s) WHERE isNaN(rate_value) = 0 GROUP BY %s ORDER BY %s ts`, groupTags, points, query, groupBy, orderBy)
|
||||
value := aggregateOperatorToPercentile[mq.AggregateOperator]
|
||||
|
||||
query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
|
||||
|
@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "latency p50",
|
||||
@ -61,7 +61,7 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WHERE isNaN(value) = 0) GROUP BY le,ts HAVING isNaN(value) = 0 ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "latency p99 with group by",
|
||||
@ -80,7 +80,7 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT service_name,le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WHERE isNaN(value) = 0) GROUP BY service_name,le,ts HAVING isNaN(value) = 0 ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
},
|
||||
}
|
||||
|
||||
@ -88,11 +88,11 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
t.Fatalf("unexpected error: %v\n", err)
|
||||
}
|
||||
|
||||
if query != c.expected {
|
||||
t.Fatalf("expected: %s, got: %s", c.expected, query)
|
||||
t.Fatalf("expected: %s, got: %s\n", c.expected, query)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
|
||||
}
|
||||
|
||||
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
|
||||
var rateWithoutNegative = `if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) `
|
||||
var rateWithoutNegative = `If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) `
|
||||
|
||||
// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
|
||||
// timeseries based on search criteria
|
||||
@ -219,35 +219,46 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
|
||||
groupBy = "fingerprint, ts"
|
||||
orderBy = "fingerprint, "
|
||||
groupTags = "fingerprint,"
|
||||
partitionBy := "fingerprint"
|
||||
op := "max(value)" // max value should be the closest value for point in time
|
||||
subQuery := fmt.Sprintf(
|
||||
queryTmpl, "any(labels) as labels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy,
|
||||
) // labels will be same so any should be fine
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WINDOW rate_window as (PARTITION BY %s ORDER BY %s ts) `
|
||||
|
||||
query = fmt.Sprintf(query, "labels as fullLabels,", subQuery)
|
||||
query = fmt.Sprintf(query, "labels as fullLabels,", subQuery, partitionBy, orderBy)
|
||||
return query, nil
|
||||
case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate:
|
||||
rateGroupBy := "fingerprint, " + groupBy
|
||||
rateGroupTags := "fingerprint, " + groupTags
|
||||
rateOrderBy := "fingerprint, " + orderBy
|
||||
partitionBy := "fingerprint"
|
||||
if len(groupTags) != 0 {
|
||||
partitionBy += ", " + groupTags
|
||||
partitionBy = strings.Trim(partitionBy, ", ")
|
||||
}
|
||||
op := "max(value)"
|
||||
subQuery := fmt.Sprintf(
|
||||
queryTmpl, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
|
||||
) // labels will be same so any should be fine
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery)
|
||||
query = fmt.Sprintf(`SELECT %s ts, %s(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], query, groupSets, orderBy)
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as rate_value FROM(%s) WINDOW rate_window as (PARTITION BY %s ORDER BY %s ts) `
|
||||
query = fmt.Sprintf(query, groupTags, subQuery, partitionBy, rateOrderBy)
|
||||
query = fmt.Sprintf(`SELECT %s ts, %s(rate_value) as value FROM (%s) WHERE isNaN(rate_value) = 0 GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], query, groupSets, orderBy)
|
||||
return query, nil
|
||||
case
|
||||
v3.AggregateOperatorRateSum,
|
||||
v3.AggregateOperatorRateMax,
|
||||
v3.AggregateOperatorRateAvg,
|
||||
v3.AggregateOperatorRateMin:
|
||||
partitionBy := ""
|
||||
if len(groupTags) != 0 {
|
||||
partitionBy = "PARTITION BY " + groupTags
|
||||
partitionBy = strings.Trim(partitionBy, ", ")
|
||||
}
|
||||
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
|
||||
subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy)
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery)
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WINDOW rate_window as (%s ORDER BY %s ts) `
|
||||
query = fmt.Sprintf(query, groupTags, subQuery, partitionBy, groupTags)
|
||||
return query, nil
|
||||
case
|
||||
v3.AggregateOperatorP05,
|
||||
@ -266,13 +277,18 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
|
||||
rateGroupBy := "fingerprint, " + groupBy
|
||||
rateGroupTags := "fingerprint, " + groupTags
|
||||
rateOrderBy := "fingerprint, " + orderBy
|
||||
partitionBy := "fingerprint"
|
||||
if len(groupTags) != 0 {
|
||||
partitionBy += ", " + groupTags
|
||||
partitionBy = strings.Trim(partitionBy, ", ")
|
||||
}
|
||||
op := "max(value)"
|
||||
subQuery := fmt.Sprintf(
|
||||
queryTmpl, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
|
||||
) // labels will be same so any should be fine
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
|
||||
query = fmt.Sprintf(query, groupTags, subQuery)
|
||||
query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, query, groupSets, orderBy)
|
||||
query := `SELECT %s ts, ` + rateWithoutNegative + ` as rate_value FROM(%s) WINDOW rate_window as (PARTITION BY %s ORDER BY %s ts) `
|
||||
query = fmt.Sprintf(query, groupTags, subQuery, partitionBy, rateOrderBy)
|
||||
query = fmt.Sprintf(`SELECT %s ts, sum(rate_value) as value FROM (%s) WHERE isNaN(rate_value) = 0 GROUP BY %s ORDER BY %s ts`, groupTags, query, groupSets, orderBy)
|
||||
value := aggregateOperatorToPercentile[mq.AggregateOperator]
|
||||
|
||||
query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
|
||||
|
@ -245,7 +245,7 @@ func TestBuildQueryOperators(t *testing.T) {
|
||||
func TestBuildQueryXRate(t *testing.T) {
|
||||
t.Run("TestBuildQueryXRate", func(t *testing.T) {
|
||||
|
||||
tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts`
|
||||
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts`
|
||||
|
||||
cases := []struct {
|
||||
aggregateOperator v3.AggregateOperator
|
||||
@ -298,7 +298,7 @@ func TestBuildQueryXRate(t *testing.T) {
|
||||
func TestBuildQueryRPM(t *testing.T) {
|
||||
t.Run("TestBuildQueryXRate", func(t *testing.T) {
|
||||
|
||||
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(value) as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts)`
|
||||
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts)`
|
||||
|
||||
cases := []struct {
|
||||
aggregateOperator v3.AggregateOperator
|
||||
|
@ -55,7 +55,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
|
||||
|
||||
require.Contains(t, queries["C"], "SELECT A.ts as ts, A.value / B.value")
|
||||
require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||
require.Contains(t, queries["C"], "runningDifference(value) / runningDifference(ts)")
|
||||
require.Contains(t, queries["C"], "(value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))")
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user