chore: update query builder to use 5min/30min aggregation tables (#5679)

This commit is contained in:
Srikanth Chekuri 2024-10-21 14:22:32 +05:30 committed by GitHub
parent c0e40614bf
commit 28818fbaac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 679 additions and 71 deletions

View File

@ -114,12 +114,14 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
tableName := helpers.WhichSamplesTableToUse(start, end, mq)
// Select the aggregate value for interval // Select the aggregate value for interval
queryTmpl := queryTmpl :=
"SELECT fingerprint, %s" + "SELECT fingerprint, %s" +
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
" %s as per_series_value" + " %s as per_series_value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName +
" INNER JOIN" + " INNER JOIN" +
" (%s) as filtered_time_series" + " (%s) as filtered_time_series" +
" USING fingerprint" + " USING fingerprint" +
@ -130,37 +132,30 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy) selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
selectLabels := helpers.SelectLabels(mq.GroupBy) selectLabels := helpers.SelectLabels(mq.GroupBy)
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
switch mq.TimeAggregation { switch mq.TimeAggregation {
case v3.TimeAggregationAvg: case v3.TimeAggregationAvg:
op := "avg(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationSum: case v3.TimeAggregationSum:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMin: case v3.TimeAggregationMin:
op := "min(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMax: case v3.TimeAggregationMax:
op := "max(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCount: case v3.TimeAggregationCount:
op := "count(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCountDistinct: case v3.TimeAggregationCountDistinct:
op := "count(distinct(value))"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationAnyLast: case v3.TimeAggregationAnyLast:
op := "anyLast(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate: case v3.TimeAggregationRate:
op := "max(value)"
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
rateQueryTmpl := rateQueryTmpl :=
"SELECT %s ts, " + rateWithoutNegative + "SELECT %s ts, " + rateWithoutNegative +
" as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)" " as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery) subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery)
case v3.TimeAggregationIncrease: case v3.TimeAggregationIncrease:
op := "max(value)"
innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
rateQueryTmpl := rateQueryTmpl :=
"SELECT %s ts, " + increaseWithoutNegative + "SELECT %s ts, " + increaseWithoutNegative +

View File

@ -26,12 +26,14 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
tableName := helpers.WhichSamplesTableToUse(start, end, mq)
// Select the aggregate value for interval // Select the aggregate value for interval
queryTmpl := queryTmpl :=
"SELECT fingerprint, %s" + "SELECT fingerprint, %s" +
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
" %s as per_series_value" + " %s as per_series_value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName +
" INNER JOIN" + " INNER JOIN" +
" (%s) as filtered_time_series" + " (%s) as filtered_time_series" +
" USING fingerprint" + " USING fingerprint" +
@ -41,33 +43,27 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy) selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
op := helpers.AggregationColumnForSamplesTable(start, end, mq)
switch mq.TimeAggregation { switch mq.TimeAggregation {
case v3.TimeAggregationAvg: case v3.TimeAggregationAvg:
op := "avg(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationSum: case v3.TimeAggregationSum:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMin: case v3.TimeAggregationMin:
op := "min(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationMax: case v3.TimeAggregationMax:
op := "max(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCount: case v3.TimeAggregationCount:
op := "count(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationCountDistinct: case v3.TimeAggregationCountDistinct:
op := "count(distinct(value))"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationAnyLast: case v3.TimeAggregationAnyLast:
op := "anyLast(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationRate: case v3.TimeAggregationRate:
op := fmt.Sprintf("sum(value)/%d", step) op := fmt.Sprintf("%s/%d", op, step)
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
case v3.TimeAggregationIncrease: case v3.TimeAggregationIncrease:
op := "sum(value)"
subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery)
} }
return subQuery, nil return subQuery, nil
@ -89,10 +85,8 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
var tableName string = constants.SIGNOZ_SAMPLES_V4_TABLENAME tableName := helpers.WhichSamplesTableToUse(start, end, mq)
if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
tableName = "distributed_exp_hist"
}
// Select the aggregate value for interval // Select the aggregate value for interval
queryTmpl := queryTmpl :=
"SELECT %s" + "SELECT %s" +
@ -108,16 +102,16 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
switch mq.SpaceAggregation { switch mq.SpaceAggregation {
case v3.SpaceAggregationSum: case v3.SpaceAggregationSum:
op := "sum(value)" op := helpers.AggregationColumnForSamplesTable(start, end, mq)
if mq.TimeAggregation == v3.TimeAggregationRate { if mq.TimeAggregation == v3.TimeAggregationRate {
op = "sum(value)/" + fmt.Sprintf("%d", step) op = fmt.Sprintf("%s/%d", op, step)
} }
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMin: case v3.SpaceAggregationMin:
op := "min(value)" op := helpers.AggregationColumnForSamplesTable(start, end, mq)
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMax: case v3.SpaceAggregationMax:
op := "max(value)" op := helpers.AggregationColumnForSamplesTable(start, end, mq)
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationPercentile50, case v3.SpaceAggregationPercentile50,
v3.SpaceAggregationPercentile75, v3.SpaceAggregationPercentile75,

View File

@ -13,31 +13,218 @@ import (
var ( var (
sixHoursInMilliseconds = time.Hour.Milliseconds() * 6 sixHoursInMilliseconds = time.Hour.Milliseconds() * 6
oneDayInMilliseconds = time.Hour.Milliseconds() * 24 oneDayInMilliseconds = time.Hour.Milliseconds() * 24
oneWeekInMilliseconds = oneDayInMilliseconds * 7
) )
// start and end are in milliseconds // start and end are in milliseconds
func which(start, end int64) (int64, int64, string) { func whichTSTableToUse(start, end int64) (int64, int64, string) {
// If time range is less than 6 hours, we need to use the `time_series_v4` table // If time range is less than 6 hours, we need to use the `time_series_v4` table
// else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table // else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table
// else we need to use the `time_series_v4_1day` table // else if time range is less than 1 week and greater than 1 day, we need to use the `time_series_v4_1day` table
// else we need to use the `time_series_v4_1week` table
var tableName string var tableName string
if end-start <= sixHoursInMilliseconds { if end-start < sixHoursInMilliseconds {
// adjust the start time to nearest 1 hour // adjust the start time to nearest 1 hour
start = start - (start % (time.Hour.Milliseconds() * 1)) start = start - (start % (time.Hour.Milliseconds() * 1))
tableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME tableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME
} else if end-start <= oneDayInMilliseconds { } else if end-start < oneDayInMilliseconds {
// adjust the start time to nearest 6 hours // adjust the start time to nearest 6 hours
start = start - (start % (time.Hour.Milliseconds() * 6)) start = start - (start % (time.Hour.Milliseconds() * 6))
tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME
} else { } else if end-start < oneWeekInMilliseconds {
// adjust the start time to nearest 1 day // adjust the start time to nearest 1 day
start = start - (start % (time.Hour.Milliseconds() * 24)) start = start - (start % (time.Hour.Milliseconds() * 24))
tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
} else {
if constants.UseMetricsPreAggregation() {
// adjust the start time to nearest 1 week
start = start - (start % (time.Hour.Milliseconds() * 24 * 7))
tableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME
} else {
// continue to use the 1 day table
start = start - (start % (time.Hour.Milliseconds() * 24))
tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
}
} }
return start, end, tableName return start, end, tableName
} }
// start and end are in milliseconds
// we have three tables for samples
// 1. distributed_samples_v4
// 2. distributed_samples_v4_agg_5m - for queries with time range above or equal to 1 day and less than 1 week
// 3. distributed_samples_v4_agg_30m - for queries with time range above or equal to 1 week
// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
func WhichSamplesTableToUse(start, end int64, mq *v3.BuilderQuery) string {
// we don't have any aggregated table for sketches (yet)
if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
return constants.SIGNOZ_EXP_HISTOGRAM_TABLENAME
}
// continue to use the old table if pre-aggregation is disabled
if !constants.UseMetricsPreAggregation() {
return constants.SIGNOZ_SAMPLES_V4_TABLENAME
}
// if the time aggregation is count_distinct, we need to use the distributed_samples_v4 table
// because the aggregated tables don't support count_distinct
if mq.TimeAggregation == v3.TimeAggregationCountDistinct {
return constants.SIGNOZ_SAMPLES_V4_TABLENAME
}
if end-start < oneDayInMilliseconds {
// if we are dealing with delta metrics and interval is greater than 5 minutes, we can use the 5m aggregated table
// why would interval be greater than 5 minutes?
// we allow people to configure the step interval so we can make use of this
if mq.Temporality == v3.Delta && mq.TimeAggregation == v3.TimeAggregationIncrease && mq.StepInterval >= 300 && mq.StepInterval < 1800 {
return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME
} else if mq.Temporality == v3.Delta && mq.TimeAggregation == v3.TimeAggregationIncrease && mq.StepInterval >= 1800 {
// if we are dealing with delta metrics and interval is greater than 30 minutes, we can use the 30m aggregated table
return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME
}
return constants.SIGNOZ_SAMPLES_V4_TABLENAME
} else if end-start < oneWeekInMilliseconds {
return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME
} else {
return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME
}
}
func AggregationColumnForSamplesTable(start, end int64, mq *v3.BuilderQuery) string {
tableName := WhichSamplesTableToUse(start, end, mq)
var aggregationColumn string
switch mq.Temporality {
case v3.Delta:
// for delta metrics, we only support `RATE`/`INCREASE` both of which are sum
// although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics,
// we are keeping it here to make sure that query will not be invalid
switch tableName {
case constants.SIGNOZ_SAMPLES_V4_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(value)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(value)"
case v3.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case v3.TimeAggregationMin:
aggregationColumn = "min(value)"
case v3.TimeAggregationMax:
aggregationColumn = "max(value)"
case v3.TimeAggregationCount:
aggregationColumn = "count(value)"
case v3.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(value)"
}
case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(last)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case v3.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case v3.TimeAggregationMin:
aggregationColumn = "min(min)"
case v3.TimeAggregationMax:
aggregationColumn = "max(max)"
case v3.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(sum)"
}
}
case v3.Cumulative:
// for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is
// used to calculate the sum which is then divided by the window size to get the rate
switch tableName {
case constants.SIGNOZ_SAMPLES_V4_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(value)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(value)"
case v3.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case v3.TimeAggregationMin:
aggregationColumn = "min(value)"
case v3.TimeAggregationMax:
aggregationColumn = "max(value)"
case v3.TimeAggregationCount:
aggregationColumn = "count(value)"
case v3.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(value)"
}
case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(last)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case v3.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case v3.TimeAggregationMin:
aggregationColumn = "min(min)"
case v3.TimeAggregationMax:
aggregationColumn = "max(max)"
case v3.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(max)"
}
}
case v3.Unspecified:
switch tableName {
case constants.SIGNOZ_SAMPLES_V4_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(value)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(value)"
case v3.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case v3.TimeAggregationMin:
aggregationColumn = "min(value)"
case v3.TimeAggregationMax:
aggregationColumn = "max(value)"
case v3.TimeAggregationCount:
aggregationColumn = "count(value)"
case v3.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(value)"
}
case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME:
switch mq.TimeAggregation {
case v3.TimeAggregationAnyLast:
aggregationColumn = "anyLast(last)"
case v3.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case v3.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case v3.TimeAggregationMin:
aggregationColumn = "min(min)"
case v3.TimeAggregationMax:
aggregationColumn = "max(max)"
case v3.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(sum)"
}
}
}
return aggregationColumn
}
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria // PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string, error) { func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string, error) {
var conditions []string var conditions []string
@ -47,7 +234,7 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
start, end, tableName := which(start, end) start, end, tableName := whichTSTableToUse(start, end)
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
@ -127,7 +314,7 @@ func PrepareTimeseriesFilterQueryV3(start, end int64, mq *v3.BuilderQuery) (stri
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
start, end, tableName := which(start, end) start, end, tableName := whichTSTableToUse(start, end)
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))

