diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 4910f7118d..46029a9e39 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -145,7 +145,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { BuildTraceQuery: tracesV3.PrepareTracesQuery, BuildLogQuery: logsv3.PrepareLogsQuery, } - aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts) + aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags) aH.ready = aH.testReady diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go index 2c98280b8f..f4897e4013 100644 --- a/pkg/query-service/app/logs/v3/query_builder.go +++ b/pkg/query-service/app/logs/v3/query_builder.go @@ -175,7 +175,7 @@ func getZerosForEpochNano(epoch int64) int64 { return int64(math.Pow(10, float64(19-count))) } -func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string) (string, error) { +func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) { filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy) if err != nil { @@ -238,7 +238,12 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build switch mq.AggregateOperator { case v3.AggregateOperatorRate: - op := fmt.Sprintf("count(%s)/%d", aggregationKey, step) + rate := float64(step) + if preferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("count(%s)/%f", aggregationKey, rate) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case @@ -246,7 +251,12 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build v3.AggregateOperatorRateMax, v3.AggregateOperatorRateAvg, v3.AggregateOperatorRateMin: - op := fmt.Sprintf("%s(%s)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, step) + rate := float64(step) + if preferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case @@ -411,6 +421,7 @@ func addOffsetToQuery(query string, offset uint64) string { type Options struct { GraphLimitQtype string IsLivetailQuery bool + PreferRPM bool } func isOrderByTs(orderBy []v3.OrderBy) bool { @@ -429,7 +440,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan return query, nil } else if options.GraphLimitQtype == constants.FirstQueryGraphLimit { // give me just the groupby names - query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype) + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) if err != nil { return "", err } @@ -437,14 +448,14 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan return query, nil } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { - query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype) + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) if err != nil { return "", err } return query, nil } - query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype) + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) if err != nil { return "", err } diff --git a/pkg/query-service/app/logs/v3/query_builder_test.go b/pkg/query-service/app/logs/v3/query_builder_test.go index 95671dbc90..cda6757998 100644 --- a/pkg/query-service/app/logs/v3/query_builder_test.go +++ b/pkg/query-service/app/logs/v3/query_builder_test.go @@ -1,7 +1,6 @@ package v3 import ( - "fmt" "testing" . "github.com/smartystreets/goconvey/convey" @@ -239,6 +238,7 @@ var testBuildLogsQueryData = []struct { AggregateOperator v3.AggregateOperator ExpectedQuery string Type int + PreferRPM bool }{ { Name: "Test aggregate count on select field", @@ -535,8 +535,9 @@ var testBuildLogsQueryData = []struct { OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, }, TableName: "logs", + PreferRPM: true, ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method" + - ", sum(bytes)/60 as value from signoz_logs.distributed_logs " + + ", sum(bytes)/1.000000 as value from signoz_logs.distributed_logs " + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + "AND indexOf(attributes_string_key, 'method') > 0 " + "group by method,ts order by method ASC", @@ -557,8 +558,9 @@ var testBuildLogsQueryData = []struct { OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, }, TableName: "logs", + PreferRPM: false, ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method" + - ", count(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')])/60 as value " + + ", count(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')])/60.000000 as value " + "from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + "AND indexOf(attributes_string_key, 'method') > 0 " + "group by method,ts " + @@ -580,9 +582,10 @@ var testBuildLogsQueryData = []struct { OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, }, TableName: "logs", + PreferRPM: true, ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " + "attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + - "sum(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')])/60 as value " + + "sum(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')])/1.000000 as value " + "from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + "AND indexOf(attributes_string_key, 'method') > 0 " + "group by method,ts " + @@ -803,8 +806,7 @@ var testBuildLogsQueryData = []struct { func TestBuildLogsQuery(t *testing.T) { for _, tt := range testBuildLogsQueryData { Convey("TestBuildLogsQuery", t, func() { - query, err := buildLogsQuery(tt.PanelType, tt.Start, tt.End, tt.Step, tt.BuilderQuery, "") - fmt.Println(query) + query, err := buildLogsQuery(tt.PanelType, tt.Start, tt.End, tt.Step, tt.BuilderQuery, "", tt.PreferRPM) So(err, ShouldBeNil) So(query, ShouldEqual, tt.ExpectedQuery) @@ -1011,7 +1013,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by value DESC) LIMIT 10", - Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit}, + Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, }, { Name: "Test TS with limit- first - with order by value", @@ -1034,7 +1036,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by value ASC) LIMIT 10", - Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit}, + Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, }, { Name: "Test TS with limit- first - with order by attribute", @@ -1057,7 +1059,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by method ASC) LIMIT 10", - Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit}, + Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, }, { Name: "Test TS with limit- second", diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index ea3888649b..03ac87d90f 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -11,6 +11,10 @@ import ( "go.signoz.io/signoz/pkg/query-service/utils" ) +type Options struct { + PreferRPM bool +} + var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ v3.AggregateOperatorP05: 0.05, v3.AggregateOperatorP10: 0.10, @@ -418,7 +422,7 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v return query, nil } -func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) { +func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) { var query string var err error if mq.Temporality == v3.Delta { @@ -434,9 +438,29 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) } } + if err != nil { return "", err } + + if options.PreferRPM && (mq.AggregateOperator == v3.AggregateOperatorRate || + mq.AggregateOperator == v3.AggregateOperatorSumRate || + mq.AggregateOperator == v3.AggregateOperatorAvgRate || + mq.AggregateOperator == v3.AggregateOperatorMaxRate || + mq.AggregateOperator == v3.AggregateOperatorMinRate || + mq.AggregateOperator == v3.AggregateOperatorRateSum || + mq.AggregateOperator == v3.AggregateOperatorRateAvg || + mq.AggregateOperator == v3.AggregateOperatorRateMax || + mq.AggregateOperator == v3.AggregateOperatorRateMin) { + var selectLabels string + if mq.AggregateOperator == v3.AggregateOperatorRate { + selectLabels = "fullLabels," + } else { + selectLabels = groupSelectAttributeKeyTags(mq.GroupBy...) + } + query = `SELECT ` + selectLabels + ` ts, ceil(value * 60) as value FROM (` + query + `)` + } + if having(mq.Having) != "" { query = fmt.Sprintf("SELECT * FROM (%s) HAVING %s", query, having(mq.Having)) } diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index 173645de78..7978a5d2c0 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -27,7 +27,7 @@ func TestBuildQuery(t *testing.T) { PanelType: v3.PanelTypeGraph, }, } - query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false}) require.NoError(t, err) require.Contains(t, query, "WHERE metric_name = 'name'") }) @@ -54,7 +54,7 @@ func TestBuildQueryWithFilters(t *testing.T) { }, }, } - query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false}) require.NoError(t, err) require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'a') != 'b'") @@ -90,7 +90,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { }, } - query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false}) require.NoError(t, err) require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']") @@ -286,7 +286,60 @@ func TestBuildQueryXRate(t *testing.T) { PanelType: v3.PanelTypeGraph, }, } - query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false}) + require.NoError(t, err) + require.Equal(t, query, c.expectedQuery) + } + }) +} + +func TestBuildQueryRPM(t *testing.T) { + t.Run("TestBuildQueryXRate", func(t *testing.T) { + + tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(value) as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts)` + + cases := []struct { + aggregateOperator v3.AggregateOperator + expectedQuery string + }{ + { + aggregateOperator: v3.AggregateOperatorAvgRate, + expectedQuery: fmt.Sprintf(tmpl, aggregateOperatorToSQLFunc[v3.AggregateOperatorAvgRate]), + }, + { + aggregateOperator: v3.AggregateOperatorMaxRate, + expectedQuery: fmt.Sprintf(tmpl, aggregateOperatorToSQLFunc[v3.AggregateOperatorMaxRate]), + }, + { + aggregateOperator: v3.AggregateOperatorMinRate, + expectedQuery: fmt.Sprintf(tmpl, aggregateOperatorToSQLFunc[v3.AggregateOperatorMinRate]), + }, + { + aggregateOperator: v3.AggregateOperatorSumRate, + expectedQuery: fmt.Sprintf(tmpl, aggregateOperatorToSQLFunc[v3.AggregateOperatorSumRate]), + }, + } + + for _, c := range cases { + + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name"}, + AggregateOperator: c.aggregateOperator, + Expression: "A", + }, + }, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + }, + } + query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: true}) require.NoError(t, err) require.Equal(t, query, c.expectedQuery) } diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index ea77fca4f3..263c5008f6 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -250,7 +250,10 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa } if builderQuery.DataSource == v3.DataSourceTraces { - query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.PanelType, builderQuery, keys, "") + query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.PanelType, builderQuery, keys, tracesV3.Options{ + GraphLimitQtype: "", + PreferRPM: false, + }) if err != nil { errQueriesByName[queryName] = err.Error() continue @@ -285,6 +288,7 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, + metricsV3.Options{PreferRPM: false}, ) if err != nil { errQueriesByName[queryName] = err.Error() diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go index bfa29395bb..690ba6fcc2 100644 --- a/pkg/query-service/app/queryBuilder/query_builder.go +++ b/pkg/query-service/app/queryBuilder/query_builder.go @@ -6,8 +6,11 @@ import ( "github.com/SigNoz/govaluate" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/interfaces" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.uber.org/zap" ) @@ -40,12 +43,13 @@ var SupportedFunctions = []string{ var EvalFuncs = map[string]govaluate.ExpressionFunction{} -type prepareTracesQueryFunc func(start, end int64, panelType v3.PanelType, bq *v3.BuilderQuery, keys map[string]v3.AttributeKey, graphLimitQtype string) (string, error) +type prepareTracesQueryFunc func(start, end int64, panelType v3.PanelType, bq *v3.BuilderQuery, keys map[string]v3.AttributeKey, options tracesV3.Options) (string, error) type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options logsV3.Options) (string, error) -type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) +type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options metricsV3.Options) (string, error) type QueryBuilder struct { - options QueryBuilderOptions + options QueryBuilderOptions + featureFlags interfaces.FeatureLookup } type QueryBuilderOptions struct { @@ -54,9 +58,10 @@ type QueryBuilderOptions struct { BuildMetricQuery prepareMetricQueryFunc } -func NewQueryBuilder(options QueryBuilderOptions) *QueryBuilder { +func NewQueryBuilder(options QueryBuilderOptions, featureFlags interfaces.FeatureLookup) *QueryBuilder { return &QueryBuilder{ - options: options, + options: options, + featureFlags: featureFlags, } } @@ -161,7 +166,8 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in compositeQuery := params.CompositeQuery if compositeQuery != nil { - + err := qb.featureFlags.CheckFeature(constants.PreferRPM) + PreferRPMFeatureEnabled := err == nil // Build queries for each builder query for queryName, query := range compositeQuery.BuilderQueries { if query.Expression == queryName { @@ -173,18 +179,21 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in } // for ts query with group by and limit form two queries if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 { - limitQuery, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, query, keys, constants.FirstQueryGraphLimit) + limitQuery, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, query, + keys, tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } - placeholderQuery, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, query, keys, constants.SecondQueryGraphLimit) + placeholderQuery, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, + query, keys, tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } query := fmt.Sprintf(placeholderQuery, limitQuery) queries[queryName] = query } else { - queryString, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, query, keys, "") + queryString, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, + query, keys, tracesV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""}) if err != nil { return nil, err } @@ -193,25 +202,25 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in case v3.DataSourceLogs: // for ts query with limit replace it as it is already formed if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 { - limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit}) + limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } - placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit}) + placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } query := fmt.Sprintf(placeholderQuery, limitQuery) queries[queryName] = query } else { - queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{}) + queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""}) if err != nil { return nil, err } queries[queryName] = queryString } case v3.DataSourceMetrics: - queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query) + queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } diff --git a/pkg/query-service/app/queryBuilder/query_builder_test.go b/pkg/query-service/app/queryBuilder/query_builder_test.go index 649556130c..3dc80ae798 100644 --- a/pkg/query-service/app/queryBuilder/query_builder_test.go +++ b/pkg/query-service/app/queryBuilder/query_builder_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + "go.signoz.io/signoz/pkg/query-service/featureManager" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -44,7 +45,8 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { qbOptions := QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, } - qb := NewQueryBuilder(qbOptions) + fm := featureManager.StartManager() + qb := NewQueryBuilder(qbOptions, fm) queries, err := qb.PrepareQueries(q) @@ -85,7 +87,8 @@ func TestBuildQueryWithIncorrectQueryRef(t *testing.T) { qbOptions := QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, } - qb := NewQueryBuilder(qbOptions) + fm := featureManager.StartManager() + qb := NewQueryBuilder(qbOptions, fm) _, err := qb.PrepareQueries(q) @@ -157,7 +160,8 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) { qbOptions := QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, } - qb := NewQueryBuilder(qbOptions) + fm := featureManager.StartManager() + qb := NewQueryBuilder(qbOptions, fm) queries, err := qb.PrepareQueries(q) @@ -358,7 +362,8 @@ func TestDeltaQueryBuilder(t *testing.T) { qbOptions := QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, } - qb := NewQueryBuilder(qbOptions) + fm := featureManager.StartManager() + qb := NewQueryBuilder(qbOptions, fm) queries, err := qb.PrepareQueries(c.query) require.NoError(t, err) diff --git a/pkg/query-service/app/traces/v3/query_builder.go b/pkg/query-service/app/traces/v3/query_builder.go index b7ce646b9f..ce4a252668 100644 --- a/pkg/query-service/app/traces/v3/query_builder.go +++ b/pkg/query-service/app/traces/v3/query_builder.go @@ -10,6 +10,11 @@ import ( "go.signoz.io/signoz/pkg/query-service/utils" ) +type Options struct { + GraphLimitQtype string + PreferRPM bool +} + var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ v3.AggregateOperatorP05: 0.05, v3.AggregateOperatorP10: 0.10, @@ -232,7 +237,7 @@ func handleEmptyValuesInGroupBy(keys map[string]v3.AttributeKey, groupBy []v3.At return "", nil } -func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string, keys map[string]v3.AttributeKey, panelType v3.PanelType, graphLimitQtype string) (string, error) { +func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string, keys map[string]v3.AttributeKey, panelType v3.PanelType, options Options) (string, error) { filterSubQuery, err := buildTracesFilterQuery(mq.Filters, keys) if err != nil { @@ -249,7 +254,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str } var queryTmpl string - if graphLimitQtype == constants.FirstQueryGraphLimit { + if options.GraphLimitQtype == constants.FirstQueryGraphLimit { queryTmpl = "SELECT" } else if panelType == v3.PanelTypeTable { queryTmpl = @@ -268,7 +273,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str "%s" // we don't need value for first query - if graphLimitQtype == constants.FirstQueryGraphLimit { + if options.GraphLimitQtype == constants.FirstQueryGraphLimit { queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")" } @@ -278,7 +283,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str } filterSubQuery += emptyValuesInGroupByFilter - groupBy := groupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) + groupBy := groupByAttributeKeyTags(panelType, options.GraphLimitQtype, mq.GroupBy...) if groupBy != "" { groupBy = " group by " + groupBy } @@ -288,7 +293,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str orderBy = " order by " + orderBy } - if graphLimitQtype == constants.SecondQueryGraphLimit { + if options.GraphLimitQtype == constants.SecondQueryGraphLimit { filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)" } @@ -303,7 +308,13 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str v3.AggregateOperatorRateAvg, v3.AggregateOperatorRateMin, v3.AggregateOperatorRate: - op := fmt.Sprintf("%s(%s)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, step) + + rate := float64(step) + if options.PreferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case @@ -488,25 +499,25 @@ func addOffsetToQuery(query string, offset uint64) string { return fmt.Sprintf("%s OFFSET %d", query, offset) } -func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.BuilderQuery, keys map[string]v3.AttributeKey, graphLimitQtype string) (string, error) { - if graphLimitQtype == constants.FirstQueryGraphLimit { +func PrepareTracesQuery(start, end int64, panelType v3.PanelType, mq *v3.BuilderQuery, keys map[string]v3.AttributeKey, options Options) (string, error) { + if options.GraphLimitQtype == constants.FirstQueryGraphLimit { // give me just the group by names - query, err := buildTracesQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_SPAN_INDEX_TABLENAME, keys, panelType, graphLimitQtype) + query, err := buildTracesQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_SPAN_INDEX_TABLENAME, keys, panelType, options) if err != nil { return "", err } query = addLimitToQuery(query, mq.Limit) return query, nil - } else if graphLimitQtype == constants.SecondQueryGraphLimit { - query, err := buildTracesQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_SPAN_INDEX_TABLENAME, keys, panelType, graphLimitQtype) + } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { + query, err := buildTracesQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_SPAN_INDEX_TABLENAME, keys, panelType, options) if err != nil { return "", err } return query, nil } - query, err := buildTracesQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_SPAN_INDEX_TABLENAME, keys, panelType, graphLimitQtype) + query, err := buildTracesQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_SPAN_INDEX_TABLENAME, keys, panelType, options) if err != nil { return "", err } diff --git a/pkg/query-service/app/traces/v3/query_builder_test.go b/pkg/query-service/app/traces/v3/query_builder_test.go index c7abe75492..20c6d86c1b 100644 --- a/pkg/query-service/app/traces/v3/query_builder_test.go +++ b/pkg/query-service/app/traces/v3/query_builder_test.go @@ -466,6 +466,7 @@ var testBuildTracesQueryData = []struct { AggregateOperator v3.AggregateOperator ExpectedQuery string PanelType v3.PanelType + Options Options }{ { Name: "Test aggregate count on fixed column of float64 type", @@ -495,10 +496,11 @@ var testBuildTracesQueryData = []struct { Expression: "A", }, TableName: "signoz_traces.distributed_signoz_index_v2", - ExpectedQuery: "SELECT toStartOfInterval(timestamp, INTERVAL 60 SECOND) AS ts, count()/60 as value from" + + ExpectedQuery: "SELECT toStartOfInterval(timestamp, INTERVAL 60 SECOND) AS ts, count()/1.000000 as value from" + " signoz_traces.distributed_signoz_index_v2 where (timestamp >= '1680066360726210000' AND timestamp <=" + " '1680066458000000000') group by ts order by value DESC", PanelType: v3.PanelTypeGraph, + Options: Options{GraphLimitQtype: "", PreferRPM: true}, }, { Name: "Test aggregate count on fixed column of float64 type with filter", @@ -837,10 +839,13 @@ var testBuildTracesQueryData = []struct { }, TableName: "signoz_traces.distributed_signoz_index_v2", ExpectedQuery: "SELECT toStartOfInterval(timestamp, INTERVAL 60 SECOND) AS ts, stringTagMap['method'] as `method`" + - ", sum(bytes)/60 as value from signoz_traces.distributed_signoz_index_v2 " + + ", sum(bytes)/60.000000 as value from signoz_traces.distributed_signoz_index_v2 " + "where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000')" + " AND has(stringTagMap, 'method') group by `method`,ts order by `method` ASC", PanelType: v3.PanelTypeGraph, + Options: Options{GraphLimitQtype: "", + PreferRPM: false, + }, }, { Name: "Test aggregate rate", @@ -858,11 +863,12 @@ var testBuildTracesQueryData = []struct { }, TableName: "signoz_traces.distributed_signoz_index_v2", ExpectedQuery: "SELECT toStartOfInterval(timestamp, INTERVAL 60 SECOND) AS ts, stringTagMap['method'] as `method`" + - ", count(numberTagMap['bytes'])/60 as value " + + ", count(numberTagMap['bytes'])/1.000000 as value " + "from signoz_traces.distributed_signoz_index_v2 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') " + "AND has(stringTagMap, 'method') group by `method`,ts " + "order by `method` ASC", PanelType: v3.PanelTypeGraph, + Options: Options{GraphLimitQtype: "", PreferRPM: true}, }, { Name: "Test aggregate RateSum without fixed column", @@ -881,11 +887,12 @@ var testBuildTracesQueryData = []struct { TableName: "signoz_traces.distributed_signoz_index_v2", ExpectedQuery: "SELECT toStartOfInterval(timestamp, INTERVAL 60 SECOND) AS ts, " + "stringTagMap['method'] as `method`, " + - "sum(numberTagMap['bytes'])/60 as value " + + "sum(numberTagMap['bytes'])/1.000000 as value " + "from signoz_traces.distributed_signoz_index_v2 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') " + "AND has(stringTagMap, 'method') group by `method`,ts " + "order by `method` ASC", PanelType: v3.PanelTypeGraph, + Options: Options{GraphLimitQtype: "", PreferRPM: true}, }, { Name: "Test aggregate with having clause", @@ -1131,7 +1138,7 @@ func TestBuildTracesQuery(t *testing.T) { Convey("TestBuildTracesQuery", t, func() { query, err := buildTracesQuery(tt.Start, tt.End, tt.Step, tt.BuilderQuery, tt.TableName, map[string]v3.AttributeKey{ "name": {Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, - }, tt.PanelType, "") + }, tt.PanelType, tt.Options) So(err, ShouldBeNil) So(query, ShouldEqual, tt.ExpectedQuery) }) @@ -1146,7 +1153,7 @@ var testPrepTracesQueryData = []struct { BuilderQuery *v3.BuilderQuery ExpectedQuery string Keys map[string]v3.AttributeKey - Type string + Options Options }{ { Name: "Test TS with limit- first", @@ -1171,7 +1178,9 @@ var testPrepTracesQueryData = []struct { " where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND" + " stringTagMap['method'] = 'GET' AND has(stringTagMap, 'method') group by `method` order by value DESC) LIMIT 10", Keys: map[string]v3.AttributeKey{"name": {Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}}, - Type: constants.FirstQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.FirstQueryGraphLimit, + }, }, { Name: "Test TS with limit- first - with order by value", @@ -1198,7 +1207,9 @@ var testPrepTracesQueryData = []struct { " AND timestamp <= '1680066458000000000') AND stringTagMap['method'] = 'GET' AND" + " has(stringTagMap, 'method') group by `method` order by value ASC) LIMIT 10", Keys: map[string]v3.AttributeKey{}, - Type: constants.FirstQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.FirstQueryGraphLimit, + }, }, { Name: "Test TS with limit- first - with order by attribute", @@ -1222,7 +1233,9 @@ var testPrepTracesQueryData = []struct { " AND timestamp <= '1680066458000000000') " + "group by `serviceName` order by `serviceName` ASC) LIMIT 10", Keys: map[string]v3.AttributeKey{}, - Type: constants.FirstQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.FirstQueryGraphLimit, + }, }, { Name: "Test TS with limit- first - with 2 group by and 2 order by", @@ -1250,7 +1263,9 @@ var testPrepTracesQueryData = []struct { " AND timestamp <= '1680066458000000000') AND has(stringTagMap, 'http.method') " + "group by `serviceName`,`http.method` order by `serviceName` ASC,value ASC) LIMIT 10", Keys: map[string]v3.AttributeKey{}, - Type: constants.FirstQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.FirstQueryGraphLimit, + }, }, { Name: "Test TS with limit- second", @@ -1276,7 +1291,9 @@ var testPrepTracesQueryData = []struct { " AND timestamp <= '1680066458000000000') AND stringTagMap['method'] = 'GET' AND" + " has(stringTagMap, 'method') AND (`method`) GLOBAL IN (%s) group by `method`,ts order by value DESC", Keys: map[string]v3.AttributeKey{}, - Type: constants.SecondQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.SecondQueryGraphLimit, + }, }, { Name: "Test TS with limit- second - with order by", @@ -1302,7 +1319,9 @@ var testPrepTracesQueryData = []struct { " as value from signoz_traces.distributed_signoz_index_v2 where (timestamp >= '1680066360726210000'" + " AND timestamp <= '1680066458000000000') AND stringTagMap['method'] = 'GET' AND" + " has(stringTagMap, 'method') AND (`method`) GLOBAL IN (%s) group by `method`,ts order by `method` ASC", Keys: map[string]v3.AttributeKey{}, - Type: constants.SecondQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.SecondQueryGraphLimit, + }, }, { Name: "Test TS with limit - second - with two group by and two order by", @@ -1335,14 +1354,16 @@ var testPrepTracesQueryData = []struct { "AND (`method`,`name`) GLOBAL IN (%s) group by `method`,`name`,ts " + "order by `method` ASC,`name` ASC", Keys: map[string]v3.AttributeKey{}, - Type: constants.SecondQueryGraphLimit, + Options: Options{ + GraphLimitQtype: constants.SecondQueryGraphLimit, + }, }, } func TestPrepareTracesQuery(t *testing.T) { for _, tt := range testPrepTracesQueryData { Convey("TestPrepareTracesQuery", t, func() { - query, err := PrepareTracesQuery(tt.Start, tt.End, tt.PanelType, tt.BuilderQuery, tt.Keys, tt.Type) + query, err := PrepareTracesQuery(tt.Start, tt.End, tt.PanelType, tt.BuilderQuery, tt.Keys, tt.Options) So(err, ShouldBeNil) So(query, ShouldEqual, tt.ExpectedQuery) }) diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 6de790a396..fa642e2e60 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -35,6 +35,7 @@ const LogsTTL = "logs" const DurationSort = "DurationSort" const TimestampSort = "TimestampSort" +const PreferRPM = "PreferRPM" func GetAlertManagerApiPrefix() string { if os.Getenv("ALERTMANAGER_API_PREFIX") != "" { @@ -55,6 +56,8 @@ var DurationSortFeature = GetOrDefaultEnv("DURATION_SORT_FEATURE", "true") var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true") +var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false") + func IsDurationSortFeatureEnabled() bool { isDurationSortFeatureEnabledStr := DurationSortFeature isDurationSortFeatureEnabledBool, err := strconv.ParseBool(isDurationSortFeatureEnabledStr) @@ -73,6 +76,15 @@ func IsTimestampSortFeatureEnabled() bool { return isTimestampSortFeatureEnabledBool } +func IsPreferRPMFeatureEnabled() bool { + preferRPMFeatureEnabledStr := PreferRPMFeature + preferRPMFeatureEnabledBool, err := strconv.ParseBool(preferRPMFeatureEnabledStr) + if err != nil { + return false + } + return preferRPMFeatureEnabledBool +} + var DEFAULT_FEATURE_SET = model.FeatureSet{ model.Feature{ Name: DurationSort, @@ -94,6 +106,13 @@ var DEFAULT_FEATURE_SET = model.FeatureSet{ UsageLimit: -1, Route: "", }, + model.Feature{ + Name: PreferRPM, + Active: IsPreferRPMFeatureEnabled(), + Usage: 0, + UsageLimit: -1, + Route: "", + }, } func GetContextTimeout() time.Duration { diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 067e96c033..70596982d9 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -509,6 +509,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string ruleId, r, ThresholdRuleOpts{}, + m.featureFlags, ) if err != nil { @@ -856,6 +857,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m SendUnmatched: true, SendAlways: true, }, + m.featureFlags, ) if err != nil { diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index 507e6fa289..ced9577b79 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -19,6 +19,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/interfaces" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" @@ -75,6 +76,7 @@ func NewThresholdRule( id string, p *PostableRule, opts ThresholdRuleOpts, + featureFlags interfaces.FeatureLookup, ) (*ThresholdRule, error) { if p.RuleCondition == nil { @@ -106,7 +108,7 @@ func NewThresholdRule( BuildTraceQuery: tracesV3.PrepareTracesQuery, BuildLogQuery: logsv3.PrepareLogsQuery, } - t.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts) + t.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, featureFlags) zap.S().Info("msg:", "creating new alerting rule", "\t name:", t.name, "\t condition:", t.ruleCondition.String(), "\t generatorURL:", t.GeneratorURL())