mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-16 07:55:53 +08:00
chore: add prepare query for cumulative/unspecified timeseries (#4166)
This commit is contained in:
parent
55664872bd
commit
29b1344557
57
pkg/query-service/app/metrics/v4/cumulative/helper.go
Normal file
57
pkg/query-service/app/metrics/v4/cumulative/helper.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package cumulative
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// groupingSets returns a string of comma separated tags for group by clause
|
||||||
|
// `ts` is always added to the group by clause
|
||||||
|
func groupingSets(tags ...string) string {
|
||||||
|
withTs := append(tags, "ts")
|
||||||
|
return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", "))
|
||||||
|
}
|
||||||
|
|
||||||
|
// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
|
||||||
|
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
||||||
|
groupTags := []string{}
|
||||||
|
for _, tag := range tags {
|
||||||
|
groupTags = append(groupTags, tag.Key)
|
||||||
|
}
|
||||||
|
return groupingSets(groupTags...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// groupBy returns a string of comma separated tags for group by clause
|
||||||
|
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
||||||
|
groupTags := []string{}
|
||||||
|
for _, tag := range tags {
|
||||||
|
groupTags = append(groupTags, tag.Key)
|
||||||
|
}
|
||||||
|
groupTags = append(groupTags, "ts")
|
||||||
|
return strings.Join(groupTags, ", ")
|
||||||
|
}
|
||||||
|
|
||||||
|
// orderBy returns a string of comma separated tags for order by clause
|
||||||
|
// if the order is not specified, it defaults to ASC
|
||||||
|
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
|
||||||
|
var orderBy []string
|
||||||
|
for _, tag := range tags {
|
||||||
|
found := false
|
||||||
|
for _, item := range items {
|
||||||
|
if item.ColumnName == tag.Key {
|
||||||
|
found = true
|
||||||
|
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
orderBy = append(orderBy, "ts ASC")
|
||||||
|
|
||||||
|
return strings.Join(orderBy, ", ")
|
||||||
|
}
|
220
pkg/query-service/app/metrics/v4/cumulative/timeseries.go
Normal file
220
pkg/query-service/app/metrics/v4/cumulative/timeseries.go
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
package cumulative
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// See https://clickhouse.com/docs/en/sql-reference/window-functions for more details on `lagInFrame` function
|
||||||
|
//
|
||||||
|
// Calculating the rate of change of a metric is a common use case.
|
||||||
|
// Requests and errors are two examples of metrics that are often expressed as a rate of change.
|
||||||
|
// The rate of change is the difference between the current value and the previous value divided by
|
||||||
|
// the time difference between the current and previous values (i.e. the time interval).
|
||||||
|
//
|
||||||
|
// The value of a cumulative counter always increases. However, the rate of change can be negative
|
||||||
|
// if the value decreases between two samples. This can happen if the counter is reset when the
|
||||||
|
// application restarts or if the counter is reset manually. In this case, the rate of change is
|
||||||
|
// not meaningful and should be ignored.
|
||||||
|
//
|
||||||
|
// The condition `(per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0`
|
||||||
|
// checks if the rate of change is negative. If it is negative, the value is replaced with `nan`.
|
||||||
|
//
|
||||||
|
// The condition `ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400` checks
|
||||||
|
// if the time difference between the current and previous values is greater than or equal to 1 day.
|
||||||
|
// The first sample of a metric is always `nan` because there is no previous value to compare it to.
|
||||||
|
// When the first sample is encountered, the previous value for the time is set to default i.e `1970-01-01`.
|
||||||
|
// Since any difference between the first sample timestamp and the previous value timestamp will be
|
||||||
|
// greater than or equal to 1 day, the rate of change for the first sample will be `nan`.
|
||||||
|
//
|
||||||
|
// If neither of the above conditions are true, the rate of change is calculated as
|
||||||
|
// `(per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)`
|
||||||
|
// where `rate_window` is a window function that partitions the data by fingerprint and orders it by timestamp.
|
||||||
|
// We want to calculate the rate of change for each time series, so we partition the data by fingerprint.
|
||||||
|
//
|
||||||
|
// The `increase` function is similar to the `rate` function, except that it does not divide by the time interval.
|
||||||
|
const (
|
||||||
|
rateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))`
|
||||||
|
increaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window)))`
|
||||||
|
)
|
||||||
|
|
||||||
|
// prepareTimeAggregationSubQueryTimeSeries prepares the sub-query to be used for temporal aggregation
|
||||||
|
// of time series data
|
||||||
|
|
||||||
|
// The following example illustrates how the sub-query is used to calculate the sume of values for each
|
||||||
|
// time series in a 15 seconds interval:
|
||||||
|
|
||||||
|
// ```
|
||||||
|
// timestamp 01.00 01.05 01.10 01.15 01.20 01.25 01.30 01.35 01.40
|
||||||
|
// +------+------+------+------+------+------+------+------+------+
|
||||||
|
// | | | | | | | | | |
|
||||||
|
// | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 |
|
||||||
|
// | | | | | | | | | |
|
||||||
|
// +------+------+------+------+------+------+------+------+------+
|
||||||
|
// | | | | | | | | |
|
||||||
|
// | | | | | | | | |
|
||||||
|
// | | |
|
||||||
|
// +------+ +------+ +------+
|
||||||
|
// | v1+ | | v4+ | | v7+ |
|
||||||
|
// | v2+ | | v5+ | | v8+ |
|
||||||
|
// | v3 | | v6 | | v9 |
|
||||||
|
// +------+ +------+ +------+
|
||||||
|
// 01.00 01.15 01.30
|
||||||
|
// ```
|
||||||
|
|
||||||
|
// Calculating the rate/increase involves an additional step. We first calculate the maximum value for each time series
|
||||||
|
// in a 15 seconds interval. Then, we calculate the difference between the current maximum value and the previous
|
||||||
|
// maximum value
|
||||||
|
|
||||||
|
// The following example illustrates how the sub-query is used to calculate the rate of change for each time series
|
||||||
|
// in a 15 seconds interval:
|
||||||
|
|
||||||
|
// ```
|
||||||
|
// timestamp 01.00 01.05 01.10 01.15 01.20 01.25 01.30 01.35 01.40
|
||||||
|
// +------+------+------+------+------+------+------+------+------+
|
||||||
|
// | | | | | | | | | |
|
||||||
|
// | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 |
|
||||||
|
// | | | | | | | | | |
|
||||||
|
// +------+------+------+------+------+------+------+------+------+
|
||||||
|
// | | | | | | | | |
|
||||||
|
// | | | | | | | | |
|
||||||
|
// | | |
|
||||||
|
// +------+ +------+ +------+
|
||||||
|
// max(| v1, | max(| v4, | max(| v7, |
|
||||||
|
// | v2, | | v5, | | v8, |
|
||||||
|
// | v3 |) | v6 |) | v9 |)
|
||||||
|
// +------+ +------+ +------+
|
||||||
|
// 01.00 01.15 01.30
|
||||||
|
|
||||||
|
// +-------+ +--------+
|
||||||
|
// | V6-V2 | | V9-V6 |
|
||||||
|
// | | | |
|
||||||
|
// | | | |
|
||||||
|
// +------+ +--------+
|
||||||
|
// 01.00 01.15
|
||||||
|
// ```
|
||||||
|
|
||||||
|
// The rate of change is calculated as (Vy - Vx) / (Ty - Tx) where Vx and Vy are the values at time Tx and Ty respectively.
|
||||||
|
// In an ideal scenario, the last value of each interval could be used to calculate the rate of change. Instead, we use
|
||||||
|
// the maximum value of each interval to calculate the rate of change. This is because any process restart can cause the
|
||||||
|
// value to be reset to 0. This will produce an inaccurate result. The max is the best approximation we can get.
|
||||||
|
// We don't expect the process to restart very often, so this should be a good approximation.
|
||||||
|
|
||||||
|
func prepareTimeAggregationSubQueryTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
var subQuery string
|
||||||
|
|
||||||
|
timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||||
|
|
||||||
|
// Select the aggregate value for interval
|
||||||
|
queryTmpl :=
|
||||||
|
"SELECT fingerprint, %s" +
|
||||||
|
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||||
|
" %s as per_series_value" +
|
||||||
|
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||||
|
" INNER JOIN" +
|
||||||
|
" (%s) as filtered_time_series" +
|
||||||
|
" USING fingerprint" +
|
||||||
|
" WHERE " + samplesTableFilter +
|
||||||
|
" GROUP BY fingerprint, ts" +
|
||||||
|
" ORDER BY fingerprint, ts"
|
||||||
|
|
||||||
|
var selectLabelsAny string
|
||||||
|
for _, tag := range mq.GroupBy {
|
||||||
|
selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
var selectLabels string
|
||||||
|
for _, tag := range mq.GroupBy {
|
||||||
|
selectLabels += tag.Key + ","
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mq.TimeAggregation {
|
||||||
|
case v3.TimeAggregationAvg:
|
||||||
|
op := "avg(value)"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationSum:
|
||||||
|
op := "sum(value)"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationMin:
|
||||||
|
op := "min(value)"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationMax:
|
||||||
|
op := "max(value)"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationCount:
|
||||||
|
op := "count(value)"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationCountDistinct:
|
||||||
|
op := "count(distinct(value))"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationAnyLast:
|
||||||
|
op := "anyLast(value)"
|
||||||
|
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
case v3.TimeAggregationRate:
|
||||||
|
op := "max(value)"
|
||||||
|
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
rateQueryTmpl :=
|
||||||
|
"SELECT %s ts, " + rateWithoutNegative +
|
||||||
|
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
|
||||||
|
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
|
||||||
|
case v3.TimeAggregationIncrease:
|
||||||
|
op := "max(value)"
|
||||||
|
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
|
||||||
|
rateQueryTmpl :=
|
||||||
|
"SELECT %s ts, " + increaseWithoutNegative +
|
||||||
|
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
|
||||||
|
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
|
||||||
|
}
|
||||||
|
return subQuery, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics
|
||||||
|
func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
var query string
|
||||||
|
|
||||||
|
temporalAggSubQuery, err := prepareTimeAggregationSubQueryTimeSeries(start, end, step, mq)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
|
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
|
||||||
|
queryTmpl :=
|
||||||
|
"SELECT %s," +
|
||||||
|
" %s as value" +
|
||||||
|
" FROM (%s)" +
|
||||||
|
" WHERE isNaN(per_series_value) = 0" +
|
||||||
|
" GROUP BY %s" +
|
||||||
|
" ORDER BY %s"
|
||||||
|
|
||||||
|
switch mq.SpaceAggregation {
|
||||||
|
case v3.SpaceAggregationAvg:
|
||||||
|
op := "avg(per_series_value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
|
||||||
|
case v3.SpaceAggregationSum:
|
||||||
|
op := "sum(per_series_value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
|
||||||
|
case v3.SpaceAggregationMin:
|
||||||
|
op := "min(per_series_value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
|
||||||
|
case v3.SpaceAggregationMax:
|
||||||
|
op := "max(per_series_value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
|
||||||
|
case v3.SpaceAggregationCount:
|
||||||
|
op := "count(per_series_value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
|
||||||
|
}
|
||||||
|
|
||||||
|
return query, nil
|
||||||
|
}
|
229
pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go
Normal file
229
pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go
Normal file
@ -0,0 +1,229 @@
|
|||||||
|
package cumulative
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPrepareTimeAggregationSubQuery(t *testing.T) {
|
||||||
|
// The time aggregation is performed for each unique series - since the fingerprint represents the
|
||||||
|
// unique hash of label set, we always group by fingerprint regardless of the GroupBy
|
||||||
|
// This sub result is then aggregated on dimensions using the provided GroupBy clause keys
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test time aggregation = avg, temporality = cumulative",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "http_requests",
|
||||||
|
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||||
|
Type: v3.AttributeKeyTypeUnspecified,
|
||||||
|
IsColumn: true,
|
||||||
|
IsJSON: false,
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorNotEqual,
|
||||||
|
Value: "payment_service",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "endpoint",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"/paycallback", "/payme", "/paypal"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationAvg,
|
||||||
|
},
|
||||||
|
start: 1701794980000,
|
||||||
|
end: 1701796780000,
|
||||||
|
expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test time aggregation = rate, temporality = cumulative",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "http_requests",
|
||||||
|
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||||
|
Type: v3.AttributeKeyTypeUnspecified,
|
||||||
|
IsColumn: true,
|
||||||
|
IsJSON: false,
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "payment_service",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationRate,
|
||||||
|
},
|
||||||
|
start: 1701794980000,
|
||||||
|
end: 1701796780000,
|
||||||
|
expectedQueryContains: "SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := prepareTimeAggregationSubQueryTimeSeries(
|
||||||
|
testCase.start,
|
||||||
|
testCase.end,
|
||||||
|
testCase.builderQuery.StepInterval,
|
||||||
|
testCase.builderQuery,
|
||||||
|
)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func TestPrepareTimeseriesQuery(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test time aggregation = avg, space aggregation = sum, temporality = unspecified",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "system_memory_usage",
|
||||||
|
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||||
|
Type: v3.AttributeKeyTypeUnspecified,
|
||||||
|
IsColumn: true,
|
||||||
|
IsJSON: false,
|
||||||
|
},
|
||||||
|
Temporality: v3.Unspecified,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "state",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorNotEqual,
|
||||||
|
Value: "idle",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationAvg,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
},
|
||||||
|
start: 1701794980000,
|
||||||
|
end: 1701796780000,
|
||||||
|
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts ASC",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "http_requests",
|
||||||
|
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||||
|
Type: v3.AttributeKeyTypeUnspecified,
|
||||||
|
IsColumn: true,
|
||||||
|
IsJSON: false,
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "payment_service",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationRate,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
},
|
||||||
|
start: 1701794980000,
|
||||||
|
end: 1701796780000,
|
||||||
|
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := prepareMetricQueryCumulativeTimeSeries(
|
||||||
|
testCase.start,
|
||||||
|
testCase.end,
|
||||||
|
testCase.builderQuery.StepInterval,
|
||||||
|
testCase.builderQuery,
|
||||||
|
)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user