View File

@ -0,0 +1,402 @@
package v4
import (
"testing"
"github.com/stretchr/testify/assert"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestPrepareMetricQueryCumulativeRatePreAgg(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "true")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{
{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "endpoint",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, endpoint, ts ORDER BY service_name ASC, endpoint ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepareMetricQueryDeltaRatePreAgg(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "true")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, no group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepreMetricQueryCumulativeQuantilePreAgg(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "true")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test temporality = cumulative, quantile = 0.99",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test temporality = cumulative, quantile = 0.99 without group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
Expression: "A",
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepreMetricQueryDeltaQuantilePreAgg(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "true")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test temporality = delta, quantile = 0.99 group by service_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test temporality = delta, quantile = 0.99 no group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
Expression: "A",
Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepareMetricQueryGaugePreAgg(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "true")
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test gauge query with no group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_usage",
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
Expression: "A",
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test gauge query with group by host_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_usage",
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{{
Key: "host_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Expression: "A",
Disabled: false,
},
expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}

View File

@ -154,6 +154,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
} }
func TestPrepareMetricQueryCumulativeRate(t *testing.T) { func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct { testCases := []struct {
name string name string
builderQuery *v3.BuilderQuery builderQuery *v3.BuilderQuery
@ -242,6 +243,7 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
} }
func TestPrepareMetricQueryDeltaRate(t *testing.T) { func TestPrepareMetricQueryDeltaRate(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct { testCases := []struct {
name string name string
builderQuery *v3.BuilderQuery builderQuery *v3.BuilderQuery
@ -266,7 +268,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate, TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum, SpaceAggregation: v3.SpaceAggregationSum,
}, },
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
}, },
{ {
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name", name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
@ -292,12 +294,14 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
TimeAggregation: v3.TimeAggregationRate, TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum, SpaceAggregation: v3.SpaceAggregationSum,
}, },
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
}, },
} }
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err) assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains) assert.Contains(t, query, testCase.expectedQueryContains)
@ -306,6 +310,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
} }
func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct { testCases := []struct {
name string name string
builderQuery *v3.BuilderQuery builderQuery *v3.BuilderQuery
@ -344,7 +349,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
Disabled: false, Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99, SpaceAggregation: v3.SpaceAggregationPercentile99,
}, },
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
}, },
{ {
name: "test temporality = cumulative, quantile = 0.99 without group by", name: "test temporality = cumulative, quantile = 0.99 without group by",
@ -374,12 +379,14 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
Disabled: false, Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99, SpaceAggregation: v3.SpaceAggregationPercentile99,
}, },
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
}, },
} }
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err) assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains) assert.Contains(t, query, testCase.expectedQueryContains)
@ -388,6 +395,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
} }
func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct { testCases := []struct {
name string name string
builderQuery *v3.BuilderQuery builderQuery *v3.BuilderQuery
@ -426,7 +434,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
Disabled: false, Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99, SpaceAggregation: v3.SpaceAggregationPercentile99,
}, },
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
}, },
{ {
name: "test temporality = delta, quantile = 0.99 no group by", name: "test temporality = delta, quantile = 0.99 no group by",
@ -456,12 +464,14 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
Disabled: false, Disabled: false,
SpaceAggregation: v3.SpaceAggregationPercentile99, SpaceAggregation: v3.SpaceAggregationPercentile99,
}, },
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
}, },
} }
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err) assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains) assert.Contains(t, query, testCase.expectedQueryContains)
@ -470,6 +480,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
} }
func TestPrepareMetricQueryGauge(t *testing.T) { func TestPrepareMetricQueryGauge(t *testing.T) {
t.Setenv("USE_METRICS_PRE_AGGREGATION", "false")
testCases := []struct { testCases := []struct {
name string name string
builderQuery *v3.BuilderQuery builderQuery *v3.BuilderQuery
@ -494,7 +505,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
SpaceAggregation: v3.SpaceAggregationSum, SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false, Disabled: false,
}, },
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
}, },
{ {
name: "test gauge query with group by host_name", name: "test gauge query with group by host_name",
@ -520,12 +531,14 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
Expression: "A", Expression: "A",
Disabled: false, Disabled: false,
}, },
expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC", expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC",
}, },
} }
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
// 1650991982000 - April 26, 2022 10:23:02 PM
// 1651078382000 - April 27, 2022 10:23:02 PM
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err) assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains) assert.Contains(t, query, testCase.expectedQueryContains)

