diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go index 9845096223..cf5020b51d 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go @@ -114,12 +114,14 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + tableName := helpers.WhichSamplesTableToUse(start, end, mq) + // Select the aggregate value for interval queryTmpl := "SELECT fingerprint, %s" + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as per_series_value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -130,37 +132,30 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy) selectLabels := helpers.SelectLabels(mq.GroupBy) + op := helpers.AggregationColumnForSamplesTable(start, end, mq) + switch mq.TimeAggregation { case v3.TimeAggregationAvg: - op := "avg(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationSum: - op := "sum(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationMin: - op := "min(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationMax: - op := "max(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationCount: - op := "count(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationCountDistinct: - op := "count(distinct(value))" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationAnyLast: - op := "anyLast(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationRate: - op := "max(value)" innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) rateQueryTmpl := "SELECT %s ts, " + rateWithoutNegative + " as per_series_value FROM (%s) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)" subQuery = fmt.Sprintf(rateQueryTmpl, selectLabels, innerSubQuery) case v3.TimeAggregationIncrease: - op := "max(value)" innerSubQuery := fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) rateQueryTmpl := "SELECT %s ts, " + increaseWithoutNegative + diff --git a/pkg/query-service/app/metrics/v4/delta/timeseries.go b/pkg/query-service/app/metrics/v4/delta/timeseries.go index 365b09c56d..0f6cbf82fb 100644 --- a/pkg/query-service/app/metrics/v4/delta/timeseries.go +++ b/pkg/query-service/app/metrics/v4/delta/timeseries.go @@ -26,12 +26,14 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + tableName := helpers.WhichSamplesTableToUse(start, end, mq) + // Select the aggregate value for interval queryTmpl := "SELECT fingerprint, %s" + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as per_series_value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -41,33 +43,27 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy) + op := helpers.AggregationColumnForSamplesTable(start, end, mq) + switch mq.TimeAggregation { case v3.TimeAggregationAvg: - op := "avg(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationSum: - op := "sum(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationMin: - op := "min(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationMax: - op := "max(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationCount: - op := "count(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationCountDistinct: - op := "count(distinct(value))" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationAnyLast: - op := "anyLast(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationRate: - op := fmt.Sprintf("sum(value)/%d", step) + op := fmt.Sprintf("%s/%d", op, step) subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) case v3.TimeAggregationIncrease: - op := "sum(value)" subQuery = fmt.Sprintf(queryTmpl, selectLabelsAny, step, op, timeSeriesSubQuery) } return subQuery, nil @@ -89,10 +85,8 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) - var tableName string = constants.SIGNOZ_SAMPLES_V4_TABLENAME - if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) { - tableName = "distributed_exp_hist" - } + tableName := helpers.WhichSamplesTableToUse(start, end, mq) + // Select the aggregate value for interval queryTmpl := "SELECT %s" + @@ -108,16 +102,16 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, switch mq.SpaceAggregation { case v3.SpaceAggregationSum: - op := "sum(value)" + op := helpers.AggregationColumnForSamplesTable(start, end, mq) if mq.TimeAggregation == v3.TimeAggregationRate { - op = "sum(value)/" + fmt.Sprintf("%d", step) + op = fmt.Sprintf("%s/%d", op, step) } query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) case v3.SpaceAggregationMin: - op := "min(value)" + op := helpers.AggregationColumnForSamplesTable(start, end, mq) query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) case v3.SpaceAggregationMax: - op := "max(value)" + op := helpers.AggregationColumnForSamplesTable(start, end, mq) query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy) case v3.SpaceAggregationPercentile50, v3.SpaceAggregationPercentile75, diff --git a/pkg/query-service/app/metrics/v4/helpers/sub_query.go b/pkg/query-service/app/metrics/v4/helpers/sub_query.go index e1edc5a964..87589ddb9c 100644 --- a/pkg/query-service/app/metrics/v4/helpers/sub_query.go +++ b/pkg/query-service/app/metrics/v4/helpers/sub_query.go @@ -13,31 +13,218 @@ import ( var ( sixHoursInMilliseconds = time.Hour.Milliseconds() * 6 oneDayInMilliseconds = time.Hour.Milliseconds() * 24 + oneWeekInMilliseconds = oneDayInMilliseconds * 7 ) // start and end are in milliseconds -func which(start, end int64) (int64, int64, string) { +func whichTSTableToUse(start, end int64) (int64, int64, string) { // If time range is less than 6 hours, we need to use the `time_series_v4` table // else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table - // else we need to use the `time_series_v4_1day` table + // else if time range is less than 1 week and greater than 1 day, we need to use the `time_series_v4_1day` table + // else we need to use the `time_series_v4_1week` table var tableName string - if end-start <= sixHoursInMilliseconds { + if end-start < sixHoursInMilliseconds { // adjust the start time to nearest 1 hour start = start - (start % (time.Hour.Milliseconds() * 1)) tableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME - } else if end-start <= oneDayInMilliseconds { + } else if end-start < oneDayInMilliseconds { // adjust the start time to nearest 6 hours start = start - (start % (time.Hour.Milliseconds() * 6)) tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME - } else { + } else if end-start < oneWeekInMilliseconds { // adjust the start time to nearest 1 day start = start - (start % (time.Hour.Milliseconds() * 24)) tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME + } else { + if constants.UseMetricsPreAggregation() { + // adjust the start time to nearest 1 week + start = start - (start % (time.Hour.Milliseconds() * 24 * 7)) + tableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME + } else { + // continue to use the 1 day table + start = start - (start % (time.Hour.Milliseconds() * 24)) + tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME + } } return start, end, tableName } +// start and end are in milliseconds +// we have three tables for samples +// 1. distributed_samples_v4 +// 2. distributed_samples_v4_agg_5m - for queries with time range above or equal to 1 day and less than 1 week +// 3. distributed_samples_v4_agg_30m - for queries with time range above or equal to 1 week +// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it +func WhichSamplesTableToUse(start, end int64, mq *v3.BuilderQuery) string { + + // we don't have any aggregated table for sketches (yet) + if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) { + return constants.SIGNOZ_EXP_HISTOGRAM_TABLENAME + } + + // continue to use the old table if pre-aggregation is disabled + if !constants.UseMetricsPreAggregation() { + return constants.SIGNOZ_SAMPLES_V4_TABLENAME + } + + // if the time aggregation is count_distinct, we need to use the distributed_samples_v4 table + // because the aggregated tables don't support count_distinct + if mq.TimeAggregation == v3.TimeAggregationCountDistinct { + return constants.SIGNOZ_SAMPLES_V4_TABLENAME + } + + if end-start < oneDayInMilliseconds { + // if we are dealing with delta metrics and interval is greater than 5 minutes, we can use the 5m aggregated table + // why would interval be greater than 5 minutes? + // we allow people to configure the step interval so we can make use of this + if mq.Temporality == v3.Delta && mq.TimeAggregation == v3.TimeAggregationIncrease && mq.StepInterval >= 300 && mq.StepInterval < 1800 { + return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME + } else if mq.Temporality == v3.Delta && mq.TimeAggregation == v3.TimeAggregationIncrease && mq.StepInterval >= 1800 { + // if we are dealing with delta metrics and interval is greater than 30 minutes, we can use the 30m aggregated table + return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME + } + return constants.SIGNOZ_SAMPLES_V4_TABLENAME + } else if end-start < oneWeekInMilliseconds { + return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME + } else { + return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME + } +} + +func AggregationColumnForSamplesTable(start, end int64, mq *v3.BuilderQuery) string { + tableName := WhichSamplesTableToUse(start, end, mq) + var aggregationColumn string + switch mq.Temporality { + case v3.Delta: + // for delta metrics, we only support `RATE`/`INCREASE` both of which are sum + // although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics, + // we are keeping it here to make sure that query will not be invalid + switch tableName { + case constants.SIGNOZ_SAMPLES_V4_TABLENAME: + switch mq.TimeAggregation { + case v3.TimeAggregationAnyLast: + aggregationColumn = "anyLast(value)" + case v3.TimeAggregationSum: + aggregationColumn = "sum(value)" + case v3.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case v3.TimeAggregationMin: + aggregationColumn = "min(value)" + case v3.TimeAggregationMax: + aggregationColumn = "max(value)" + case v3.TimeAggregationCount: + aggregationColumn = "count(value)" + case v3.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "sum(value)" + } + case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME: + switch mq.TimeAggregation { + case v3.TimeAggregationAnyLast: + aggregationColumn = "anyLast(last)" + case v3.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case v3.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case v3.TimeAggregationMin: + aggregationColumn = "min(min)" + case v3.TimeAggregationMax: + aggregationColumn = "max(max)" + case v3.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "sum(sum)" + } + } + case v3.Cumulative: + // for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is + // used to calculate the sum which is then divided by the window size to get the rate + switch tableName { + case constants.SIGNOZ_SAMPLES_V4_TABLENAME: + switch mq.TimeAggregation { + case v3.TimeAggregationAnyLast: + aggregationColumn = "anyLast(value)" + case v3.TimeAggregationSum: + aggregationColumn = "sum(value)" + case v3.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case v3.TimeAggregationMin: + aggregationColumn = "min(value)" + case v3.TimeAggregationMax: + aggregationColumn = "max(value)" + case v3.TimeAggregationCount: + aggregationColumn = "count(value)" + case v3.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(value)" + } + case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME: + switch mq.TimeAggregation { + case v3.TimeAggregationAnyLast: + aggregationColumn = "anyLast(last)" + case v3.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case v3.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case v3.TimeAggregationMin: + aggregationColumn = "min(min)" + case v3.TimeAggregationMax: + aggregationColumn = "max(max)" + case v3.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(max)" + } + } + case v3.Unspecified: + switch tableName { + case constants.SIGNOZ_SAMPLES_V4_TABLENAME: + switch mq.TimeAggregation { + case v3.TimeAggregationAnyLast: + aggregationColumn = "anyLast(value)" + case v3.TimeAggregationSum: + aggregationColumn = "sum(value)" + case v3.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case v3.TimeAggregationMin: + aggregationColumn = "min(value)" + case v3.TimeAggregationMax: + aggregationColumn = "max(value)" + case v3.TimeAggregationCount: + aggregationColumn = "count(value)" + case v3.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(value)" + } + case constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME: + switch mq.TimeAggregation { + case v3.TimeAggregationAnyLast: + aggregationColumn = "anyLast(last)" + case v3.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case v3.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case v3.TimeAggregationMin: + aggregationColumn = "min(min)" + case v3.TimeAggregationMax: + aggregationColumn = "max(max)" + case v3.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case v3.TimeAggregationRate, v3.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(sum)" + } + } + } + return aggregationColumn +} + // PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string, error) { var conditions []string @@ -47,7 +234,7 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) - start, end, tableName := which(start, end) + start, end, tableName := whichTSTableToUse(start, end) conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) @@ -127,7 +314,7 @@ func PrepareTimeseriesFilterQueryV3(start, end int64, mq *v3.BuilderQuery) (stri conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) - start, end, tableName := which(start, end) + start, end, tableName := whichTSTableToUse(start, end) conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) diff --git a/pkg/query-service/app/metrics/v4/query_builder_pre_agg_test.go b/pkg/query-service/app/metrics/v4/query_builder_pre_agg_test.go new file mode 100644 index 0000000000..121324a8da --- /dev/null +++ b/pkg/query-service/app/metrics/v4/query_builder_pre_agg_test.go @@ -0,0 +1,402 @@ +package v4 + +import ( + "testing" + + "github.com/stretchr/testify/assert" + metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestPrepareMetricQueryCumulativeRatePreAgg(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "true") + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + }, + { + name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "endpoint", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, endpoint, ts ORDER BY service_name ASC, endpoint ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepareMetricQueryDeltaRatePreAgg(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "true") + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test time aggregation = rate, space aggregation = sum, temporality = delta, no group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", + }, + { + name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_calls_total", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + }, + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepreMetricQueryCumulativeQuantilePreAgg(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "true") + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test temporality = cumulative, quantile = 0.99", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + SpaceAggregation: v3.SpaceAggregationPercentile99, + }, + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + }, + { + name: "test temporality = cumulative, quantile = 0.99 without group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + Expression: "A", + Disabled: false, + SpaceAggregation: v3.SpaceAggregationPercentile99, + }, + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(max) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepreMetricQueryDeltaQuantilePreAgg(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "true") + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test temporality = delta, quantile = 0.99 group by service_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + Expression: "A", + Disabled: false, + SpaceAggregation: v3.SpaceAggregationPercentile99, + }, + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + }, + { + name: "test temporality = delta, quantile = 0.99 no group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "signoz_latency_bucket", + }, + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "frontend", + }, + }, + }, + Expression: "A", + Disabled: false, + SpaceAggregation: v3.SpaceAggregationPercentile99, + }, + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum)/60 as value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} + +func TestPrepareMetricQueryGaugePreAgg(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "true") + testCases := []struct { + name string + builderQuery *v3.BuilderQuery + expectedQueryContains string + }{ + { + name: "test gauge query with no group by", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_usage", + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + Expression: "A", + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + }, + { + name: "test gauge query with group by host_name", + builderQuery: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_usage", + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }}, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Expression: "A", + Disabled: false, + }, + expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(sum) / sum(count) as per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM + query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) + assert.Nil(t, err) + assert.Contains(t, query, testCase.expectedQueryContains) + }) + } +} diff --git a/pkg/query-service/app/metrics/v4/query_builder_test.go b/pkg/query-service/app/metrics/v4/query_builder_test.go index 08d0f087ff..2fc83a9e1f 100644 --- a/pkg/query-service/app/metrics/v4/query_builder_test.go +++ b/pkg/query-service/app/metrics/v4/query_builder_test.go @@ -154,6 +154,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) { } func TestPrepareMetricQueryCumulativeRate(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "false") testCases := []struct { name string builderQuery *v3.BuilderQuery @@ -242,6 +243,7 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) { } func TestPrepareMetricQueryDeltaRate(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "false") testCases := []struct { name string builderQuery *v3.BuilderQuery @@ -266,7 +268,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name", @@ -292,12 +294,14 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) @@ -306,6 +310,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { } func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "false") testCases := []struct { name string builderQuery *v3.BuilderQuery @@ -344,7 +349,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", }, { name: "test temporality = cumulative, quantile = 0.99 without group by", @@ -374,12 +379,14 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) @@ -388,6 +395,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { } func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "false") testCases := []struct { name string builderQuery *v3.BuilderQuery @@ -426,7 +434,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY service_name, le, ts ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", }, { name: "test temporality = delta, quantile = 0.99 no group by", @@ -456,12 +464,14 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY le, ts ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) @@ -470,6 +480,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { } func TestPrepareMetricQueryGauge(t *testing.T) { + t.Setenv("USE_METRICS_PRE_AGGREGATION", "false") testCases := []struct { name string builderQuery *v3.BuilderQuery @@ -494,7 +505,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) { SpaceAggregation: v3.SpaceAggregationSum, Disabled: false, }, - expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", }, { name: "test gauge query with group by host_name", @@ -520,12 +531,14 @@ func TestPrepareMetricQueryGauge(t *testing.T) { Expression: "A", Disabled: false, }, - expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC", + expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY host_name, ts ORDER BY host_name ASC, ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index b8df9be9d6..17c3bfc4a5 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -1134,6 +1134,17 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE query.StepInterval = minStep } + if query.DataSource == v3.DataSourceMetrics && baseconstants.UseMetricsPreAggregation() { + // if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes + // if the time range is greater than 1 week, set the step interval to be multiple of 30 mins + start, end := queryRangeParams.Start, queryRangeParams.End + if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() { + query.StepInterval = int64(math.Round(float64(query.StepInterval)/300)) * 300 + } else if end-start >= 7*24*time.Hour.Milliseconds() { + query.StepInterval = int64(math.Round(float64(query.StepInterval)/1800)) * 1800 + } + } + // Remove the time shift function from the list of functions and set the shift by value var timeShiftBy int64 if len(query.Functions) > 0 { diff --git a/pkg/query-service/app/parser_test.go b/pkg/query-service/app/parser_test.go index 9d58a190f7..a5367b70ff 100644 --- a/pkg/query-service/app/parser_test.go +++ b/pkg/query-service/app/parser_test.go @@ -1416,12 +1416,6 @@ func TestParseQueryRangeParamsStepIntervalAdjustment(t *testing.T) { end: time.Now().UnixMilli(), step: 1, // gets updated }, - { - desc: "1 week and 1 minute step", - start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(), - end: time.Now().UnixMilli(), - step: 60, // gets updated - }, { desc: "1 day and 1 hour step", start: time.Now().Add(-24 * time.Hour).UnixMilli(), @@ -1446,12 +1440,6 @@ func TestParseQueryRangeParamsStepIntervalAdjustment(t *testing.T) { end: time.Now().UnixMilli(), step: 300, // no update }, - { - desc: "1 week and 10 minutes step", - start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(), - end: time.Now().UnixMilli(), - step: 600, // get updated - }, { desc: "1 week and 45 minutes step", start: time.Now().Add(-7 * 24 * time.Hour).UnixMilli(), diff --git a/pkg/query-service/app/queryBuilder/query_builder_test.go b/pkg/query-service/app/queryBuilder/query_builder_test.go index 52af7af780..f7538f0efb 100644 --- a/pkg/query-service/app/queryBuilder/query_builder_test.go +++ b/pkg/query-service/app/queryBuilder/query_builder_test.go @@ -228,8 +228,8 @@ func TestDeltaQueryBuilder(t *testing.T) { { name: "TestQueryWithName - Request rate", query: &v3.QueryRangeParamsV3{ - Start: 1650991982000, - End: 1651078382000, + Start: 1650991982000, // 2022-04-25 10:53:02 + End: 1651078382000, // 2022-04-26 10:53:02 CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, PanelType: v3.PanelTypeGraph, @@ -261,13 +261,13 @@ func TestDeltaQueryBuilder(t *testing.T) { }, }, queryToTest: "A", - expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts", }, { name: "TestQueryWithExpression - Error rate", query: &v3.QueryRangeParamsV3{ - Start: 1650991982000, - End: 1651078382000, + Start: 1650991982000, // 2022-04-25 10:53:02 + End: 1651078382000, // 2022-04-26 10:53:02 CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, PanelType: v3.PanelTypeGraph, @@ -331,7 +331,7 @@ func TestDeltaQueryBuilder(t *testing.T) { }, }, queryToTest: "C", - expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`", + expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`", }, { name: "TestQuery - Quantile", @@ -359,7 +359,7 @@ func TestDeltaQueryBuilder(t *testing.T) { }, }, queryToTest: "A", - expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", }, } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 3a65c03be0..05855b1ddd 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -80,6 +80,11 @@ var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true") var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false") +// TODO(srikanthccv): remove after backfilling is done +func UseMetricsPreAggregation() bool { + return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true" +} + var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false") func IsDurationSortFeatureEnabled() bool { @@ -222,15 +227,19 @@ var GroupByColMap = map[string]struct{}{ } const ( - SIGNOZ_METRIC_DBNAME = "signoz_metrics" - SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4" - SIGNOZ_TRACE_DBNAME = "signoz_traces" - SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" - SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2" - SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" - SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" - SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day" - SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day" + SIGNOZ_METRIC_DBNAME = "signoz_metrics" + SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4" + SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME = "distributed_samples_v4_agg_5m" + SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME = "distributed_samples_v4_agg_30m" + SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist" + SIGNOZ_TRACE_DBNAME = "signoz_traces" + SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" + SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2" + SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" + SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" + SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day" + SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME = "time_series_v4_1week" + SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day" ) var TimeoutExcludedRoutes = map[string]bool{ diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index 72b00e2412..7cc823bb53 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -152,6 +152,15 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep { q.StepInterval = minStep } + if q.DataSource == v3.DataSourceMetrics && constants.UseMetricsPreAggregation() { + // if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes + // if the time range is greater than 1 week, set the step interval to be multiple of 30 mins + if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() { + q.StepInterval = int64(math.Round(float64(q.StepInterval)/300)) * 300 + } else if end-start >= 7*24*time.Hour.Milliseconds() { + q.StepInterval = int64(math.Round(float64(q.StepInterval)/1800)) * 1800 + } + } } }