From 216499051dd737b7587c4697dab74fc30357dc77 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 13 Jul 2023 18:50:19 +0530 Subject: [PATCH] Add support for Delta temporality (#2505) * Revert "upgraded some deprecated packages (#2423)" This reverts commit c4b052c51ec3a22e25923aa92fb66665e6ca589b. * chore: delta working with QB * chore: use enum * Revert "Revert "upgraded some deprecated packages (#2423)"" This reverts commit d88f3594a62582a16fa04528a56ff075f628bf4e. * chore: add test * chore: add delta for query range v3 * chore: explicit check for temporality * chore: fix tests * chore: conditionally fetch temporality when --prefer-delta is set --- ee/query-service/app/api/api.go | 2 + ee/query-service/app/server.go | 2 + ee/query-service/main.go | 3 + .../app/clickhouseReader/reader.go | 26 +++ pkg/query-service/app/http_handler.go | 55 ++++++ pkg/query-service/app/metrics/v3/delta.go | 174 ++++++++++++++++++ .../app/metrics/v3/query_builder.go | 22 ++- .../app/metrics/v3/query_builder_test.go | 13 +- .../app/queryBuilder/query_builder_test.go | 165 ++++++++++++++++- pkg/query-service/app/server.go | 2 + pkg/query-service/interfaces/interface.go | 1 + pkg/query-service/main.go | 4 + pkg/query-service/model/v3/v3.go | 9 + 13 files changed, 467 insertions(+), 11 deletions(-) create mode 100644 pkg/query-service/app/metrics/v3/delta.go diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index df1220d80c..2eddf1d83c 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -17,6 +17,7 @@ import ( type APIHandlerOptions struct { DataConnector interfaces.DataConnector SkipConfig *basemodel.SkipConfig + PreferDelta bool AppDao dao.ModelDao RulesManager *rules.Manager FeatureFlags baseint.FeatureLookup @@ -34,6 +35,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{ Reader: opts.DataConnector, SkipConfig: opts.SkipConfig, + PerferDelta: opts.PreferDelta, AppDao: opts.AppDao, RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags}) diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index ec2895acd8..a2e86023e3 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -56,6 +56,7 @@ type ServerOptions struct { // alert specific params DisableRules bool RuleRepoURL string + PreferDelta bool } // Server runs HTTP api service @@ -170,6 +171,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { apiOpts := api.APIHandlerOptions{ DataConnector: reader, SkipConfig: skipConfig, + PreferDelta: serverOptions.PreferDelta, AppDao: modelDao, RulesManager: rm, FeatureFlags: lm, diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 67cbde2151..52ce63ba20 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -83,10 +83,12 @@ func main() { var ruleRepoURL string var enableQueryServiceLogOTLPExport bool + var preferDelta bool flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") + flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over raw metrics)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") flag.Parse() @@ -102,6 +104,7 @@ func main() { HTTPHostPort: baseconst.HTTPHostPort, PromConfigPath: promConfigPath, SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferDelta: preferDelta, PrivateHostPort: baseconst.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 0a32730f97..c537cac8ef 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3249,6 +3249,32 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) return spansInLastHeartBeatInterval, nil } +func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) { + + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) + + query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN [$1]`, signozMetricDBName, signozTSTableName) + + rows, err := r.db.Query(ctx, query, metricNames) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var metricName, temporality string + err := rows.Scan(&metricName, &temporality) + if err != nil { + return nil, err + } + if _, ok := metricNameToTemporality[metricName]; !ok { + metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) + } + metricNameToTemporality[metricName][v3.Temporality(temporality)] = true + } + return metricNameToTemporality, nil +} + // func sum(array []tsByMetricName) uint64 { // var result uint64 // result = 0 diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e4803b4c2e..47e115343f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -71,6 +71,7 @@ type APIHandler struct { featureFlags interfaces.FeatureLookup ready func(http.HandlerFunc) http.HandlerFunc queryBuilder *queryBuilder.QueryBuilder + preferDelta bool // SetupCompleted indicates if SigNoz is ready for general use. // at the moment, we mark the app ready when the first user @@ -84,6 +85,8 @@ type APIHandlerOpts struct { Reader interfaces.Reader SkipConfig *model.SkipConfig + + PerferDelta bool // dao layer to perform crud on app objects like dashboard, alerts etc AppDao dao.ModelDao @@ -106,6 +109,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { reader: opts.Reader, appDao: opts.AppDao, skipConfig: opts.SkipConfig, + preferDelta: opts.PerferDelta, alertManager: alertManager, ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, @@ -452,6 +456,48 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http. aH.Respond(w, tagValueList) } +func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { + + metricNames := make([]string, 0) + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for _, query := range qp.CompositeQuery.BuilderQueries { + if query.DataSource == v3.DataSourceMetrics { + metricNames = append(metricNames, query.AggregateAttribute.Key) + if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { + metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) + } + } + } + } + + var err error + + if aH.preferDelta { + zap.S().Debug("fetching metric temporality") + metricNameToTemporality, err = aH.reader.FetchTemporality(ctx, metricNames) + if err != nil { + return err + } + } + + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for name := range qp.CompositeQuery.BuilderQueries { + query := qp.CompositeQuery.BuilderQueries[name] + if query.DataSource == v3.DataSourceMetrics { + if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + } + } + } + return nil +} + func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r) @@ -2776,6 +2822,15 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) { return } + // add temporality for each metric + + temporalityErr := aH.addTemporality(r.Context(), queryRangeParams) + if temporalityErr != nil { + zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) + return + } + aH.queryRangeV3(r.Context(), queryRangeParams, w, r) } diff --git a/pkg/query-service/app/metrics/v3/delta.go b/pkg/query-service/app/metrics/v3/delta.go new file mode 100644 index 0000000000..ba3d2783ce --- /dev/null +++ b/pkg/query-service/app/metrics/v3/delta.go @@ -0,0 +1,174 @@ +package v3 + +import ( + "fmt" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + metricQueryGroupBy := mq.GroupBy + + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag := range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + metricQueryGroupBy = append( + metricQueryGroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + + if mq.Filters != nil { + mq.Filters.Items = append(mq.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: "__temporality__"}, + Operator: v3.FilterOperatorEqual, + Value: "Delta", + }) + } + + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + groupByWithoutLe := groupBy(tagsWithoutLe...) + groupTagsWithoutLe := groupSelect(tagsWithoutLe...) + orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(metricQueryGroupBy...) + groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + if len(orderWithoutLe) != 0 { + orderWithoutLe += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + // Calculate rate of change of metric for each unique time series + groupBy = "fingerprint, ts" + orderBy = "fingerprint, " + groupTags = "fingerprint," + op := fmt.Sprintf("sum(value)/%d", step) + query := fmt.Sprintf( + queryTmpl, "any(labels) as fullLabels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + + return query, nil + case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step) + query := fmt.Sprintf( + queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + op := fmt.Sprintf("sum(value)/%d", step) + query := fmt.Sprintf( + queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorNoOp: + queryTmpl := + "SELECT fingerprint, labels as fullLabels," + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " any(value) as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY fingerprint, labels, ts" + + " ORDER BY fingerprint, labels, ts" + query := fmt.Sprintf(queryTmpl, step, filterSubQuery) + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index 1bff2ae2ba..c416d746f8 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -44,13 +44,19 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ } // See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056 -var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))` +var rateWithoutNegative = `if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) ` // buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering // timeseries based on search criteria -func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, metricName string, aggregateOperator v3.AggregateOperator) (string, error) { +func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) { + metricName := mq.AggregateAttribute.Key + aggregateOperator := mq.AggregateOperator var conditions []string - conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName))) + if mq.Temporality == v3.Delta { + conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta)) + } else { + conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality IN ['%s', '%s']", utils.ClickHouseFormattedValue(metricName), v3.Cumulative, v3.Unspecified)) + } if fs != nil && len(fs.Items) != 0 { for _, item := range fs.Items { @@ -157,7 +163,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str } } - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq.AggregateAttribute.Key, mq.AggregateOperator) + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) if err != nil { return "", err } @@ -413,7 +419,13 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v } func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) { - query, err := buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + var query string + var err error + if mq.Temporality == v3.Delta { + query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } else { + query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + } if err != nil { return "", err } diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index f3f91129a8..595a8c4557 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -57,7 +57,7 @@ func TestBuildQueryWithFilters(t *testing.T) { query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) require.NoError(t, err) - require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'a') != 'b'") + require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'a') != 'b'") require.Contains(t, query, rateWithoutNegative) require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')") }) @@ -93,7 +93,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) require.NoError(t, err) - require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") + require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']") require.Contains(t, query, rateWithoutNegative) }) } @@ -228,7 +228,12 @@ func TestBuildQueryOperators(t *testing.T) { for i, tc := range testCases { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, "signoz_calls_total", "sum") + mq := v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"}, + AggregateOperator: v3.AggregateOperatorSum, + } + whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, &mq) require.NoError(t, err) require.Contains(t, whereClause, tc.expectedWhereClause) }) @@ -238,7 +243,7 @@ func TestBuildQueryOperators(t *testing.T) { func TestBuildQueryXRate(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) { - tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name') as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts` + tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts` cases := []struct { aggregateOperator v3.AggregateOperator diff --git a/pkg/query-service/app/queryBuilder/query_builder_test.go b/pkg/query-service/app/queryBuilder/query_builder_test.go index aacb895e75..f2679f0aff 100644 --- a/pkg/query-service/app/queryBuilder/query_builder_test.go +++ b/pkg/query-service/app/queryBuilder/query_builder_test.go @@ -51,8 +51,8 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { require.NoError(t, err) require.Contains(t, queries["C"], "SELECT A.ts as ts, A.value / B.value") - require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") - require.Contains(t, queries["C"], "runningDifference(value)/runningDifference(ts)") + require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']") + require.Contains(t, queries["C"], "runningDifference(value) / runningDifference(ts)") }) } @@ -205,3 +205,164 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) { // So(strings.Count(queries["F5"], " ON "), ShouldEqual, 1) }) } + +func TestDeltaQueryBuilder(t *testing.T) { + cases := []struct { + name string + query *v3.QueryRangeParamsV3 + queryToTest string + expected string + }{ + { + name: "TestQueryWithName - Request rate", + query: &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + DataSource: v3.DataSourceMetrics, + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_count"}, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + }, + }, + }, + }, + }, + }, + queryToTest: "A", + expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY ts ORDER BY ts", + }, + { + name: "TestQueryWithExpression - Error rate", + query: &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_count"}, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + { + Key: v3.AttributeKey{Key: "status_code"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"STATUS_CODE_ERROR"}, + }, + }, + }, + }, + "B": { + QueryName: "B", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_count"}, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "B", + Temporality: v3.Delta, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"frontend"}, + }, + { + Key: v3.AttributeKey{Key: "operation"}, + Operator: v3.FilterOperatorIn, + Value: []interface{}{"HTTP GET /dispatch"}, + }, + }, + }, + }, + "C": { + QueryName: "C", + Expression: "A*100/B", + }, + }, + }, + }, + queryToTest: "C", + expected: "SELECT A.ts as ts, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY ts ORDER BY ts) as A GLOBAL INNER JOIN(SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY ts ORDER BY ts) as B ON A.ts = B.ts", + }, + { + name: "TestQuery - Quantile", + query: &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"}, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorHistQuant95, + Expression: "A", + Temporality: v3.Delta, + GroupBy: []v3.AttributeKey{ + {Key: "service_name"}, + }, + }, + }, + }, + }, + queryToTest: "A", + expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + qbOptions := QueryBuilderOptions{ + BuildMetricQuery: metricsv3.PrepareMetricQuery, + } + qb := NewQueryBuilder(qbOptions) + queries, err := qb.PrepareQueries(c.query) + + require.NoError(t, err) + require.Equal(t, c.expected, queries[c.queryToTest]) + }) + } +} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 293d3f8753..7e3a913fac 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -48,6 +48,7 @@ type ServerOptions struct { // alert specific params DisableRules bool RuleRepoURL string + PreferDelta bool } // Server runs HTTP, Mux and a grpc server @@ -125,6 +126,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { apiHandler, err := NewAPIHandler(APIHandlerOpts{ Reader: reader, SkipConfig: skipConfig, + PerferDelta: serverOptions.PreferDelta, AppDao: dao.DB(), RuleManager: rm, FeatureFlags: fm, diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index b6a8015fc0..638de6e4d8 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -56,6 +56,7 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) + FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 9d769a0940..7e8b8f4bf6 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -34,9 +34,12 @@ func main() { // the url used to build link in the alert messages in slack and other systems var ruleRepoURL string + var preferDelta bool + flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") + flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over gauge)") flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.Parse() @@ -51,6 +54,7 @@ func main() { HTTPHostPort: constants.HTTPHostPort, PromConfigPath: promConfigPath, SkipTopLvlOpsPath: skipTopLvlOpsPath, + PreferDelta: preferDelta, PrivateHostPort: constants.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index e320cc2877..7647e5a75d 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -417,12 +417,21 @@ func (c *CompositeQuery) Validate() error { return nil } +type Temporality string + +const ( + Unspecified Temporality = "Unspecified" + Delta Temporality = "Delta" + Cumulative Temporality = "Cumulative" +) + type BuilderQuery struct { QueryName string `json:"queryName"` StepInterval int64 `json:"stepInterval"` DataSource DataSource `json:"dataSource"` AggregateOperator AggregateOperator `json:"aggregateOperator"` AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"` + Temporality Temporality `json:"temporality,omitempty"` Filters *FilterSet `json:"filters,omitempty"` GroupBy []AttributeKey `json:"groupBy,omitempty"` Expression string `json:"expression"`