feat: table view support for cumulative & delta metrics (#3110)
This commit is contained in:
parent 0beffb50ca
commit 5e89211f53
196
pkg/query-service/app/metrics/v3/cumulative_table.go
Normal file
@@ -0,0 +1,196 @@
package v3

import (
	"fmt"
	"math"

	"go.signoz.io/signoz/pkg/query-service/constants"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

// This logic is a little convoluted for a reason.
// When we work with cumulative metrics, the table view needs to show data for the entire time range.
// In some cases, we could take the points at the start and end of the time range and divide by the
// duration, but there is no guarantee that the trend is linear between the start and end.
// Instead, we sum the rate of change over some interval X, where X can be the step size of the time series.
// However, query speed depends on the number of timestamps, so we bump up the step size for longer ranges.
// This should be a good balance between speed and accuracy.
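// For example (illustrative arithmetic only): a 2-hour range has duration 7200 s, so the base step is
// max(floor(7200/120), 60) = 60 s; since 7200 > 1800 it is bumped to 300 s, i.e. roughly 24 points.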
// TODO: find a better way to do this
func stepForTableCumulative(start, end int64) int64 {
	// use a minimum step of 60 seconds, targeting at most ~120 points
	duration := (end - start + 1) / 1000
	step := math.Max(math.Floor(float64(duration)/120), 60) // assuming 120 max points
	if duration > 1800 { // bump for longer duration
		step = step * 5
	}
	return int64(step)
}

func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) {

	step := stepForTableCumulative(start, end)

	points := ((end - start + 1) / 1000) / step
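	// points is the number of step-sized intervals in the range; the summed per-interval rates are
	// divided by it below to average them into a single table value (e.g. the ~29.6-minute range
	// used in the tests gives step = 60 and points = 29)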

	metricQueryGroupBy := mq.GroupBy

	// if the aggregate operator is a histogram quantile, and the user has not
	// included the le tag in the group by, then add the le tag to the group by
	if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant99 {
		found := false
		for _, tag := range mq.GroupBy {
			if tag.Key == "le" {
				found = true
				break
			}
		}
		if !found {
			metricQueryGroupBy = append(
				metricQueryGroupBy,
				v3.AttributeKey{
					Key:      "le",
					DataType: v3.AttributeKeyDataTypeString,
					Type:     v3.AttributeKeyTypeTag,
					IsColumn: false,
				},
			)
		}
	}

	filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
	if err != nil {
		return "", err
	}

	samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)

	// Select the aggregate value for each step-sized interval
	queryTmplCounterInner :=
		"SELECT %s" +
			" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
			" %s as value" +
			" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
			" GLOBAL INNER JOIN" +
			" (%s) as filtered_time_series" +
			" USING fingerprint" +
			" WHERE " + samplesTableTimeFilter +
			" GROUP BY %s" +
			" ORDER BY %s ts"

	// Select the aggregate value for the entire time range
	queryTmpl :=
		"SELECT %s" +
			" toStartOfHour(now()) as ts," + // now() has no meaning & is used as a placeholder for ts
			" %s as value" +
			" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
			" GLOBAL INNER JOIN" +
			" (%s) as filtered_time_series" +
			" USING fingerprint" +
			" WHERE " + samplesTableTimeFilter +
			" GROUP BY %s" +
			" ORDER BY %s ts"

	// tagsWithoutLe is used to group by all tags except le
	// This is done because we want to group by le only when we are calculating the quantile
	// Otherwise, we want to group by all tags except le
	tagsWithoutLe := []string{}
	for _, tag := range mq.GroupBy {
		if tag.Key != "le" {
			tagsWithoutLe = append(tagsWithoutLe, tag.Key)
		}
	}

	// orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe)

	groupByWithoutLe := groupBy(tagsWithoutLe...)
	groupTagsWithoutLe := groupSelect(tagsWithoutLe...)
	orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe)

	groupBy := groupByAttributeKeyTags(metricQueryGroupBy...)
	groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...)
	orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy)

	if len(orderBy) != 0 {
		orderBy += ","
	}
	if len(orderWithoutLe) != 0 {
		orderWithoutLe += ","
	}

	switch mq.AggregateOperator {
	case v3.AggregateOperatorRate:
		return "", fmt.Errorf("rate is not supported for table view")
	case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate:
		rateGroupBy := "fingerprint, " + groupBy
		rateGroupTags := "fingerprint, " + groupTags
		rateOrderBy := "fingerprint, " + orderBy
		op := "max(value)"
		subQuery := fmt.Sprintf(
			queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
		) // labels will be same so any should be fine
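		// the inner query gives the max cumulative value per fingerprint per interval; the two
		// wrapping queries below first turn those into per-interval rates (runningDifference)
		// and then average them over the whole range by dividing the aggregated value by points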
		query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
		query = fmt.Sprintf(query, groupTags, subQuery)
		query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, %s(value)/%d as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, aggregateOperatorToSQLFunc[mq.AggregateOperator], points, query, groupBy, orderBy)
		return query, nil
	case
		v3.AggregateOperatorRateSum,
		v3.AggregateOperatorRateMax,
		v3.AggregateOperatorRateAvg,
		v3.AggregateOperatorRateMin:
		step = ((end - start + 1) / 1000) / 2
		op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
		subQuery := fmt.Sprintf(queryTmplCounterInner, groupTags, step, op, filterSubQuery, groupBy, orderBy)
		query := `SELECT %s toStartOfHour(now()) as ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
		query = fmt.Sprintf(query, groupTags, subQuery)
		return query, nil
	case
		v3.AggregateOperatorP05,
		v3.AggregateOperatorP10,
		v3.AggregateOperatorP20,
		v3.AggregateOperatorP25,
		v3.AggregateOperatorP50,
		v3.AggregateOperatorP75,
		v3.AggregateOperatorP90,
		v3.AggregateOperatorP95,
		v3.AggregateOperatorP99:
		op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator])
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99:
		rateGroupBy := "fingerprint, " + groupBy
		rateGroupTags := "fingerprint, " + groupTags
		rateOrderBy := "fingerprint, " + orderBy
		op := "max(value)"
		subQuery := fmt.Sprintf(
			queryTmplCounterInner, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
		) // labels will be same so any should be fine
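		// same pattern as the rate operators: convert per-interval maxima to rates, average them over
		// the range per le bucket (sum(value)/points), then apply histogramQuantile across the buckets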
		query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
		query = fmt.Sprintf(query, groupTags, subQuery)
		query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, sum(value)/%d as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, points, query, groupBy, orderBy)
		value := aggregateOperatorToPercentile[mq.AggregateOperator]

		query = fmt.Sprintf(`SELECT %s toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
		return query, nil
	case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
		op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorCount:
		op := "toFloat64(count(*))"
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorCountDistinct:
		op := "toFloat64(count(distinct(value)))"
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorNoOp:
		return "", fmt.Errorf("noop is not supported for table view")
	default:
		return "", fmt.Errorf("unsupported aggregate operator")
	}
}
99
pkg/query-service/app/metrics/v3/cumulative_table_test.go
Normal file
@@ -0,0 +1,99 @@
package v3

import (
	"testing"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestPanelTableForCumulative(t *testing.T) {
	cases := []struct {
		name     string
		query    *v3.BuilderQuery
		expected string
	}{
		{
			name: "request rate",
			query: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorSumRate,
				AggregateAttribute: v3.AttributeKey{
					Key: "signoz_latency_count",
				},
				Temporality: v3.Cumulative,
				Filters: &v3.FilterSet{
					Items: []v3.FilterItem{
						{
							Key:      v3.AttributeKey{Key: "service_name"},
							Operator: v3.FilterOperatorIn,
							Value:    []interface{}{"frontend"},
						},
						{
							Key:      v3.AttributeKey{Key: "operation"},
							Operator: v3.FilterOperatorIn,
							Value:    []interface{}{"HTTP GET /dispatch"},
						},
					},
				},
				Expression: "A",
			},
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY ts ORDER BY ts",
		},
		{
			name: "latency p50",
			query: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorHistQuant50,
				AggregateAttribute: v3.AttributeKey{
					Key: "signoz_latency_bucket",
				},
				Temporality: v3.Cumulative,
				Filters: &v3.FilterSet{
					Items: []v3.FilterItem{
						{
							Key:      v3.AttributeKey{Key: "service_name"},
							Operator: v3.FilterOperatorEqual,
							Value:    "frontend",
						},
					},
				},
				Expression: "A",
			},
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WHERE isNaN(value) = 0) GROUP BY le,ts HAVING isNaN(value) = 0 ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
		},
		{
			name: "latency p99 with group by",
			query: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorHistQuant99,
				AggregateAttribute: v3.AttributeKey{
					Key: "signoz_latency_bucket",
				},
				Temporality: v3.Cumulative,
				GroupBy: []v3.AttributeKey{
					{
						Key: "service_name",
					},
				},
				Expression: "A",
			},
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/29 as value FROM (SELECT service_name,le, ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WHERE isNaN(value) = 0) GROUP BY service_name,le,ts HAVING isNaN(value) = 0 ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2")
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			if query != c.expected {
				t.Fatalf("expected: %s, got: %s", c.expected, query)
			}
		})
	}
}
148
pkg/query-service/app/metrics/v3/delta_table.go
Normal file
@@ -0,0 +1,148 @@
package v3

import (
	"fmt"
	"math"

	"go.signoz.io/signoz/pkg/query-service/constants"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) {

	// round up to the nearest multiple of 60
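	// e.g. the ~29.6-minute (1774 s) range used in the tests rounds up to 1800 s, so the whole
	// window is treated as a single step and the rates below become per-second averages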
	step := int64(math.Ceil(float64(end-start+1)/1000/60) * 60)

	metricQueryGroupBy := mq.GroupBy

	// if the aggregate operator is a histogram quantile, and the user has not
	// included the le tag in the group by, then add the le tag to the group by
	if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
		mq.AggregateOperator == v3.AggregateOperatorHistQuant99 {
		found := false
		for _, tag := range mq.GroupBy {
			if tag.Key == "le" {
				found = true
				break
			}
		}
		if !found {
			metricQueryGroupBy = append(
				metricQueryGroupBy,
				v3.AttributeKey{
					Key:      "le",
					DataType: v3.AttributeKeyDataTypeString,
					Type:     v3.AttributeKeyTypeTag,
					IsColumn: false,
				},
			)
		}
	}

	filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
	if err != nil {
		return "", err
	}

	samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)

	queryTmpl :=
		"SELECT %s toStartOfHour(now()) as ts," + // now() has no meaning & is used as a placeholder for ts
			" %s as value" +
			" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
			" GLOBAL INNER JOIN" +
			" (%s) as filtered_time_series" +
			" USING fingerprint" +
			" WHERE " + samplesTableTimeFilter +
			" GROUP BY %s" +
			" ORDER BY %s ts"

	// tagsWithoutLe is used to group by all tags except le
	// This is done because we want to group by le only when we are calculating the quantile
	// Otherwise, we want to group by all tags except le
	tagsWithoutLe := []string{}
	for _, tag := range mq.GroupBy {
		if tag.Key != "le" {
			tagsWithoutLe = append(tagsWithoutLe, tag.Key)
		}
	}

	groupByWithoutLeTable := groupBy(tagsWithoutLe...)
	groupTagsWithoutLeTable := groupSelect(tagsWithoutLe...)
	orderWithoutLeTable := orderBy(mq.OrderBy, tagsWithoutLe)

	groupBy := groupByAttributeKeyTags(metricQueryGroupBy...)
	groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...)
	orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy)

	if len(orderBy) != 0 {
		orderBy += ","
	}
	if len(orderWithoutLeTable) != 0 {
		orderWithoutLeTable += ","
	}

	switch mq.AggregateOperator {
	case v3.AggregateOperatorRate:
		// TODO(srikanthccv): what should be the expected behavior here for metrics?
		return "", fmt.Errorf("rate is not supported for table view")
	case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate:
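		// for delta samples, summing the values over the window and dividing by the window
		// length in seconds (step) yields the average per-second rate shown in the table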
		op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step)
		query := fmt.Sprintf(
			queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy,
		)
		return query, nil
	case
		v3.AggregateOperatorRateSum,
		v3.AggregateOperatorRateMax,
		v3.AggregateOperatorRateAvg,
		v3.AggregateOperatorRateMin:
		op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step)
		query := fmt.Sprintf(
			queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy,
		)
		return query, nil
	case
		v3.AggregateOperatorP05,
		v3.AggregateOperatorP10,
		v3.AggregateOperatorP20,
		v3.AggregateOperatorP25,
		v3.AggregateOperatorP50,
		v3.AggregateOperatorP75,
		v3.AggregateOperatorP90,
		v3.AggregateOperatorP95,
		v3.AggregateOperatorP99:
		op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator])
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99:
		op := fmt.Sprintf("sum(value)/%d", step)
		query := fmt.Sprintf(
			queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy,
		) // labels will be same so any should be fine
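		// sum(value)/step gives the per-second rate for each le bucket over the window;
		// histogramQuantile below interpolates the requested quantile across those buckets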
		value := aggregateOperatorToPercentile[mq.AggregateOperator]

		query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLeTable, value, query, groupByWithoutLeTable, orderWithoutLeTable)
		return query, nil
	case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
		op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorCount:
		op := "toFloat64(count(*))"
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorCountDistinct:
		op := "toFloat64(count(distinct(value)))"
		query := fmt.Sprintf(queryTmpl, groupTags, op, filterSubQuery, groupBy, orderBy)
		return query, nil
	case v3.AggregateOperatorNoOp:
		return "", fmt.Errorf("noop is not supported for table view")
	default:
		return "", fmt.Errorf("unsupported aggregate operator")
	}
}
99
pkg/query-service/app/metrics/v3/delta_table_test.go
Normal file
@@ -0,0 +1,99 @@
package v3

import (
	"testing"

	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestPanelTableForDelta(t *testing.T) {
	cases := []struct {
		name     string
		query    *v3.BuilderQuery
		expected string
	}{
		{
			name: "request rate",
			query: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorSumRate,
				AggregateAttribute: v3.AttributeKey{
					Key: "signoz_latency_count",
				},
				Temporality: v3.Delta,
				Filters: &v3.FilterSet{
					Items: []v3.FilterItem{
						{
							Key:      v3.AttributeKey{Key: "service_name"},
							Operator: v3.FilterOperatorIn,
							Value:    []interface{}{"frontend"},
						},
						{
							Key:      v3.AttributeKey{Key: "operation"},
							Operator: v3.FilterOperatorIn,
							Value:    []interface{}{"HTTP GET /dispatch"},
						},
					},
				},
				Expression: "A",
			},
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
		},
		{
			name: "latency p50",
			query: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorHistQuant50,
				AggregateAttribute: v3.AttributeKey{
					Key: "signoz_latency_bucket",
				},
				Temporality: v3.Delta,
				Filters: &v3.FilterSet{
					Items: []v3.FilterItem{
						{
							Key:      v3.AttributeKey{Key: "service_name"},
							Operator: v3.FilterOperatorEqual,
							Value:    "frontend",
						},
					},
				},
				Expression: "A",
			},
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
		},
		{
			name: "latency p99 with group by",
			query: &v3.BuilderQuery{
				QueryName:         "A",
				DataSource:        v3.DataSourceMetrics,
				AggregateOperator: v3.AggregateOperatorHistQuant99,
				AggregateAttribute: v3.AttributeKey{
					Key: "signoz_latency_bucket",
				},
				Temporality: v3.Delta,
				GroupBy: []v3.AttributeKey{
					{
						Key: "service_name",
					},
				},
				Expression: "A",
			},
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2")
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			if query != c.expected {
				t.Fatalf("expected: %s, got: %s", c.expected, query)
			}
		})
	}
}
@@ -422,9 +422,17 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
 	var query string
 	var err error
 	if mq.Temporality == v3.Delta {
-		query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
+		if panelType == v3.PanelTypeTable {
+			query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
+		} else {
+			query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
+		}
 	} else {
-		query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
+		if panelType == v3.PanelTypeTable {
+			query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
+		} else {
+			query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
+		}
 	}
 	if err != nil {
 		return "", err