From abaf6126e5e9feb71fccad8b5b4685ee5885defd Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 16 Jan 2024 16:56:20 +0530 Subject: [PATCH] chore: add /v4/query_range endpoint (#4361) --- ee/query-service/app/server.go | 1 + pkg/query-service/app/http_handler.go | 83 +++- .../app/metrics/v4/cumulative/table.go | 11 +- .../app/metrics/v4/cumulative/table_test.go | 2 +- .../app/metrics/v4/cumulative/timeseries.go | 25 +- .../metrics/v4/cumulative/timeseries_test.go | 2 +- .../app/metrics/v4/delta/helper.go | 61 --- .../app/metrics/v4/delta/table.go | 16 +- .../app/metrics/v4/delta/table_test.go | 4 +- .../app/metrics/v4/delta/time_series_test.go | 4 +- .../app/metrics/v4/delta/timeseries.go | 110 ++++- .../helper.go => helpers/clauses.go} | 30 +- .../app/metrics/v4/helpers/sub_query.go | 86 ++++ .../app/metrics/v4/query_builder.go | 128 +++--- .../app/metrics/v4/query_builder_test.go | 382 +++++++++++++++++- pkg/query-service/app/server.go | 1 + pkg/query-service/common/metrics.go | 19 + pkg/query-service/model/v3/v3.go | 22 +- 18 files changed, 785 insertions(+), 202 deletions(-) delete mode 100644 pkg/query-service/app/metrics/v4/delta/helper.go rename pkg/query-service/app/metrics/v4/{cumulative/helper.go => helpers/clauses.go} (58%) create mode 100644 pkg/query-service/app/metrics/v4/helpers/sub_query.go create mode 100644 pkg/query-service/common/metrics.go diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 699894e691..a5c7c1db22 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -331,6 +331,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e apiHandler.RegisterMetricsRoutes(r, am) apiHandler.RegisterLogsRoutes(r, am) apiHandler.RegisterQueryRangeV3Routes(r, am) + apiHandler.RegisterQueryRangeV4Routes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go 
index 1d01267860..173fe39ef4 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -29,6 +29,7 @@ import ( metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/parser" "go.signoz.io/signoz/pkg/query-service/app/querier" + querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/auth" @@ -78,6 +79,7 @@ type APIHandler struct { featureFlags interfaces.FeatureLookup ready func(http.HandlerFunc) http.HandlerFunc querier interfaces.Querier + querierV2 interfaces.Querier queryBuilder *queryBuilder.QueryBuilder preferDelta bool preferSpanMetrics bool @@ -142,7 +144,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { FeatureLookup: opts.FeatureFlags, } + querierOptsV2 := querierV2.QuerierOptions{ + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + } + querier := querier.NewQuerier(querierOpts) + querierv2 := querierV2.NewQuerier(querierOptsV2) aH := &APIHandler{ reader: opts.Reader, @@ -158,6 +169,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { featureFlags: opts.FeatureFlags, LogsParsingPipelineController: opts.LogsParsingPipelineController, querier: querier, + querierV2: querierv2, } builderOpts := queryBuilder.QueryBuilderOptions{ @@ -320,6 +332,11 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodGet) } +func (aH *APIHandler) RegisterQueryRangeV4Routes(router *mux.Router, am *AuthMiddleware) { + subRouter := router.PathPrefix("/api/v4").Subrouter() + subRouter.HandleFunc("/query_range", 
am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost) +} + func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) { writeHttpResponse(w, data) } @@ -542,7 +559,7 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParam if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { for name := range qp.CompositeQuery.BuilderQueries { query := qp.CompositeQuery.BuilderQueries[name] - if query.DataSource == v3.DataSourceMetrics { + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] { query.Temporality = v3.Delta } else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { @@ -3241,3 +3258,67 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { } } } + +func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) { + + var result []*v3.Result + var err error + var errQuriesByName map[string]string + var spanKeys map[string]v3.AttributeKey + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + // check if any enrichment is required for logs if yes then enrich them + if logsv3.EnrichmentRequired(queryRangeParams) { + // get the fields if any logs query is present + var fields map[string]v3.AttributeKey + fields, err = aH.getLogFieldsV3(ctx, queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + logsv3.Enrich(queryRangeParams, fields) + } + + spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + } + + result, err, errQuriesByName = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys) + + if err != nil { + apiErrObj 
:= &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + + aH.Respond(w, resp) +} + +func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) { + queryRangeParams, apiErrorObj := ParseQueryRangeParams(r) + + if apiErrorObj != nil { + zap.S().Errorf(apiErrorObj.Err.Error()) + RespondError(w, apiErrorObj, nil) + return + } + + // add temporality for each metric + + temporalityErr := aH.addTemporality(r.Context(), queryRangeParams) + if temporalityErr != nil { + zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) + return + } + + aH.queryRangeV4(r.Context(), queryRangeParams, w, r) +} diff --git a/pkg/query-service/app/metrics/v4/cumulative/table.go b/pkg/query-service/app/metrics/v4/cumulative/table.go index b81f3e7d8c..3e021a5811 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/table.go +++ b/pkg/query-service/app/metrics/v4/cumulative/table.go @@ -3,11 +3,12 @@ package cumulative import ( "fmt" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -// prepareMetricQueryTable prepares the query to be used for fetching metrics -func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) { +// PrepareMetricQueryCumulativeTable prepares the query to be used for fetching metrics +func PrepareMetricQueryCumulativeTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var query string temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq) @@ -15,9 +16,9 @@ func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (strin return "", err } - groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...) 
- orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) - selectLabels := groupByAttributeKeyTags(mq.GroupBy...) + groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...) queryTmpl := "SELECT %s," + diff --git a/pkg/query-service/app/metrics/v4/cumulative/table_test.go b/pkg/query-service/app/metrics/v4/cumulative/table_test.go index 45a6e657ea..d562b5d93a 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/table_test.go +++ b/pkg/query-service/app/metrics/v4/cumulative/table_test.go @@ -99,7 +99,7 @@ func TestPrepareTableQuery(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := prepareMetricQueryTable( + query, err := PrepareMetricQueryCumulativeTable( testCase.start, testCase.end, testCase.builderQuery.StepInterval, diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go index 6f39a952cb..7dfa8fef87 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go @@ -3,7 +3,7 @@ package cumulative import ( "fmt" - v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" @@ -107,7 +107,7 @@ const ( func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var subQuery string - timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq) + timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq) if err != nil { return "", err } @@ -127,15 +127,8 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) " GROUP BY fingerprint, 
ts" + " ORDER BY fingerprint, ts" - var selectLabelsAny string - for _, tag := range mq.GroupBy { - selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key) - } - - var selectLabels string - for _, tag := range mq.GroupBy { - selectLabels += tag.Key + "," - } + selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy) + selectLabels := helpers.SelectLabels(mq.GroupBy) switch mq.TimeAggregation { case v3.TimeAggregationAvg: @@ -177,8 +170,8 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) return subQuery, nil } -// prepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics -func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { +// PrepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics +func PrepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var query string temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq) @@ -186,9 +179,9 @@ func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.Build return "", err } - groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...) - orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) - selectLabels := groupByAttributeKeyTags(mq.GroupBy...) + groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...) 
queryTmpl := "SELECT %s," + diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go index 6b1d1e43b9..91dd1c4a1e 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go @@ -216,7 +216,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := prepareMetricQueryCumulativeTimeSeries( + query, err := PrepareMetricQueryCumulativeTimeSeries( testCase.start, testCase.end, testCase.builderQuery.StepInterval, diff --git a/pkg/query-service/app/metrics/v4/delta/helper.go b/pkg/query-service/app/metrics/v4/delta/helper.go deleted file mode 100644 index 972120fc15..0000000000 --- a/pkg/query-service/app/metrics/v4/delta/helper.go +++ /dev/null @@ -1,61 +0,0 @@ -package delta - -import ( - "fmt" - "strings" - - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" -) - -// groupingSets returns a string of comma separated tags for group by clause -// `ts` is always added to the group by clause -func groupingSets(tags ...string) string { - withTs := append(tags, "ts") - if len(withTs) > 1 { - return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", ")) - } else { - return strings.Join(withTs, ", ") - } -} - -// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause -func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string { - groupTags := []string{} - for _, tag := range tags { - groupTags = append(groupTags, tag.Key) - } - return groupingSets(groupTags...) 
-} - -// groupBy returns a string of comma separated tags for group by clause -func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { - groupTags := []string{} - for _, tag := range tags { - groupTags = append(groupTags, tag.Key) - } - groupTags = append(groupTags, "ts") - return strings.Join(groupTags, ", ") -} - -// orderBy returns a string of comma separated tags for order by clause -// if the order is not specified, it defaults to ASC -func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string { - var orderBy []string - for _, tag := range tags { - found := false - for _, item := range items { - if item.ColumnName == tag.Key { - found = true - orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order)) - break - } - } - if !found { - orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key)) - } - } - - orderBy = append(orderBy, "ts ASC") - - return strings.Join(orderBy, ", ") -} diff --git a/pkg/query-service/app/metrics/v4/delta/table.go b/pkg/query-service/app/metrics/v4/delta/table.go index b2b42bb9a6..bec10772f5 100644 --- a/pkg/query-service/app/metrics/v4/delta/table.go +++ b/pkg/query-service/app/metrics/v4/delta/table.go @@ -3,11 +3,17 @@ package delta import ( "fmt" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -// prepareMetricQueryDeltaTable builds the query to be used for fetching metrics -func prepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) { +// PrepareMetricQueryDeltaTable builds the query to be used for fetching metrics +func PrepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) { + + if canShortCircuit(mq) { + return prepareQueryOptimized(start, end, step, mq) + } + var query string temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq) @@ -15,9 +21,9 @@ func prepareMetricQueryDeltaTable(start, end, step int64, mq 
*v3.BuilderQuery) ( return "", err } - groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...) - orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) - selectLabels := groupByAttributeKeyTags(mq.GroupBy...) + groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...) queryTmpl := "SELECT %s," + diff --git a/pkg/query-service/app/metrics/v4/delta/table_test.go b/pkg/query-service/app/metrics/v4/delta/table_test.go index 271afcd0d2..c7bce4268c 100644 --- a/pkg/query-service/app/metrics/v4/delta/table_test.go +++ b/pkg/query-service/app/metrics/v4/delta/table_test.go @@ -95,13 +95,13 @@ func TestPrepareTableQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') 
as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := prepareMetricQueryDeltaTable( + query, err := PrepareMetricQueryDeltaTable( testCase.start, testCase.end, testCase.builderQuery.StepInterval, diff --git a/pkg/query-service/app/metrics/v4/delta/time_series_test.go b/pkg/query-service/app/metrics/v4/delta/time_series_test.go index 6eada21482..024371d328 100644 --- a/pkg/query-service/app/metrics/v4/delta/time_series_test.go +++ b/pkg/query-service/app/metrics/v4/delta/time_series_test.go @@ -210,13 +210,13 @@ func TestPrepareTimeseriesQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts 
ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := prepareMetricQueryDeltaTimeSeries( + query, err := PrepareMetricQueryDeltaTimeSeries( testCase.start, testCase.end, testCase.builderQuery.StepInterval, diff --git a/pkg/query-service/app/metrics/v4/delta/timeseries.go b/pkg/query-service/app/metrics/v4/delta/timeseries.go index 83f9e2f111..3d6999f425 100644 --- a/pkg/query-service/app/metrics/v4/delta/timeseries.go +++ b/pkg/query-service/app/metrics/v4/delta/timeseries.go @@ -3,18 +3,18 @@ package delta import ( "fmt" - v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" ) -// prepareTimeAggregationSubQueryTimeSeries builds the sub-query to be used for temporal aggregation +// prepareTimeAggregationSubQuery builds the sub-query to be used for temporal aggregation func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var subQuery string - timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq) + timeSeriesSubQuery, err := 
helpers.PrepareTimeseriesFilterQuery(mq) if err != nil { return "", err } @@ -34,15 +34,7 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) " GROUP BY fingerprint, ts" + " ORDER BY fingerprint, ts" - var selectLabelsAny string - for _, tag := range mq.GroupBy { - selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key) - } - - var selectLabels string - for _, tag := range mq.GroupBy { - selectLabels += tag.Key + "," - } + selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy) switch mq.TimeAggregation { case v3.TimeAggregationAvg: @@ -76,8 +68,58 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) return subQuery, nil } -// prepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics -func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { +// See `canShortCircuit` below for details +func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, error) { + + groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := helpers.SelectLabels(mq.GroupBy) + + var query string + + timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq) + if err != nil { + return "", err + } + + samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." 
+ constants.SIGNOZ_SAMPLES_TABLENAME + + " INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableFilter + + " GROUP BY %s" + + " ORDER BY %s" + + switch mq.SpaceAggregation { + case v3.SpaceAggregationSum: + op := "sum(value)" + if mq.TimeAggregation == v3.TimeAggregationRate { + op = "sum(value)/" + fmt.Sprintf("%d", step) + } + query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) + case v3.SpaceAggregationMin: + op := "min(value)" + query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) + case v3.SpaceAggregationMax: + op := "max(value)" + query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) + } + return query, nil +} + +// PrepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics +func PrepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) { + + if canShortCircuit(mq) { + return prepareQueryOptimized(start, end, step, mq) + } var query string @@ -86,9 +128,9 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue return "", err } - groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...) - orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) - selectLabels := groupByAttributeKeyTags(mq.GroupBy...) + groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...) + orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...) queryTmpl := "SELECT %s," + @@ -118,3 +160,37 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue return query, nil } + +// canShortCircuit returns true if we can use the optimized query +// for the given query +// This is used to avoid the group by fingerprint thus improving the performance +// for certain queries +// cases where we can short circuit: +// 1. 
time aggregation = (rate|increase) and space aggregation = sum +// - rate = sum(value)/step, increase = sum(value) - sum of sums is same as sum of all values +// +// 2. time aggregation = sum and space aggregation = sum +// - sum of sums is same as sum of all values +// +// 3. time aggregation = min and space aggregation = min +// - min of mins is same as min of all values +// +// 4. time aggregation = max and space aggregation = max +// - max of maxs is same as max of all values +// +// all of this is true only for delta metrics +func canShortCircuit(mq *v3.BuilderQuery) bool { + if (mq.TimeAggregation == v3.TimeAggregationRate || mq.TimeAggregation == v3.TimeAggregationIncrease) && mq.SpaceAggregation == v3.SpaceAggregationSum { + return true + } + if mq.TimeAggregation == v3.TimeAggregationSum && mq.SpaceAggregation == v3.SpaceAggregationSum { + return true + } + if mq.TimeAggregation == v3.TimeAggregationMin && mq.SpaceAggregation == v3.SpaceAggregationMin { + return true + } + if mq.TimeAggregation == v3.TimeAggregationMax && mq.SpaceAggregation == v3.SpaceAggregationMax { + return true + } + return false +} diff --git a/pkg/query-service/app/metrics/v4/cumulative/helper.go b/pkg/query-service/app/metrics/v4/helpers/clauses.go similarity index 58% rename from pkg/query-service/app/metrics/v4/cumulative/helper.go rename to pkg/query-service/app/metrics/v4/helpers/clauses.go index 6e692d3f37..06f4b13cea 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/helper.go +++ b/pkg/query-service/app/metrics/v4/helpers/clauses.go @@ -1,4 +1,4 @@ -package cumulative +package helpers import ( "fmt" @@ -18,8 +18,8 @@ func groupingSets(tags ...string) string { } } -// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause -func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string { +// GroupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause +func GroupingSetsByAttributeKeyTags(tags 
...v3.AttributeKey) string { groupTags := []string{} for _, tag := range tags { groupTags = append(groupTags, tag.Key) @@ -27,8 +27,8 @@ func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string { return groupingSets(groupTags...) } -// groupBy returns a string of comma separated tags for group by clause -func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { +// GroupByAttributeKeyTags returns a string of comma separated tags for group by clause +func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string { groupTags := []string{} for _, tag := range tags { groupTags = append(groupTags, tag.Key) @@ -37,9 +37,9 @@ func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { return strings.Join(groupTags, ", ") } -// orderBy returns a string of comma separated tags for order by clause +// OrderByAttributeKeyTags returns a string of comma separated tags for order by clause // if the order is not specified, it defaults to ASC -func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string { +func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string { var orderBy []string for _, tag := range tags { found := false @@ -59,3 +59,19 @@ func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string return strings.Join(orderBy, ", ") } + +func SelectLabelsAny(tags []v3.AttributeKey) string { + var selectLabelsAny []string + for _, tag := range tags { + selectLabelsAny = append(selectLabelsAny, fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)) + } + return strings.Join(selectLabelsAny, " ") +} + +func SelectLabels(tags []v3.AttributeKey) string { + var selectLabels []string + for _, tag := range tags { + selectLabels = append(selectLabels, fmt.Sprintf("%s,", tag.Key)) + } + return strings.Join(selectLabels, " ") +} diff --git a/pkg/query-service/app/metrics/v4/helpers/sub_query.go b/pkg/query-service/app/metrics/v4/helpers/sub_query.go new file mode 100644 index 0000000000..97176e54bd --- /dev/null 
+++ b/pkg/query-service/app/metrics/v4/helpers/sub_query.go @@ -0,0 +1,86 @@ +package helpers + +import ( + "fmt" + "strings" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria +func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) { + var conditions []string + var fs *v3.FilterSet = mq.Filters + var groupTags []v3.AttributeKey = mq.GroupBy + + conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) + conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) + + if fs != nil && len(fs.Items) != 0 { + for _, item := range fs.Items { + toFormat := item.Value + op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) + if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains { + toFormat = fmt.Sprintf("%%%s%%", toFormat) + } + fmtVal := utils.ClickHouseFormattedValue(toFormat) + switch op { + case v3.FilterOperatorEqual: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotEqual: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorIn: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotIn: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLike: + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotLike: + conditions = append(conditions, 
fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorRegex: + conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotRegex: + conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorGreaterThan: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorGreaterThanOrEq: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLessThan: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLessThanOrEq: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorContains: + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotContains: + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorExists: + conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key)) + case v3.FilterOperatorNotExists: + conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key)) + default: + return "", fmt.Errorf("unsupported filter operator") + } + } + } + whereClause := strings.Join(conditions, " AND ") + + var selectLabels string + for _, tag := range groupTags { + selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key) + } + + // The table JOIN key always exists + selectLabels += "fingerprint" + + filterSubQuery := fmt.Sprintf( + "SELECT DISTINCT %s FROM %s.%s WHERE %s", + selectLabels, + 
constants.SIGNOZ_METRIC_DBNAME, + constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, + whereClause, + ) + + return filterSubQuery, nil +} diff --git a/pkg/query-service/app/metrics/v4/query_builder.go b/pkg/query-service/app/metrics/v4/query_builder.go index 5e6c18d72a..f54f2ff059 100644 --- a/pkg/query-service/app/metrics/v4/query_builder.go +++ b/pkg/query-service/app/metrics/v4/query_builder.go @@ -2,100 +2,66 @@ package v4 import ( "fmt" - "strings" "time" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" - "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/cumulative" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/delta" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" - "go.signoz.io/signoz/pkg/query-service/utils" ) -// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria -func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) { - var conditions []string - var fs *v3.FilterSet = mq.Filters - var groupTags []v3.AttributeKey = mq.GroupBy - - conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) - conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) - - if fs != nil && len(fs.Items) != 0 { - for _, item := range fs.Items { - toFormat := item.Value - op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) - if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains { - toFormat = fmt.Sprintf("%%%s%%", toFormat) - } - fmtVal := utils.ClickHouseFormattedValue(toFormat) - switch op { - case v3.FilterOperatorEqual: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, 
fmtVal)) - case v3.FilterOperatorNotEqual: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorIn: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotIn: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorLike: - conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotLike: - conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorRegex: - conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotRegex: - conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorGreaterThan: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorGreaterThanOrEq: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorLessThan: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorLessThanOrEq: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorContains: - conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotContains: - conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorExists: - conditions = append(conditions, 
fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key)) - case v3.FilterOperatorNotExists: - conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key)) - default: - return "", fmt.Errorf("unsupported filter operator") - } - } - } - whereClause := strings.Join(conditions, " AND ") - - var selectLabels string - for _, tag := range groupTags { - selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key) - } - - // The table JOIN key always exists - selectLabels += "fingerprint" - - filterSubQuery := fmt.Sprintf( - "SELECT DISTINCT %s FROM %s.%s WHERE %s", - selectLabels, - constants.SIGNOZ_METRIC_DBNAME, - constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, - whereClause, - ) - - return filterSubQuery, nil -} - // PrepareMetricQuery prepares the query to be used for fetching metrics // from the database // start and end are in milliseconds // step is in seconds func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options metricsV3.Options) (string, error) { - // TODO(srikanthcc): implement - return "", nil + start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation) + + groupBy := helpers.GroupByAttributeKeyTags(mq.GroupBy...) 
+ orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + + if mq.Quantile != 0 { + // If quantile is set, we need to group by le + // and set the space aggregation to sum + // and time aggregation to rate + mq.TimeAggregation = v3.TimeAggregationRate + mq.SpaceAggregation = v3.SpaceAggregationSum + mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{ + Key: "le", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }) + } + + var query string + var err error + if mq.Temporality == v3.Delta { + if panelType == v3.PanelTypeTable { + query, err = delta.PrepareMetricQueryDeltaTable(start, end, mq.StepInterval, mq) + } else { + query, err = delta.PrepareMetricQueryDeltaTimeSeries(start, end, mq.StepInterval, mq) + } + } else { + if panelType == v3.PanelTypeTable { + query, err = cumulative.PrepareMetricQueryCumulativeTable(start, end, mq.StepInterval, mq) + } else { + query, err = cumulative.PrepareMetricQueryCumulativeTimeSeries(start, end, mq.StepInterval, mq) + } + } + + if err != nil { + return "", err + } + + if mq.Quantile != 0 { + query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, mq.Quantile, query, groupBy, orderBy) + } + + return query, nil } func BuildPromQuery(promQuery *v3.PromQuery, step, start, end int64) *model.QueryRangeParams { diff --git a/pkg/query-service/app/metrics/v4/query_builder_test.go b/pkg/query-service/app/metrics/v4/query_builder_test.go index eb071ecb2f..429c25b8e8 100644 --- a/pkg/query-service/app/metrics/v4/query_builder_test.go +++ b/pkg/query-service/app/metrics/v4/query_builder_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -142,7 +144,385 @@ func 
TestPrepareTimeseriesFilterQuery(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := PrepareTimeseriesFilterQuery(testCase.builderQuery) + query, err := helpers.PrepareTimeseriesFilterQuery(testCase.builderQuery) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepareMetricQueryCumulativeRate(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as 
per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + }, + { + name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "endpoint", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT 
fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepareMetricQueryDeltaRate(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test time aggregation = rate, space aggregation = sum, temporality = delta, no group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT 
toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts ASC", + }, + { + name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, 
testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test temporality = cumulative, quantile = 0.99", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + Quantile: 0.99, + }, + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, 
fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + }, + { + name: "test temporality = cumulative, quantile = 0.99 without group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + Expression: "A", + Disabled: false, + Quantile: 0.99, + }, + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as 
per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test temporality = delta, quantile = 0.99 group by service_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + Quantile: 0.99, + }, + 
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + }, + { + name: "test temporality = delta, quantile = 0.99 no group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + Expression: "A", + Disabled: false, + Quantile: 0.99, + }, + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM 
signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepareMetricQueryGauge(t *testing.T) { + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test gauge query with no group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_usage", + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + Expression: "A", + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE 
isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + }, + { + name: "test gauge query with group by host_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_usage", + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Expression: "A", + Disabled: false, + }, + expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) }) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index f7fa328b9f..eb50a775ce 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go 
@@ -267,6 +267,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { api.RegisterMetricsRoutes(r, am) api.RegisterLogsRoutes(r, am) api.RegisterQueryRangeV3Routes(r, am) + api.RegisterQueryRangeV4Routes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/query-service/common/metrics.go b/pkg/query-service/common/metrics.go new file mode 100644 index 0000000000..8596ba9d7c --- /dev/null +++ b/pkg/query-service/common/metrics.go @@ -0,0 +1,19 @@ +package common + +import ( + "math" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func AdjustedMetricTimeRange(start, end, step int64, aggregateOperator v3.TimeAggregation) (int64, int64) { + start = start - (start % (step * 1000)) + // if the query is a rate query, we adjust the start time by one more step + // so that we can calculate the rate for the first data point + if aggregateOperator.IsRateOperator() { + start -= step * 1000 + } + adjustStep := int64(math.Min(float64(step), 60)) + end = end - (end % (adjustStep * 1000)) + return start, end +} diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 968fe188e6..a11e888c15 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -462,6 +462,15 @@ const ( TimeAggregationIncrease TimeAggregation = "increase" ) +func (t TimeAggregation) IsRateOperator() bool { + switch t { + case TimeAggregationRate, TimeAggregationIncrease: + return true + default: + return false + } +} + type SpaceAggregation string const ( @@ -500,6 +509,7 @@ type BuilderQuery struct { SelectColumns []AttributeKey `json:"selectColumns,omitempty"` TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` + Quantile float64 `json:"quantile,omitempty"` Functions []Function `json:"functions,omitempty"` } @@ -517,8 +527,16 @@ func (b *BuilderQuery) Validate() error { if err :=
b.DataSource.Validate(); err != nil { return fmt.Errorf("data source is invalid: %w", err) } - if err := b.AggregateOperator.Validate(); err != nil { - return fmt.Errorf("aggregate operator is invalid: %w", err) + if b.DataSource == DataSourceMetrics { + if b.TimeAggregation == TimeAggregationUnspecified && b.Quantile == 0 { + if err := b.AggregateOperator.Validate(); err != nil { + return fmt.Errorf("aggregate operator is invalid: %w", err) + } + } + } else { + if err := b.AggregateOperator.Validate(); err != nil { + return fmt.Errorf("aggregate operator is invalid: %w", err) + } } if b.AggregateAttribute == (AttributeKey{}) && b.AggregateOperator.RequireAttribute(b.DataSource) { return fmt.Errorf("aggregate attribute is required")