chore: add ExponentialHistogram support for metrics v4 query range (#4525)
This commit is contained in:
parent aa9a3e9349
commit f734142419
@@ -98,6 +98,7 @@ processors:
       latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
       dimensions_cache_size: 100000
       aggregation_temporality: AGGREGATION_TEMPORALITY_DELTA
+      enable_exp_histogram: true
       dimensions:
         - name: service.namespace
           default: default
@@ -101,6 +101,7 @@ processors:
       latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
      dimensions_cache_size: 100000
      aggregation_temporality: AGGREGATION_TEMPORALITY_DELTA
+      enable_exp_histogram: true
      dimensions:
        - name: service.namespace
          default: default
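(The only line added in the two config hunks above is enable_exp_histogram: true. Judging by the option name and the query-side changes below, it presumably tells the span-metrics processor to also record latency as an exponential histogram, stored as DD sketches, alongside the fixed latency_histogram_buckets; the rest of this commit teaches the metrics v4 query range builder to read those sketches.)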
@@ -212,6 +212,40 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
 			end:   1701796780000,
 			expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 		},
+		{
+			name: "test time aggregation = rate, space aggregation percentile99, type = ExponentialHistogram",
+			builderQuery: &v3.BuilderQuery{
+				QueryName:    "A",
+				StepInterval: 60,
+				DataSource:   v3.DataSourceMetrics,
+				AggregateAttribute: v3.AttributeKey{
+					Key:      "signoz_latency",
+					DataType: v3.AttributeKeyDataTypeFloat64,
+					Type:     v3.AttributeKeyType(v3.MetricTypeExponentialHistogram),
+					IsColumn: true,
+					IsJSON:   false,
+				},
+				Temporality: v3.Delta,
+				Filters: &v3.FilterSet{
+					Operator: "AND",
+					Items:    []v3.FilterItem{},
+				},
+				GroupBy: []v3.AttributeKey{
+					{
+						Key:      "service_name",
+						DataType: v3.AttributeKeyDataTypeString,
+						Type:     v3.AttributeKeyTypeTag,
+					},
+				},
+				Expression:       "A",
+				Disabled:         false,
+				TimeAggregation:  v3.TimeAggregationRate,
+				SpaceAggregation: v3.SpaceAggregationPercentile99,
+			},
+			start: 1701794980000,
+			end:   1701796780000,
+			expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, quantilesDDMerge(0.01, 0.990000)(sketch)[1] as value FROM signoz_metrics.distributed_exp_hist INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+		},
 	}
 
 	for _, testCase := range testCases {
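(Compared with the sum/rate case above, the expected exponential-histogram query reads from signoz_metrics.distributed_exp_hist instead of signoz_metrics.distributed_samples_v4, and its value expression is quantilesDDMerge(0.01, 0.990000)(sketch)[1] over the stored sketches rather than sum(value)/60 over raw sample values.)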
@@ -9,6 +9,11 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/utils"
 )
 
+// TODO(srikanthccv): support multiple quantiles; see https://github.com/SigNoz/signoz/issues/4016#issuecomment-1838583305
+var (
+	sketchFmt = "quantilesDDMerge(0.01, %f)(sketch)[1]"
+)
+
 // prepareTimeAggregationSubQuery builds the sub-query to be used for temporal aggregation
 func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
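As a quick illustration of how this format string expands (a standalone sketch; only sketchFmt itself comes from this change, the wrapping main is illustrative):

package main

import "fmt"

func main() {
	sketchFmt := "quantilesDDMerge(0.01, %f)(sketch)[1]"
	// %f prints with six decimal places by default, which is why the test above
	// expects "quantilesDDMerge(0.01, 0.990000)(sketch)[1]" for percentile99.
	fmt.Println(fmt.Sprintf(sketchFmt, 0.99))
	// Output: quantilesDDMerge(0.01, 0.990000)(sketch)[1]
}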
@@ -84,12 +89,16 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
 
 	samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
 
+	var tableName string = constants.SIGNOZ_SAMPLES_V4_TABLENAME
+	if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
+		tableName = "distributed_exp_hist"
+	}
 	// Select the aggregate value for interval
 	queryTmpl :=
 		"SELECT %s" +
 			" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
 			" %s as value" +
-			" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
+			" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName +
 			" INNER JOIN" +
 			" (%s) as filtered_time_series" +
 			" USING fingerprint" +
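For illustration, a minimal standalone sketch of the table routing added above, assuming SIGNOZ_METRIC_DBNAME and SIGNOZ_SAMPLES_V4_TABLENAME resolve to the names seen in the test expectations (signoz_metrics and distributed_samples_v4); samplesTable is a hypothetical helper, not part of the actual change:

package main

import "fmt"

// samplesTable mirrors the routing above: exponential histograms are read from
// distributed_exp_hist, every other metric type keeps reading from the v4
// samples table.
func samplesTable(metricType string) string {
	if metricType == "ExponentialHistogram" {
		return "signoz_metrics.distributed_exp_hist"
	}
	return "signoz_metrics.distributed_samples_v4"
}

func main() {
	fmt.Println(samplesTable("ExponentialHistogram")) // signoz_metrics.distributed_exp_hist
	fmt.Println(samplesTable("Sum"))                  // signoz_metrics.distributed_samples_v4
}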
@@ -110,6 +119,13 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
 	case v3.SpaceAggregationMax:
 		op := "max(value)"
 		query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
+	case v3.SpaceAggregationPercentile50,
+		v3.SpaceAggregationPercentile75,
+		v3.SpaceAggregationPercentile90,
+		v3.SpaceAggregationPercentile95,
+		v3.SpaceAggregationPercentile99:
+		op := fmt.Sprintf(sketchFmt, v3.GetPercentileFromOperator(mq.SpaceAggregation))
+		query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
 	}
 	return query, nil
 }
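(Judging by the expected query in the test above, v3.GetPercentileFromOperator presumably returns the quantile as a fraction, e.g. 0.99 for SpaceAggregationPercentile99, which sketchFmt then renders as 0.990000.)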
@@ -178,6 +194,9 @@ func PrepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue
 // 4. time aggregation = max and space aggregation = max
 //    - max of maxs is same as max of all values
 //
+// 5. special case exphist, there is no need for per series/fingerprint aggregation
+//    we can directly use the quantilesDDMerge function
+//
 // all of this is true only for delta metrics
 func canShortCircuit(mq *v3.BuilderQuery) bool {
 	if (mq.TimeAggregation == v3.TimeAggregationRate || mq.TimeAggregation == v3.TimeAggregationIncrease) && mq.SpaceAggregation == v3.SpaceAggregationSum {

@@ -192,5 +211,8 @@ func canShortCircuit(mq *v3.BuilderQuery) bool {
 	if mq.TimeAggregation == v3.TimeAggregationMax && mq.SpaceAggregation == v3.SpaceAggregationMax {
 		return true
 	}
+	if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) && v3.IsPercentileOperator(mq.SpaceAggregation) {
+		return true
+	}
 	return false
 }
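For illustration, a stripped-down sketch of just the short-circuit rule added above; the real canShortCircuit works on *v3.BuilderQuery, and the string values below are illustrative stand-ins rather than the actual v3 constants:

package main

import "fmt"

// query inlines the two fields the new rule looks at: the metric type carried
// on AggregateAttribute.Type and the requested space aggregation.
type query struct {
	metricType       string
	spaceAggregation string
}

// canShortCircuit covers only the exponential-histogram rule added in this
// commit: percentile queries over exp histograms skip per-fingerprint
// temporal aggregation and merge the sketches directly.
func canShortCircuit(q query) bool {
	isPercentile := q.spaceAggregation == "percentile_50" || q.spaceAggregation == "percentile_75" ||
		q.spaceAggregation == "percentile_90" || q.spaceAggregation == "percentile_95" ||
		q.spaceAggregation == "percentile_99"
	return q.metricType == "ExponentialHistogram" && isPercentile
}

func main() {
	// Mirrors the ExponentialHistogram + percentile99 test case above.
	fmt.Println(canShortCircuit(query{metricType: "ExponentialHistogram", spaceAggregation: "percentile_99"})) // true
}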
@@ -37,17 +37,6 @@ func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string {
 	return strings.Join(groupTags, ", ")
 }
 
-func GroupByAttributeKeyTagsWithoutLe(tags ...v3.AttributeKey) string {
-	groupTags := []string{}
-	for _, tag := range tags {
-		if tag.Key != "le" {
-			groupTags = append(groupTags, tag.Key)
-		}
-	}
-	groupTags = append(groupTags, "ts")
-	return strings.Join(groupTags, ", ")
-}
-
 // OrderByAttributeKeyTags returns a string of comma separated tags for order by clause
 // if the order is not specified, it defaults to ASC
 func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {

@@ -71,29 +60,6 @@ func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string
 	return strings.Join(orderBy, ", ")
 }
 
-func OrderByAttributeKeyTagsWithoutLe(items []v3.OrderBy, tags []v3.AttributeKey) string {
-	var orderBy []string
-	for _, tag := range tags {
-		if tag.Key != "le" {
-			found := false
-			for _, item := range items {
-				if item.ColumnName == tag.Key {
-					found = true
-					orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
-					break
-				}
-			}
-			if !found {
-				orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
-			}
-		}
-	}
-
-	orderBy = append(orderBy, "ts ASC")
-
-	return strings.Join(orderBy, ", ")
-}
-
 func SelectLabelsAny(tags []v3.AttributeKey) string {
 	var selectLabelsAny []string
 	for _, tag := range tags {
@@ -489,6 +489,17 @@ func (t TimeAggregation) IsRateOperator() bool {
 	}
 }
 
+type MetricType string
+
+const (
+	MetricTypeUnspecified          MetricType = ""
+	MetricTypeSum                  MetricType = "Sum"
+	MetricTypeGauge                MetricType = "Gauge"
+	MetricTypeHistogram            MetricType = "Histogram"
+	MetricTypeSummary              MetricType = "Summary"
+	MetricTypeExponentialHistogram MetricType = "ExponentialHistogram"
+)
+
 type SpaceAggregation string
 
 const (
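As the test case earlier shows, the metric type travels on the aggregate attribute via a plain string conversion (Type: v3.AttributeKeyType(v3.MetricTypeExponentialHistogram)) and is compared back the same way in the query builder. A standalone sketch of that pattern, with stand-in types rather than the actual v3 package:

package main

import "fmt"

// Stand-ins for the v3 types: MetricType values are stored on the attribute
// key through a string conversion, so they can be compared back with the same
// conversion when choosing the samples table or short-circuiting.
type MetricType string
type AttributeKeyType string

const MetricTypeExponentialHistogram MetricType = "ExponentialHistogram"

type AttributeKey struct {
	Key  string
	Type AttributeKeyType
}

func main() {
	attr := AttributeKey{Key: "signoz_latency", Type: AttributeKeyType(MetricTypeExponentialHistogram)}
	// The comparison pattern used in prepareQueryOptimized and canShortCircuit.
	fmt.Println(attr.Type == AttributeKeyType(MetricTypeExponentialHistogram)) // true
}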