View File

@ -1134,6 +1134,17 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
query.StepInterval = minStep query.StepInterval = minStep
} }
if query.DataSource == v3.DataSourceMetrics && baseconstants.UseMetricsPreAggregation() {
// if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes
// if the time range is greater than 1 week, set the step interval to be multiple of 30 mins
start, end := queryRangeParams.Start, queryRangeParams.End
if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() {
query.StepInterval = int64(math.Round(float64(query.StepInterval)/300)) * 300
} else if end-start >= 7*24*time.Hour.Milliseconds() {
query.StepInterval = int64(math.Round(float64(query.StepInterval)/1800)) * 1800
}
}
// Remove the time shift function from the list of functions and set the shift by value // Remove the time shift function from the list of functions and set the shift by value
var timeShiftBy int64 var timeShiftBy int64
if len(query.Functions) > 0 { if len(query.Functions) > 0 {

View File

@ -1416,12 +1416,6 @@ func TestParseQueryRangeParamsStepIntervalAdjustment(t *testing.T) {
end: time.Now().UnixMilli(), end: time.Now().UnixMilli(),
step: 1, // gets updated step: 1, // gets updated
}, },
{
desc: "1 week and 1 minute step",
start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(),
end: time.Now().UnixMilli(),
step: 60, // gets updated
},
{ {
desc: "1 day and 1 hour step", desc: "1 day and 1 hour step",
start: time.Now().Add(-24 * time.Hour).UnixMilli(), start: time.Now().Add(-24 * time.Hour).UnixMilli(),
@ -1446,12 +1440,6 @@ func TestParseQueryRangeParamsStepIntervalAdjustment(t *testing.T) {
end: time.Now().UnixMilli(), end: time.Now().UnixMilli(),
step: 300, // no update step: 300, // no update
}, },
{
desc: "1 week and 10 minutes step",
start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(),
end: time.Now().UnixMilli(),
step: 600, // get updated
},
{ {
desc: "1 week and 45 minutes step", desc: "1 week and 45 minutes step",
start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(), start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(),

View File

@ -228,8 +228,8 @@ func TestDeltaQueryBuilder(t *testing.T) {
{ {
name: "TestQueryWithName - Request rate", name: "TestQueryWithName - Request rate",
query: &v3.QueryRangeParamsV3{ query: &v3.QueryRangeParamsV3{
Start: 1650991982000, Start: 1650991982000, // 2022-04-25 10:53:02
End: 1651078382000, End: 1651078382000, // 2022-04-26 10:53:02
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder, QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph, PanelType: v3.PanelTypeGraph,
@ -261,13 +261,13 @@ func TestDeltaQueryBuilder(t *testing.T) {
}, },
}, },
queryToTest: "A", queryToTest: "A",
expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts", expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts",
}, },
{ {
name: "TestQueryWithExpression - Error rate", name: "TestQueryWithExpression - Error rate",
query: &v3.QueryRangeParamsV3{ query: &v3.QueryRangeParamsV3{
Start: 1650991982000, Start: 1650991982000, // 2022-04-25 10:53:02
End: 1651078382000, End: 1651078382000, // 2022-04-26 10:53:02
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder, QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph, PanelType: v3.PanelTypeGraph,
@ -331,7 +331,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
}, },
}, },
queryToTest: "C", queryToTest: "C",
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`", expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
}, },
{ {
name: "TestQuery - Quantile", name: "TestQuery - Quantile",
@ -359,7 +359,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
}, },
}, },
queryToTest: "A", queryToTest: "A",
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
}, },
} }

View File

@ -80,6 +80,11 @@ var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true")
var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false") var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false")
// TODO(srikanthccv): remove after backfilling is done
func UseMetricsPreAggregation() bool {
return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true"
}
var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false") var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false")
func IsDurationSortFeatureEnabled() bool { func IsDurationSortFeatureEnabled() bool {
@ -222,15 +227,19 @@ var GroupByColMap = map[string]struct{}{
} }
const ( const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics" SIGNOZ_METRIC_DBNAME = "signoz_metrics"
SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4" SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4"
SIGNOZ_TRACE_DBNAME = "signoz_traces" SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME = "distributed_samples_v4_agg_5m"
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME = "distributed_samples_v4_agg_30m"
SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2" SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist"
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" SIGNOZ_TRACE_DBNAME = "signoz_traces"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day" SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2"
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day" SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME = "time_series_v4_1week"
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day"
) )
var TimeoutExcludedRoutes = map[string]bool{ var TimeoutExcludedRoutes = map[string]bool{

View File

@ -152,6 +152,15 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3,
if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep { if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep {
q.StepInterval = minStep q.StepInterval = minStep
} }
if q.DataSource == v3.DataSourceMetrics && constants.UseMetricsPreAggregation() {
// if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes
// if the time range is greater than 1 week, set the step interval to be multiple of 30 mins
if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/300)) * 300
} else if end-start >= 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/1800)) * 1800
}
}
} }
} }