220 lines
8.2 KiB
Go

package delta
import (
"fmt"
"github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4/helpers"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
)
// sketchFmt is the ClickHouse expression template used for percentile space
// aggregations over exponential-histogram sketches. Its single %f verb receives
// the value returned by v3.GetPercentileFromOperator for the requested
// percentile. NOTE(review): the leading 0.01 argument is presumably the
// sketch's relative accuracy — confirm against the ClickHouse quantileDD docs.
//
// TODO(srikanthccv): support multiple quantiles; see https://github.com/SigNoz/signoz/issues/4016#issuecomment-1838583305
var sketchFmt = "quantilesDDMerge(0.01, %f)(sketch)[1]"
// prepareTimeAggregationSubQuery builds the sub-query used for temporal
// aggregation. Samples of the series matched by
// helpers.PrepareTimeseriesFilterQuery are bucketed into step-second intervals
// (toStartOfInterval ... INTERVAL step SECOND). Each (fingerprint, ts) pair is
// reduced to one per_series_value, and the group-by labels chosen by
// helpers.SelectLabelsAny are selected alongside it.
//
// The aggregation expression comes from helpers.AggregationColumnForSamplesTable.
// For rate it is additionally divided by step. An unrecognised
// mq.TimeAggregation yields an empty string and a nil error.
//
// The samples-table filter, which embeds the metric name, is passed to
// fmt.Sprintf as an argument rather than concatenated into the format string.
// That way a '%' in a metric name cannot be misread as a formatting verb.
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
	timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq)
	if err != nil {
		return "", err
	}

	samplesTableFilter := fmt.Sprintf("metric_name IN %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedMetricNames(mq.AggregateAttribute.Key), start, end)
	tableName := helpers.WhichSamplesTableToUse(start, end, mq)
	samplesTableFilter = helpers.AddFlagsFilters(samplesTableFilter, tableName)

	selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
	op := helpers.AggregationColumnForSamplesTable(start, end, mq)

	switch mq.TimeAggregation {
	case v3.TimeAggregationAvg,
		v3.TimeAggregationSum,
		v3.TimeAggregationMin,
		v3.TimeAggregationMax,
		v3.TimeAggregationCount,
		v3.TimeAggregationCountDistinct,
		v3.TimeAggregationAnyLast,
		v3.TimeAggregationIncrease:
		// The column expression is used as-is.
	case v3.TimeAggregationRate:
		// Divide the per-interval value by the interval length (step seconds).
		op = fmt.Sprintf("%s/%d", op, step)
	default:
		return "", nil
	}

	// Select the aggregate value for each interval.
	const queryTmpl = "SELECT fingerprint, %s" +
		" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
		" %s as per_series_value" +
		" FROM %s.%s" +
		" INNER JOIN" +
		" (%s) as filtered_time_series" +
		" USING fingerprint" +
		" WHERE %s" +
		" GROUP BY fingerprint, ts" +
		" ORDER BY fingerprint, ts"
	return fmt.Sprintf(queryTmpl,
		selectLabelsAny, step, op,
		constants.SIGNOZ_METRIC_DBNAME, tableName,
		timeSeriesSubQuery, samplesTableFilter), nil
}
// prepareQueryOptimized builds a single-pass query that aggregates samples
// straight into step-second buckets per group. It skips the per-fingerprint
// temporal aggregation step. It is meant only for the cases accepted by
// canShortCircuit (see that function for why the results are equivalent).
//
// The aggregation expression depends on mq.SpaceAggregation:
//   - sum: helpers.AggregationColumnForSamplesTable, divided by step when the
//     time aggregation is rate
//   - min / max: helpers.AggregationColumnForSamplesTable
//   - p50–p99: sketchFmt filled in with the operator's percentile
//
// Any other SpaceAggregation yields an empty string and a nil error.
//
// The samples-table filter, which embeds the metric name, is passed to
// fmt.Sprintf as an argument rather than concatenated into the format string.
// That way a '%' in a metric name cannot be misread as a formatting verb.
func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
	groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
	orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
	selectLabels := helpers.SelectLabels(mq.GroupBy)

	timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq)
	if err != nil {
		return "", err
	}

	samplesTableFilter := fmt.Sprintf("metric_name IN %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedMetricNames(mq.AggregateAttribute.Key), start, end)
	tableName := helpers.WhichSamplesTableToUse(start, end, mq)
	samplesTableFilter = helpers.AddFlagsFilters(samplesTableFilter, tableName)

	var op string
	switch mq.SpaceAggregation {
	case v3.SpaceAggregationSum:
		op = helpers.AggregationColumnForSamplesTable(start, end, mq)
		if mq.TimeAggregation == v3.TimeAggregationRate {
			op = fmt.Sprintf("%s/%d", op, step)
		}
	case v3.SpaceAggregationMin, v3.SpaceAggregationMax:
		op = helpers.AggregationColumnForSamplesTable(start, end, mq)
	case v3.SpaceAggregationPercentile50,
		v3.SpaceAggregationPercentile75,
		v3.SpaceAggregationPercentile90,
		v3.SpaceAggregationPercentile95,
		v3.SpaceAggregationPercentile99:
		op = fmt.Sprintf(sketchFmt, v3.GetPercentileFromOperator(mq.SpaceAggregation))
	default:
		return "", nil
	}

	// Select the aggregate value for each interval.
	const queryTmpl = "SELECT %s" +
		" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
		" %s as value" +
		" FROM %s.%s" +
		" INNER JOIN" +
		" (%s) as filtered_time_series" +
		" USING fingerprint" +
		" WHERE %s" +
		" GROUP BY %s" +
		" ORDER BY %s"
	return fmt.Sprintf(queryTmpl,
		selectLabels, step, op,
		constants.SIGNOZ_METRIC_DBNAME, tableName,
		timeSeriesSubQuery, samplesTableFilter,
		groupBy, orderBy), nil
}
// PrepareMetricQueryDeltaTimeSeries builds the query used to fetch a metric for
// the builder query mq between start and end, bucketed by step seconds.
//
// When canShortCircuit(mq) holds, the single-pass query from
// prepareQueryOptimized is returned. Otherwise the query has two levels:
//   - prepareTimeAggregationSubQuery produces one per_series_value per series
//     and interval.
//   - The outer query combines those values across series using
//     mq.SpaceAggregation (avg, sum, min, max or count).
//
// NaN per-series values are excluded. When mq.MetricValueFilter is set, only
// per-series values equal to its Value are kept. Any other SpaceAggregation
// yields an empty string and a nil error.
func PrepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
	if canShortCircuit(mq) {
		return prepareQueryOptimized(start, end, step, mq)
	}

	innerQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
	if err != nil {
		return "", err
	}

	groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
	orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
	labels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)

	where := " WHERE isNaN(per_series_value) = 0"
	if f := mq.MetricValueFilter; f != nil {
		// NOTE(review): %f prints six digits after the decimal point, so a
		// filter value needing more precision is rounded in the query text.
		where += fmt.Sprintf(" AND per_series_value = %f", f.Value)
	}

	var aggExpr string
	switch mq.SpaceAggregation {
	case v3.SpaceAggregationAvg:
		aggExpr = "avg(per_series_value)"
	case v3.SpaceAggregationSum:
		aggExpr = "sum(per_series_value)"
	case v3.SpaceAggregationMin:
		aggExpr = "min(per_series_value)"
	case v3.SpaceAggregationMax:
		aggExpr = "max(per_series_value)"
	case v3.SpaceAggregationCount:
		aggExpr = "count(per_series_value)"
	default:
		return "", nil
	}

	return fmt.Sprintf("SELECT %s, %s as value FROM (%s)%s GROUP BY %s ORDER BY %s",
		labels, aggExpr, innerQuery, where, groupBy, orderBy), nil
}
// canShortCircuit reports whether the single-pass query built by
// prepareQueryOptimized can be used for mq. The alternative is the two-level
// query: per-series temporal aggregation, then spatial aggregation. Skipping
// the per-fingerprint GROUP BY improves performance for the following cases.
//
// 1. time aggregation = rate|increase and space aggregation = sum:
// rate = sum(value)/step and increase = sum(value), and a sum of sums equals
// the sum of all values.
//
// 2. time aggregation = sum and space aggregation = sum:
// a sum of sums equals the sum of all values.
//
// 3. time aggregation = min and space aggregation = min:
// the min of mins equals the min of all values.
//
// 4. time aggregation = max and space aggregation = max:
// the max of maxes equals the max of all values.
//
// 5. Exponential histograms with a percentile space aggregation:
// no per-series aggregation is needed, because quantilesDDMerge can be applied
// to the sketches directly.
//
// Cases 1–4 are rejected when mq.MetricValueFilter is set. That filter is
// applied to per_series_value, which only the two-level query computes, so
// short-circuiting would silently drop it. Case 5 is still accepted because
// the two-level query built in this package has no percentile branch.
//
// All of this holds only for delta metrics.
func canShortCircuit(mq *v3.BuilderQuery) bool {
	// Exponential-histogram percentiles are only handled by the optimized query.
	if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) && v3.IsPercentileOperator(mq.SpaceAggregation) {
		return true
	}

	// The optimized query never materialises per_series_value, so it cannot
	// honour a per-series value filter.
	if mq.MetricValueFilter != nil {
		return false
	}

	switch mq.SpaceAggregation {
	case v3.SpaceAggregationSum:
		return mq.TimeAggregation == v3.TimeAggregationRate ||
			mq.TimeAggregation == v3.TimeAggregationIncrease ||
			mq.TimeAggregation == v3.TimeAggregationSum
	case v3.SpaceAggregationMin:
		return mq.TimeAggregation == v3.TimeAggregationMin
	case v3.SpaceAggregationMax:
		return mq.TimeAggregation == v3.TimeAggregationMax
	}
	return false
}