From 08d496e31445ad61625e10bada5ca5e3e77ac5a7 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 13 Jul 2023 14:22:30 +0530 Subject: [PATCH] feat: allow limit on metrics time series result (#2979) --- .../app/clickhouseReader/reader.go | 17 +- pkg/query-service/app/http_handler.go | 33 ++ pkg/query-service/app/http_handler_test.go | 346 ++++++++++++++++++ .../app/metrics/v3/query_builder.go | 31 +- .../app/metrics/v3/query_builder_test.go | 2 +- pkg/query-service/model/v3/v3.go | 5 +- 6 files changed, 423 insertions(+), 11 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index f30db29af6..0a32730f97 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -4162,7 +4162,22 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam var seriesList []*v3.Series for key := range seriesToPoints { - series := v3.Series{Labels: seriesToAttrs[key], Points: seriesToPoints[key]} + points := seriesToPoints[key] + + // find the grouping sets point for the series + // this is the point with the zero timestamp + // if there is no such point, then the series is not grouped + // and we can skip this step + var groupingSetsPoint *v3.Point + for idx, point := range points { + if point.Timestamp <= 0 { + groupingSetsPoint = &point + // remove the grouping sets point from the list of points + points = append(points[:idx], points[idx+1:]...) + break + } + } + series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint} seriesList = append(seriesList, &series) } return seriesList, nil diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 604b1f046a..e4803b4c2e 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -8,6 +8,7 @@ import ( "fmt" "io/ioutil" "net/http" + "sort" "strconv" "strings" "sync" @@ -2758,6 +2759,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que return } + applyMetricLimit(result, queryRangeParams) + resp := v3.QueryRangeResponse{ Result: result, } @@ -2775,3 +2778,33 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) { aH.queryRangeV3(r.Context(), queryRangeParams, w, r) } + +func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) { + // apply limit if any for metrics + // use the grouping set points to apply the limit + + for _, result := range results { + builderQueries := queryRangeParams.CompositeQuery.BuilderQueries + if builderQueries != nil && builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics { + limit := builderQueries[result.QueryName].Limit + var orderAsc bool + for _, item := range builderQueries[result.QueryName].OrderBy { + if item.ColumnName == constants.SigNozOrderByValue { + orderAsc = strings.ToLower(item.Order) == "asc" + break + } + } + if limit != 0 { + sort.Slice(result.Series, func(i, j int) bool { + if orderAsc { + return result.Series[i].Points[0].Value < result.Series[j].Points[0].Value + } + return result.Series[i].Points[0].Value > result.Series[j].Points[0].Value + }) + if len(result.Series) > int(limit) { + result.Series = result.Series[:limit] + } + } + } + } +} diff --git a/pkg/query-service/app/http_handler_test.go b/pkg/query-service/app/http_handler_test.go index 84782f7cae..958bcdeee2 100644 --- a/pkg/query-service/app/http_handler_test.go +++ b/pkg/query-service/app/http_handler_test.go @@ -8,7 +8,9 @@ import ( "strings" "testing" + "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) func TestPrepareQuery(t *testing.T) { @@ -130,3 +132,347 @@ func TestPrepareQuery(t *testing.T) { }) } } + +func TestApplyLimitOnMetricResult(t *testing.T) { + cases := []struct { + name string + inputResult []*v3.Result + params *v3.QueryRangeParamsV3 + expectedResult []*v3.Result + }{ + { + name: "test limit 1 without order", // top most (latency/error) as default + inputResult: []*v3.Result{ + { + QueryName: "A", + Series: []*v3.Series{ + { + Labels: map[string]string{ + "service_name": "frontend", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 19.2, + }, + { + Timestamp: 1689220096000, + Value: 19.5, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 19.3, + }, + }, + { + Labels: map[string]string{ + "service_name": "route", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 8.83, + }, + { + Timestamp: 1689220096000, + Value: 8.83, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 8.83, + }, + }, + }, + }, + }, + params: &v3.QueryRangeParamsV3{ + Start: 1689220036000, + End: 1689220096000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"}, + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + GroupBy: []v3.AttributeKey{{Key: "service_name"}}, + Limit: 1, + }, + }, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + }, + }, + expectedResult: []*v3.Result{ + { + QueryName: "A", + Series: []*v3.Series{ + { + Labels: map[string]string{ + "service_name": "frontend", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 19.2, + }, + { + Timestamp: 1689220096000, + Value: 19.5, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 19.3, + }, + }, + }, + }, + }, + }, + { + name: "test limit with order asc", + inputResult: []*v3.Result{ + { + QueryName: "A", + Series: []*v3.Series{ + { + Labels: map[string]string{ + "service_name": "frontend", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 19.2, + }, + { + Timestamp: 1689220096000, + Value: 19.5, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 19.3, + }, + }, + { + Labels: map[string]string{ + "service_name": "route", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 8.83, + }, + { + Timestamp: 1689220096000, + Value: 8.83, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 8.83, + }, + }, + }, + }, + }, + params: &v3.QueryRangeParamsV3{ + Start: 1689220036000, + End: 1689220096000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"}, + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + GroupBy: []v3.AttributeKey{{Key: "service_name"}}, + Limit: 1, + OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}}, + }, + }, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + }, + }, + expectedResult: []*v3.Result{ + { + QueryName: "A", + Series: []*v3.Series{ + { + Labels: map[string]string{ + "service_name": "route", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 8.83, + }, + { + Timestamp: 1689220096000, + Value: 8.83, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 8.83, + }, + }, + }, + }, + }, + }, + { + name: "test data source not metrics", + inputResult: []*v3.Result{ + { + QueryName: "A", + Series: []*v3.Series{ + { + Labels: map[string]string{ + "service_name": "frontend", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 69, + }, + { + Timestamp: 1689220096000, + Value: 240, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 154.5, + }, + }, + { + Labels: map[string]string{ + "service_name": "redis", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 420, + }, + { + Timestamp: 1689220096000, + Value: 260, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 340, + }, + }, + }, + }, + }, + params: &v3.QueryRangeParamsV3{ + Start: 1689220036000, + End: 1689220096000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "service_name"}, + DataSource: v3.DataSourceTraces, + AggregateOperator: v3.AggregateOperatorSum, + Expression: "A", + GroupBy: []v3.AttributeKey{{Key: "service_name"}}, + Limit: 1, + OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}}, + }, + }, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + }, + }, + expectedResult: []*v3.Result{ + { + QueryName: "A", + Series: []*v3.Series{ + { + Labels: map[string]string{ + "service_name": "frontend", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 69, + }, + { + Timestamp: 1689220096000, + Value: 240, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 154.5, + }, + }, + { + Labels: map[string]string{ + "service_name": "redis", + }, + Points: []v3.Point{ + { + Timestamp: 1689220036000, + Value: 420, + }, + { + Timestamp: 1689220096000, + Value: 260, + }, + }, + GroupingSetsPoint: &v3.Point{ + Timestamp: 0, + Value: 340, + }, + }, + }, + }, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result := c.inputResult + applyMetricLimit(result, c.params) + if len(result) != len(c.expectedResult) { + t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result)) + } + for i, r := range result { + if r.QueryName != c.expectedResult[i].QueryName { + t.Errorf("expected query name: %s, but got: %s", c.expectedResult[i].QueryName, r.QueryName) + } + if len(r.Series) != len(c.expectedResult[i].Series) { + t.Errorf("expected series length: %d, but got: %d", len(c.expectedResult[i].Series), len(r.Series)) + } + for j, s := range r.Series { + if len(s.Points) != len(c.expectedResult[i].Series[j].Points) { + t.Errorf("expected points length: %d, but got: %d", len(c.expectedResult[i].Series[j].Points), len(s.Points)) + } + for k, p := range s.Points { + if p.Timestamp != c.expectedResult[i].Series[j].Points[k].Timestamp { + t.Errorf("expected point timestamp: %d, but got: %d", c.expectedResult[i].Series[j].Points[k].Timestamp, p.Timestamp) + } + if p.Value != c.expectedResult[i].Series[j].Points[k].Value { + t.Errorf("expected point value: %f, but got: %f", c.expectedResult[i].Series[j].Points[k].Value, p.Value) + } + } + } + } + }) + } +} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index f7e3956cca..1bff2ae2ba 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -193,6 +193,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) + groupSets := groupingSetsByAttributeKeyTags(metricQueryGroupBy...) orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) if len(orderBy) != 0 { @@ -226,7 +227,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str ) // labels will be same so any should be fine query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` query = fmt.Sprintf(query, groupTags, subQuery) - query = fmt.Sprintf(`SELECT %s ts, %s(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], query, groupBy, orderBy) + query = fmt.Sprintf(`SELECT %s ts, %s(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], query, groupSets, orderBy) return query, nil case v3.AggregateOperatorRateSum, @@ -234,7 +235,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str v3.AggregateOperatorRateAvg, v3.AggregateOperatorRateMin: op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) - subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy) query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` query = fmt.Sprintf(query, groupTags, subQuery) return query, nil @@ -249,7 +250,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str v3.AggregateOperatorP95, v3.AggregateOperatorP99: op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) - query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy) return query, nil case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: rateGroupBy := "fingerprint, " + groupBy @@ -261,22 +262,22 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str ) // labels will be same so any should be fine query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0` query = fmt.Sprintf(query, groupTags, subQuery) - query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, query, groupBy, orderBy) + query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, query, groupSets, orderBy) value := aggregateOperatorToPercentile[mq.AggregateOperator] query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) return query, nil case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) - query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy) return query, nil case v3.AggregateOperatorCount: op := "toFloat64(count(*))" - query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy) return query, nil case v3.AggregateOperatorCountDistinct: op := "toFloat64(count(distinct(value)))" - query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy) return query, nil case v3.AggregateOperatorNoOp: queryTmpl := @@ -297,6 +298,13 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str } } +// groupingSets returns a string of comma separated tags for group by clause +// `ts` is always added to the group by clause +func groupingSets(tags ...string) string { + withTs := append(tags, "ts") + return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", ")) +} + // groupBy returns a string of comma separated tags for group by clause // `ts` is always added to the group by clause func groupBy(tags ...string) string { @@ -313,6 +321,14 @@ func groupSelect(tags ...string) string { return groupTags } +func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string { + groupTags := []string{} + for _, tag := range tags { + groupTags = append(groupTags, tag.Key) + } + return groupingSets(groupTags...) +} + func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { groupTags := []string{} for _, tag := range tags { @@ -346,6 +362,7 @@ func orderBy(items []v3.OrderBy, tags []string) string { orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag)) } } + return strings.Join(orderBy, ",") } diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index 7319236254..f3f91129a8 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -238,7 +238,7 @@ func TestBuildQueryOperators(t *testing.T) { func TestBuildQueryXRate(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) { - tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name') as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts` + tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name') as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts` cases := []struct { aggregateOperator v3.AggregateOperator diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 57f290f133..e320cc2877 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -576,8 +576,9 @@ type Result struct { } type Series struct { - Labels map[string]string `json:"labels"` - Points []Point `json:"values"` + Labels map[string]string `json:"labels"` + Points []Point `json:"values"` + GroupingSetsPoint *Point `json:"-"` } func (s *Series) SortPoints() {