From 5e89211f53f1ade206ae392fdd563f66dddefb89 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 17 Jul 2023 21:08:54 +0530 Subject: [PATCH] feat: table view support for cumulative & delta metrics (#3110) --- .../app/metrics/v3/cumulative_table.go | 196 ++++++++++++++++++ .../app/metrics/v3/cumulative_table_test.go | 99 +++++++++ .../app/metrics/v3/delta_table.go | 148 +++++++++++++ .../app/metrics/v3/delta_table_test.go | 99 +++++++++ .../app/metrics/v3/query_builder.go | 12 +- 5 files changed, 552 insertions(+), 2 deletions(-) create mode 100644 pkg/query-service/app/metrics/v3/cumulative_table.go create mode 100644 pkg/query-service/app/metrics/v3/cumulative_table_test.go create mode 100644 pkg/query-service/app/metrics/v3/delta_table.go create mode 100644 pkg/query-service/app/metrics/v3/delta_table_test.go diff --git a/pkg/query-service/app/metrics/v3/cumulative_table.go b/pkg/query-service/app/metrics/v3/cumulative_table.go new file mode 100644 index 0000000000..fbd5c27447 --- /dev/null +++ b/pkg/query-service/app/metrics/v3/cumulative_table.go @@ -0,0 +1,196 @@ +package v3 + +import ( + "fmt" + "math" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +// This logic is a little convoluted for a reason. +// When we work with cumulative metrics, the table view needs to show the data for the entire time range. +// In some cases, we could take the points at the start and end of the time range and divide their difference by the +// duration. But the problem is that there is no guarantee that the trend will be linear between the start and end. +// Instead, we can sum the rate of change over some interval X; this interval can be the step size of the time series. +// However, the speed of the query depends on the number of timestamps, so we bump up the step size (5x for ranges longer than 30 minutes). +// This should be a good balance between speed and accuracy. 
+// TODO: find a better way to do this +func stepForTableCumulative(start, end int64) int64 { + // round up to the nearest multiple of 60 + duration := (end - start + 1) / 1000 + step := math.Max(math.Floor(float64(duration)/120), 60) // assuming 120 max points + if duration > 1800 { // bump for longer duration + step = step * 5 + } + return int64(step) +} + +func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + step := stepForTableCumulative(start, end) + + points := ((end - start + 1) / 1000) / step + + metricQueryGroupBy := mq.GroupBy + + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag := range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + metricQueryGroupBy = append( + metricQueryGroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmplCounterInner := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." 
+ constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + // orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupByWithoutLe := groupBy(tagsWithoutLe...) + groupTagsWithoutLe := groupSelect(tagsWithoutLe...) + orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) + groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) 
+ orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + if len(orderWithoutLe) != 0 { + orderWithoutLe += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + return "", fmt.Errorf("rate is not supported for table view") + case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + rateOrderBy := "fingerprint, " + orderBy + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, %s(value)/%d as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], points, query, groupBy, orderBy) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + step = ((end - start + 1) / 1000) / 2 + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + subQuery := fmt.Sprintf(queryTmplCounterInner, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := `SELECT %s toStartOfHour(now()) as ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0` + query = fmt.Sprintf(query, groupTags, subQuery) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", 
aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + rateOrderBy := "fingerprint, " + orderBy + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, sum(value)/%d as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, points, query, groupBy, orderBy) + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case 
v3.AggregateOperatorNoOp: + return "", fmt.Errorf("noop is not supported for table view") + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} diff --git a/pkg/query-service/app/metrics/v3/cumulative_table_test.go b/pkg/query-service/app/metrics/v3/cumulative_table_test.go new file mode 100644 index 0000000000..6c79c70bde --- /dev/null +++ b/pkg/query-service/app/metrics/v3/cumulative_table_test.go @@ -0,0 +1,99 @@ +package v3 + +import ( + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPanelTableForCumulative(t *testing.T) { + cases := []struct { + name string + query *v3.BuilderQuery + expected string + }{ + { + name: "request rate", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_count", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + }, + }, + Expression: "A", + }, + expected: "SELECT toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET 
/dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts", + }, + { + name: "latency p50", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant50, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorEqual, + Value: "frontend", + }, + }, + }, + Expression: "A", + }, + expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WHERE isNaN(value) = 0) GROUP BY le,ts HAVING isNaN(value) = 0 ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", + }, + { + name: "latency p99 with group by", + query: &v3.BuilderQuery{ + QueryName: 
"A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant99, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + GroupBy: []v3.AttributeKey{ + { + Key: "service_name", + }, + }, + Expression: "A", + }, + expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT service_name,le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WHERE isNaN(value) = 0) GROUP BY service_name,le,ts HAVING isNaN(value) = 0 ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if query != c.expected { + t.Fatalf("expected: %s, got: %s", c.expected, query) + } + }) + } +} diff --git 
a/pkg/query-service/app/metrics/v3/delta_table.go b/pkg/query-service/app/metrics/v3/delta_table.go new file mode 100644 index 0000000000..63cbaf72a2 --- /dev/null +++ b/pkg/query-service/app/metrics/v3/delta_table.go @@ -0,0 +1,148 @@ +package v3 + +import ( + "fmt" + "math" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + // round up to the nearest multiple of 60 + step := int64(math.Ceil(float64(end-start+1)/1000/60) * 60) + + metricQueryGroupBy := mq.GroupBy + + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag := range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + metricQueryGroupBy = append( + metricQueryGroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + queryTmpl := + "SELECT %s toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." 
+ constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + groupByWithoutLeTable := groupBy(tagsWithoutLe...) + groupTagsWithoutLeTable := groupSelect(tagsWithoutLe...) + orderWithoutLeTable := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) + groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + if len(orderWithoutLeTable) != 0 { + orderWithoutLeTable += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + // TODO(srikanthccv): what should be the expected behavior here for metrics? 
+ return "", fmt.Errorf("rate is not supported for table view") + case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + op := fmt.Sprintf("sum(value)/%d", step) + query := fmt.Sprintf( + queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLeTable, value, query, groupByWithoutLeTable, orderWithoutLeTable) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := 
fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorNoOp: + return "", fmt.Errorf("noop is not supported for table view") + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} diff --git a/pkg/query-service/app/metrics/v3/delta_table_test.go b/pkg/query-service/app/metrics/v3/delta_table_test.go new file mode 100644 index 0000000000..5156c0b71d --- /dev/null +++ b/pkg/query-service/app/metrics/v3/delta_table_test.go @@ -0,0 +1,99 @@ +package v3 + +import ( + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPanelTableForDelta(t *testing.T) { + cases := []struct { + name string + query *v3.BuilderQuery + expected string + }{ + { + name: "request rate", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSumRate, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_count", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + }, + }, + Expression: "A", + }, + expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE 
metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts", + }, + { + name: "latency p50", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant50, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorEqual, + Value: "frontend", + }, + }, + }, + Expression: "A", + }, + expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", + }, + { + name: "latency p99 with group by", + query: &v3.BuilderQuery{ + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorHistQuant99, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + GroupBy: []v3.AttributeKey{ + { + Key: "service_name", + }, + }, + Expression: "A", + }, + expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> 
toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if query != c.expected { + t.Fatalf("expected: %s, got: %s", c.expected, query) + } + }) + } +} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index 8c04790de1..7641406c34 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -422,9 +422,17 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P var query string var err error if mq.Temporality == v3.Delta { - query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + if panelType == v3.PanelTypeTable { + query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } else { + query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } } else { - query, err = buildMetricQuery(start, end, mq.StepInterval, mq, 
constants.SIGNOZ_TIMESERIES_TABLENAME) + if panelType == v3.PanelTypeTable { + query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } else { + query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } } if err != nil { return "", err