mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-16 14:35:58 +08:00
chore: support p9{9,5,0},75,50 for space aggregation (#4382)
This commit is contained in:
parent
7e5cf65ea3
commit
4a4f48cec8
@ -24,7 +24,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
|
||||
groupBy := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
|
||||
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||
|
||||
if mq.Quantile != 0 {
|
||||
var quantile float64
|
||||
|
||||
if v3.IsPercentileOperator(mq.SpaceAggregation) {
|
||||
quantile = v3.GetPercentileFromOperator(mq.SpaceAggregation)
|
||||
// If quantile is set, we need to group by le
|
||||
// and set the space aggregation to sum
|
||||
// and time aggregation to rate
|
||||
@ -57,8 +60,8 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
|
||||
return "", err
|
||||
}
|
||||
|
||||
if mq.Quantile != 0 {
|
||||
query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, mq.Quantile, query, groupBy, orderBy)
|
||||
if quantile != 0 {
|
||||
query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, quantile, query, groupBy, orderBy)
|
||||
}
|
||||
|
||||
return query, nil
|
||||
|
@ -336,9 +336,9 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
}},
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
Quantile: 0.99,
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
SpaceAggregation: v3.SpaceAggregationPercentile99,
|
||||
},
|
||||
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
|
||||
},
|
||||
@ -366,9 +366,9 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
Quantile: 0.99,
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
SpaceAggregation: v3.SpaceAggregationPercentile99,
|
||||
},
|
||||
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
|
||||
},
|
||||
@ -418,9 +418,9 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
}},
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
Quantile: 0.99,
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
SpaceAggregation: v3.SpaceAggregationPercentile99,
|
||||
},
|
||||
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
|
||||
},
|
||||
@ -448,9 +448,9 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
Quantile: 0.99,
|
||||
Expression: "A",
|
||||
Disabled: false,
|
||||
SpaceAggregation: v3.SpaceAggregationPercentile99,
|
||||
},
|
||||
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
|
||||
},
|
||||
|
@ -475,14 +475,50 @@ func (t TimeAggregation) IsRateOperator() bool {
|
||||
type SpaceAggregation string
|
||||
|
||||
const (
|
||||
SpaceAggregationUnspecified SpaceAggregation = ""
|
||||
SpaceAggregationSum SpaceAggregation = "sum"
|
||||
SpaceAggregationAvg SpaceAggregation = "avg"
|
||||
SpaceAggregationMin SpaceAggregation = "min"
|
||||
SpaceAggregationMax SpaceAggregation = "max"
|
||||
SpaceAggregationCount SpaceAggregation = "count"
|
||||
SpaceAggregationUnspecified SpaceAggregation = ""
|
||||
SpaceAggregationSum SpaceAggregation = "sum"
|
||||
SpaceAggregationAvg SpaceAggregation = "avg"
|
||||
SpaceAggregationMin SpaceAggregation = "min"
|
||||
SpaceAggregationMax SpaceAggregation = "max"
|
||||
SpaceAggregationCount SpaceAggregation = "count"
|
||||
SpaceAggregationPercentile50 SpaceAggregation = "percentile_50"
|
||||
SpaceAggregationPercentile75 SpaceAggregation = "percentile_75"
|
||||
SpaceAggregationPercentile90 SpaceAggregation = "percentile_90"
|
||||
SpaceAggregationPercentile95 SpaceAggregation = "percentile_95"
|
||||
SpaceAggregationPercentile99 SpaceAggregation = "percentile_99"
|
||||
)
|
||||
|
||||
func IsPercentileOperator(operator SpaceAggregation) bool {
|
||||
switch operator {
|
||||
case SpaceAggregationPercentile50,
|
||||
SpaceAggregationPercentile75,
|
||||
SpaceAggregationPercentile90,
|
||||
SpaceAggregationPercentile95,
|
||||
SpaceAggregationPercentile99:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func GetPercentileFromOperator(operator SpaceAggregation) float64 {
|
||||
// This could be done with a map, but it's just easier to read this way
|
||||
switch operator {
|
||||
case SpaceAggregationPercentile50:
|
||||
return 0.5
|
||||
case SpaceAggregationPercentile75:
|
||||
return 0.75
|
||||
case SpaceAggregationPercentile90:
|
||||
return 0.9
|
||||
case SpaceAggregationPercentile95:
|
||||
return 0.95
|
||||
case SpaceAggregationPercentile99:
|
||||
return 0.99
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
type FunctionName string
|
||||
|
||||
const (
|
||||
@ -550,7 +586,6 @@ type BuilderQuery struct {
|
||||
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
|
||||
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
|
||||
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
|
||||
Quantile float64 `json:"quantile,omitempty"`
|
||||
Functions []Function `json:"functions,omitempty"`
|
||||
}
|
||||
|
||||
@ -569,7 +604,7 @@ func (b *BuilderQuery) Validate() error {
|
||||
return fmt.Errorf("data source is invalid: %w", err)
|
||||
}
|
||||
if b.DataSource == DataSourceMetrics {
|
||||
if b.TimeAggregation == TimeAggregationUnspecified && b.Quantile == 0 {
|
||||
if b.TimeAggregation == TimeAggregationUnspecified {
|
||||
if err := b.AggregateOperator.Validate(); err != nil {
|
||||
return fmt.Errorf("aggregate operator is invalid: %w", err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user