feat: allow limit on metrics time series result (#2979)

This commit is contained in:
Srikanth Chekuri 2023-07-13 14:22:30 +05:30 committed by GitHub
parent 538261aa99
commit 08d496e314
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 423 additions and 11 deletions

View File

@ -4162,7 +4162,22 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
var seriesList []*v3.Series var seriesList []*v3.Series
for key := range seriesToPoints { for key := range seriesToPoints {
series := v3.Series{Labels: seriesToAttrs[key], Points: seriesToPoints[key]} points := seriesToPoints[key]
// find the grouping sets point for the series
// this is the point with the zero timestamp
// if there is no such point, then the series is not grouped
// and we can skip this step
var groupingSetsPoint *v3.Point
for idx, point := range points {
if point.Timestamp <= 0 {
groupingSetsPoint = &point
// remove the grouping sets point from the list of points
points = append(points[:idx], points[idx+1:]...)
break
}
}
series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint}
seriesList = append(seriesList, &series) seriesList = append(seriesList, &series)
} }
return seriesList, nil return seriesList, nil

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -2758,6 +2759,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
return return
} }
applyMetricLimit(result, queryRangeParams)
resp := v3.QueryRangeResponse{ resp := v3.QueryRangeResponse{
Result: result, Result: result,
} }
@ -2775,3 +2778,33 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
aH.queryRangeV3(r.Context(), queryRangeParams, w, r) aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
} }
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
// apply limit if any for metrics
// use the grouping set points to apply the limit
for _, result := range results {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
if builderQueries != nil && builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics {
limit := builderQueries[result.QueryName].Limit
var orderAsc bool
for _, item := range builderQueries[result.QueryName].OrderBy {
if item.ColumnName == constants.SigNozOrderByValue {
orderAsc = strings.ToLower(item.Order) == "asc"
break
}
}
if limit != 0 {
sort.Slice(result.Series, func(i, j int) bool {
if orderAsc {
return result.Series[i].Points[0].Value < result.Series[j].Points[0].Value
}
return result.Series[i].Points[0].Value > result.Series[j].Points[0].Value
})
if len(result.Series) > int(limit) {
result.Series = result.Series[:limit]
}
}
}
}
}

View File

@ -8,7 +8,9 @@ import (
"strings" "strings"
"testing" "testing"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
) )
func TestPrepareQuery(t *testing.T) { func TestPrepareQuery(t *testing.T) {
@ -130,3 +132,347 @@ func TestPrepareQuery(t *testing.T) {
}) })
} }
} }
func TestApplyLimitOnMetricResult(t *testing.T) {
cases := []struct {
name string
inputResult []*v3.Result
params *v3.QueryRangeParamsV3
expectedResult []*v3.Result
}{
{
name: "test limit 1 without order", // top most (latency/error) as default
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
},
},
},
},
{
name: "test limit with order asc",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
{
name: "test data source not metrics",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 69,
},
{
Timestamp: 1689220096000,
Value: 240,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 154.5,
},
},
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 420,
},
{
Timestamp: 1689220096000,
Value: 260,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 340,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "service_name"},
DataSource: v3.DataSourceTraces,
AggregateOperator: v3.AggregateOperatorSum,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 69,
},
{
Timestamp: 1689220096000,
Value: 240,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 154.5,
},
},
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 420,
},
{
Timestamp: 1689220096000,
Value: 260,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 340,
},
},
},
},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result := c.inputResult
applyMetricLimit(result, c.params)
if len(result) != len(c.expectedResult) {
t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result))
}
for i, r := range result {
if r.QueryName != c.expectedResult[i].QueryName {
t.Errorf("expected query name: %s, but got: %s", c.expectedResult[i].QueryName, r.QueryName)
}
if len(r.Series) != len(c.expectedResult[i].Series) {
t.Errorf("expected series length: %d, but got: %d", len(c.expectedResult[i].Series), len(r.Series))
}
for j, s := range r.Series {
if len(s.Points) != len(c.expectedResult[i].Series[j].Points) {
t.Errorf("expected points length: %d, but got: %d", len(c.expectedResult[i].Series[j].Points), len(s.Points))
}
for k, p := range s.Points {
if p.Timestamp != c.expectedResult[i].Series[j].Points[k].Timestamp {
t.Errorf("expected point timestamp: %d, but got: %d", c.expectedResult[i].Series[j].Points[k].Timestamp, p.Timestamp)
}
if p.Value != c.expectedResult[i].Series[j].Points[k].Value {
t.Errorf("expected point value: %f, but got: %f", c.expectedResult[i].Series[j].Points[k].Value, p.Value)
}
}
}
}
})
}
}

View File

@ -193,6 +193,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) groupBy := groupByAttributeKeyTags(metricQueryGroupBy...)
groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...)
groupSets := groupingSetsByAttributeKeyTags(metricQueryGroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy)
if len(orderBy) != 0 { if len(orderBy) != 0 {
@ -226,7 +227,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
) // labels will be same so any should be fine ) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, groupTags, subQuery) query = fmt.Sprintf(query, groupTags, subQuery)
query = fmt.Sprintf(`SELECT %s ts, %s(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], query, groupBy, orderBy) query = fmt.Sprintf(`SELECT %s ts, %s(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], query, groupSets, orderBy)
return query, nil return query, nil
case case
v3.AggregateOperatorRateSum, v3.AggregateOperatorRateSum,
@ -234,7 +235,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
v3.AggregateOperatorRateAvg, v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRateMin: v3.AggregateOperatorRateMin:
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy)
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, groupTags, subQuery) query = fmt.Sprintf(query, groupTags, subQuery)
return query, nil return query, nil
@ -249,7 +250,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
v3.AggregateOperatorP95, v3.AggregateOperatorP95,
v3.AggregateOperatorP99: v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy)
return query, nil return query, nil
case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99:
rateGroupBy := "fingerprint, " + groupBy rateGroupBy := "fingerprint, " + groupBy
@ -261,22 +262,22 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
) // labels will be same so any should be fine ) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0` query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, groupTags, subQuery) query = fmt.Sprintf(query, groupTags, subQuery)
query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, query, groupBy, orderBy) query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, query, groupSets, orderBy)
value := aggregateOperatorToPercentile[mq.AggregateOperator] value := aggregateOperatorToPercentile[mq.AggregateOperator]
query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
return query, nil return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy)
return query, nil return query, nil
case v3.AggregateOperatorCount: case v3.AggregateOperatorCount:
op := "toFloat64(count(*))" op := "toFloat64(count(*))"
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy)
return query, nil return query, nil
case v3.AggregateOperatorCountDistinct: case v3.AggregateOperatorCountDistinct:
op := "toFloat64(count(distinct(value)))" op := "toFloat64(count(distinct(value)))"
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupSets, orderBy)
return query, nil return query, nil
case v3.AggregateOperatorNoOp: case v3.AggregateOperatorNoOp:
queryTmpl := queryTmpl :=
@ -297,6 +298,13 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
} }
} }
// groupingSets returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupingSets(tags ...string) string {
withTs := append(tags, "ts")
return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", "))
}
// groupBy returns a string of comma separated tags for group by clause // groupBy returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause // `ts` is always added to the group by clause
func groupBy(tags ...string) string { func groupBy(tags ...string) string {
@ -313,6 +321,14 @@ func groupSelect(tags ...string) string {
return groupTags return groupTags
} }
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
return groupingSets(groupTags...)
}
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{} groupTags := []string{}
for _, tag := range tags { for _, tag := range tags {
@ -346,6 +362,7 @@ func orderBy(items []v3.OrderBy, tags []string) string {
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag)) orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag))
} }
} }
return strings.Join(orderBy, ",") return strings.Join(orderBy, ",")
} }

View File

@ -238,7 +238,7 @@ func TestBuildQueryOperators(t *testing.T) {
func TestBuildQueryXRate(t *testing.T) { func TestBuildQueryXRate(t *testing.T) {
t.Run("TestBuildQueryXRate", func(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) {
tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name') as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts` tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name') as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts`
cases := []struct { cases := []struct {
aggregateOperator v3.AggregateOperator aggregateOperator v3.AggregateOperator

View File

@ -578,6 +578,7 @@ type Result struct {
type Series struct { type Series struct {
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
Points []Point `json:"values"` Points []Point `json:"values"`
GroupingSetsPoint *Point `json:"-"`
} }
func (s *Series) SortPoints() { func (s *Series) SortPoints() {