From 29b134455747099fa63292ef1fa0b7f05e13e231 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 13 Dec 2023 16:40:17 +0530 Subject: [PATCH] chore: add prepare query for cumulative/unspecified timeseries (#4166) --- .../app/metrics/v4/cumulative/helper.go | 57 +++++ .../app/metrics/v4/cumulative/timeseries.go | 220 +++++++++++++++++ .../metrics/v4/cumulative/timeseries_test.go | 229 ++++++++++++++++++ 3 files changed, 506 insertions(+) create mode 100644 pkg/query-service/app/metrics/v4/cumulative/helper.go create mode 100644 pkg/query-service/app/metrics/v4/cumulative/timeseries.go create mode 100644 pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go diff --git a/pkg/query-service/app/metrics/v4/cumulative/helper.go b/pkg/query-service/app/metrics/v4/cumulative/helper.go new file mode 100644 index 0000000000..5914ee495d --- /dev/null +++ b/pkg/query-service/app/metrics/v4/cumulative/helper.go @@ -0,0 +1,57 @@ +package cumulative + +import ( + "fmt" + "strings" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +// groupingSets returns a string of comma separated tags for group by clause +// `ts` is always added to the group by clause +func groupingSets(tags ...string) string { + withTs := append(tags, "ts") + return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", ")) +} + +// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause +func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string { + groupTags := []string{} + for _, tag := range tags { + groupTags = append(groupTags, tag.Key) + } + return groupingSets(groupTags...) +} + +// groupBy returns a string of comma separated tags for group by clause +func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { + groupTags := []string{} + for _, tag := range tags { + groupTags = append(groupTags, tag.Key) + } + groupTags = append(groupTags, "ts") + return strings.Join(groupTags, ", ") +} + +// orderBy returns a string of comma separated tags for order by clause +// if the order is not specified, it defaults to ASC +func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string { + var orderBy []string + for _, tag := range tags { + found := false + for _, item := range items { + if item.ColumnName == tag.Key { + found = true + orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order)) + break + } + } + if !found { + orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key)) + } + } + + orderBy = append(orderBy, "ts ASC") + + return strings.Join(orderBy, ", ") +} diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go new file mode 100644 index 0000000000..78d22be4aa --- /dev/null +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go @@ -0,0 +1,220 @@ +package cumulative + +import ( + "fmt" + + v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +// See https://clickhouse.com/docs/en/sql-reference/window-functions for more details on `lagInFrame` function +// +// Calculating the rate of change of a metric is a common use case. +// Requests and errors are two examples of metrics that are often expressed as a rate of change. +// The rate of change is the difference between the current value and the previous value divided by +// the time difference between the current and previous values (i.e. the time interval). +// +// The value of a cumulative counter always increases. However, the rate of change can be negative +// if the value decreases between two samples. This can happen if the counter is reset when the +// application restarts or if the counter is reset manually. In this case, the rate of change is +// not meaningful and should be ignored. +// +// The condition `(per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0` +// checks if the rate of change is negative. If it is negative, the value is replaced with `nan`. +// +// The condition `ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400` checks +// if the time difference between the current and previous values is greater than or equal to 1 day. +// The first sample of a metric is always `nan` because there is no previous value to compare it to. +// When the first sample is encountered, the previous value for the time is set to default i.e `1970-01-01`. +// Since any difference between the first sample timestamp and the previous value timestamp will be +// greater than or equal to 1 day, the rate of change for the first sample will be `nan`. +// +// If neither of the above conditions are true, the rate of change is calculated as +// `(per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)` +// where `rate_window` is a window function that partitions the data by fingerprint and orders it by timestamp. +// We want to calculate the rate of change for each time series, so we partition the data by fingerprint. +// +// The `increase` function is similar to the `rate` function, except that it does not divide by the time interval. +const ( + rateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))` + increaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window)))` +) + +// prepareTimeAggregationSubQueryTimeSeries prepares the sub-query to be used for temporal aggregation +// of time series data + +// The following example illustrates how the sub-query is used to calculate the sume of values for each +// time series in a 15 seconds interval: + +// ``` +// timestamp 01.00 01.05 01.10 01.15 01.20 01.25 01.30 01.35 01.40 +// +------+------+------+------+------+------+------+------+------+ +// | | | | | | | | | | +// | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 | +// | | | | | | | | | | +// +------+------+------+------+------+------+------+------+------+ +// | | | | | | | | | +// | | | | | | | | | +// | | | +// +------+ +------+ +------+ +// | v1+ | | v4+ | | v7+ | +// | v2+ | | v5+ | | v8+ | +// | v3 | | v6 | | v9 | +// +------+ +------+ +------+ +// 01.00 01.15 01.30 +// ``` + +// Calculating the rate/increase involves an additional step. We first calculate the maximum value for each time series +// in a 15 seconds interval. Then, we calculate the difference between the current maximum value and the previous +// maximum value + +// The following example illustrates how the sub-query is used to calculate the rate of change for each time series +// in a 15 seconds interval: + +// ``` +// timestamp 01.00 01.05 01.10 01.15 01.20 01.25 01.30 01.35 01.40 +// +------+------+------+------+------+------+------+------+------+ +// | | | | | | | | | | +// | v1 | v2 | v3 | v4 | v5 | v6 | v7 | v8 | v9 | +// | | | | | | | | | | +// +------+------+------+------+------+------+------+------+------+ +// | | | | | | | | | +// | | | | | | | | | +// | | | +// +------+ +------+ +------+ +// max(| v1, | max(| v4, | max(| v7, | +// | v2, | | v5, | | v8, | +// | v3 |) | v6 |) | v9 |) +// +------+ +------+ +------+ +// 01.00 01.15 01.30 + +// +-------+ +--------+ +// | V6-V2 | | V9-V6 | +// | | | | +// | | | | +// +------+ +--------+ +// 01.00 01.15 +// ``` + +// The rate of change is calculated as (Vy - Vx) / (Ty - Tx) where Vx and Vy are the values at time Tx and Ty respectively. +// In an ideal scenario, the last value of each interval could be used to calculate the rate of change. Instead, we use +// the maximum value of each interval to calculate the rate of change. This is because any process restart can cause the +// value to be reset to 0. This will produce an inaccurate result. The max is the best approximation we can get. +// We don't expect the process to restart very often, so this should be a good approximation. + +func prepareTimeAggregationSubQueryTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { + var subQuery string + + timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq) + if err != nil { + return "", err + } + + samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmpl := + "SELECT fingerprint, %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as per_series_value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableFilter + + " GROUP BY fingerprint, ts" + + " ORDER BY fingerprint, ts" + + var selectLabelsAny string + for _, tag := range mq.GroupBy { + selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key) + } + + var selectLabels string + for _, tag := range mq.GroupBy { + selectLabels += tag.Key + "," + } + + switch mq.TimeAggregation { + case v3.TimeAggregationAvg: + op := "avg(value)" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationSum: + op := "sum(value)" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationMin: + op := "min(value)" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationMax: + op := "max(value)" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationCount: + op := "count(value)" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationCountDistinct: + op := "count(distinct(value))" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationAnyLast: + op := "anyLast(value)" + subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + case v3.TimeAggregationRate: + op := "max(value)" + innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + rateQueryTmpl := + "SELECT %s ts, " + rateWithoutNegative + + " as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)" + subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery) + case v3.TimeAggregationIncrease: + op := "max(value)" + innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) + rateQueryTmpl := + "SELECT %s ts, " + increaseWithoutNegative + + " as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)" + subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery) + } + return subQuery, nil +} + +// prepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics +func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { + var query string + + temporalAggSubQuery, err := prepareTimeAggregationSubQueryTimeSeries(start, end, step, mq) + if err != nil { + return "", err + } + + groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := groupByAttributeKeyTags(mq.GroupBy...) + + queryTmpl := + "SELECT %s," + + " %s as value" + + " FROM (%s)" + + " WHERE isNaN(per_series_value) = 0" + + " GROUP BY %s" + + " ORDER BY %s" + + switch mq.SpaceAggregation { + case v3.SpaceAggregationAvg: + op := "avg(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationSum: + op := "sum(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationMin: + op := "min(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationMax: + op := "max(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationCount: + op := "count(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + } + + return query, nil +} diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go new file mode 100644 index 0000000000..70f2e1d3ef --- /dev/null +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go @@ -0,0 +1,229 @@ +package cumulative + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPrepareTimeAggregationSubQuery(t *testing.T) { + // The time aggregation is performed for each unique series - since the fingerprint represents the + // unique hash of label set, we always group by fingerprint regardless of the GroupBy + // This sub result is then aggregated on dimensions using the provided GroupBy clause keys + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + start int64 + end int64 + expectedQueryContains string + }{ + { + name: "test time aggregation = avg, temporality = cumulative", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "http_requests", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + IsJSON: false, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorNotEqual, + Value: "payment_service", + }, + { + Key: v3.AttributeKey{ + Key: "endpoint", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"/paycallback", "/payme", "/paypal"}, + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationAvg, + }, + start: 1701794980000, + end: 1701796780000, + expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", + }, + { + name: "test time aggregation = rate, temporality = cumulative", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "http_requests", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + IsJSON: false, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "payment_service", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + }, + start: 1701794980000, + end: 1701796780000, + expectedQueryContains: "SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := prepareTimeAggregationSubQueryTimeSeries( + testCase.start, + testCase.end, + testCase.builderQuery.StepInterval, + testCase.builderQuery, + ) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} +func TestPrepareTimeseriesQuery(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + start int64 + end int64 + expectedQueryContains string + }{ + { + name: "test time aggregation = avg, space aggregation = sum, temporality = unspecified", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + IsJSON: false, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorNotEqual, + Value: "idle", + }, + }, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + }, + start: 1701794980000, + end: 1701796780000, + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts ASC", + }, + { + name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "http_requests", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + IsJSON: false, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "payment_service", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + start: 1701794980000, + end: 1701796780000, + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := prepareMetricQueryCumulativeTimeSeries( + testCase.start, + testCase.end, + testCase.builderQuery.StepInterval, + testCase.builderQuery, + ) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +}