From be6bca3717152a54e889457f0d9b742399b6724d Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 8 Jan 2024 20:04:21 +0530 Subject: [PATCH] chore: add prepare query for cumulative/unspecified table (#4169) --- .../app/metrics/v4/cumulative/table.go | 49 ++++++++ .../app/metrics/v4/cumulative/table_test.go | 112 ++++++++++++++++++ .../app/metrics/v4/cumulative/timeseries.go | 4 +- .../metrics/v4/cumulative/timeseries_test.go | 2 +- 4 files changed, 164 insertions(+), 3 deletions(-) create mode 100644 pkg/query-service/app/metrics/v4/cumulative/table.go create mode 100644 pkg/query-service/app/metrics/v4/cumulative/table_test.go diff --git a/pkg/query-service/app/metrics/v4/cumulative/table.go b/pkg/query-service/app/metrics/v4/cumulative/table.go new file mode 100644 index 0000000000..b81f3e7d8c --- /dev/null +++ b/pkg/query-service/app/metrics/v4/cumulative/table.go @@ -0,0 +1,49 @@ +package cumulative + +import ( + "fmt" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +// prepareMetricQueryTable prepares the query to be used for fetching metrics +func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) { + var query string + + temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq) + if err != nil { + return "", err + } + + groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := groupByAttributeKeyTags(mq.GroupBy...) + + queryTmpl := + "SELECT %s," + + " %s as value" + + " FROM (%s)" + + " WHERE isNaN(per_series_value) = 0" + + " GROUP BY %s" + + " ORDER BY %s" + + switch mq.SpaceAggregation { + case v3.SpaceAggregationAvg: + op := "avg(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationSum: + op := "sum(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationMin: + op := "min(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationMax: + op := "max(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + case v3.SpaceAggregationCount: + op := "count(per_series_value)" + query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy) + } + + return query, nil +} diff --git a/pkg/query-service/app/metrics/v4/cumulative/table_test.go b/pkg/query-service/app/metrics/v4/cumulative/table_test.go new file mode 100644 index 0000000000..45a6e657ea --- /dev/null +++ b/pkg/query-service/app/metrics/v4/cumulative/table_test.go @@ -0,0 +1,112 @@ +package cumulative + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPrepareTableQuery(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + start int64 + end int64 + expectedQueryContains string + }{ + { + name: "test time aggregation = avg, space aggregation = sum, temporality = unspecified", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + IsJSON: false, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorNotEqual, + Value: "idle", + }, + }, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + }, + start: 1701794980000, + end: 1701796780000, + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + }, + { + name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "http_requests", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + IsJSON: false, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "payment_service", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + start: 1701794980000, + end: 1701796780000, + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := prepareMetricQueryTable( + testCase.start, + testCase.end, + testCase.builderQuery.StepInterval, + testCase.builderQuery, + ) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go index 78d22be4aa..6f39a952cb 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go @@ -104,7 +104,7 @@ const ( // value to be reset to 0. This will produce an inaccurate result. The max is the best approximation we can get. // We don't expect the process to restart very often, so this should be a good approximation. -func prepareTimeAggregationSubQueryTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { +func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var subQuery string timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq) @@ -181,7 +181,7 @@ func prepareTimeAggregationSubQueryTimeSeries(start, end, step int64, mq *v3.Bui func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var query string - temporalAggSubQuery, err := prepareTimeAggregationSubQueryTimeSeries(start, end, step, mq) + temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq) if err != nil { return "", err } diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go index 671af6ac69..6b1d1e43b9 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go @@ -113,7 +113,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := prepareTimeAggregationSubQueryTimeSeries( + query, err := prepareTimeAggregationSubQuery( testCase.start, testCase.end, testCase.builderQuery.StepInterval,