diff --git a/pkg/query-service/app/metrics/v4/helpers/clauses.go b/pkg/query-service/app/metrics/v4/helpers/clauses.go index 2bc3c24ef4..924db92c03 100644 --- a/pkg/query-service/app/metrics/v4/helpers/clauses.go +++ b/pkg/query-service/app/metrics/v4/helpers/clauses.go @@ -2,9 +2,10 @@ package helpers import ( "fmt" - "github.com/SigNoz/signoz/pkg/query-service/utils" "strings" + "github.com/SigNoz/signoz/pkg/query-service/utils" + v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" ) diff --git a/pkg/telemetrymetrics/const.go b/pkg/telemetrymetrics/const.go index 4522534829..59314dc22d 100644 --- a/pkg/telemetrymetrics/const.go +++ b/pkg/telemetrymetrics/const.go @@ -1,6 +1,7 @@ package telemetrymetrics var IntrinsicFields = []string{ + "__normalized", "temporality", "metric_name", "type", diff --git a/pkg/telemetrymetrics/field_mapper.go b/pkg/telemetrymetrics/field_mapper.go index 624dab3089..a7f9b67c09 100644 --- a/pkg/telemetrymetrics/field_mapper.go +++ b/pkg/telemetrymetrics/field_mapper.go @@ -3,6 +3,7 @@ package telemetrymetrics import ( "context" "fmt" + "slices" schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" @@ -44,12 +45,18 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry switch key.FieldContext { case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextScope, telemetrytypes.FieldContextAttribute: return timeSeriesV4Columns["labels"], nil - case telemetrytypes.FieldContextMetric, telemetrytypes.FieldContextUnspecified: + case telemetrytypes.FieldContextMetric: col, ok := timeSeriesV4Columns[key.Name] if !ok { return nil, qbtypes.ErrColumnNotFound } return col, nil + case telemetrytypes.FieldContextUnspecified: + col, ok := timeSeriesV4Columns[key.Name] + if !ok { + return timeSeriesV4Columns["labels"], nil + } + return col, nil } return nil, qbtypes.ErrColumnNotFound @@ -66,6 +73,11 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr return fmt.Sprintf("JSONExtractString(%s, '%s')", column.Name, key.Name), nil case telemetrytypes.FieldContextMetric: return column.Name, nil + case telemetrytypes.FieldContextUnspecified: + if slices.Contains(IntrinsicFields, key.Name) { + return column.Name, nil + } + return fmt.Sprintf("JSONExtractString(%s, '%s')", column.Name, key.Name), nil } return column.Name, nil diff --git a/pkg/telemetrymetrics/internal/cumulative.go b/pkg/telemetrymetrics/internal/cumulative.go new file mode 100644 index 0000000000..5bf0569ce8 --- /dev/null +++ b/pkg/telemetrymetrics/internal/cumulative.go @@ -0,0 +1 @@ +package internal diff --git a/pkg/telemetrymetrics/internal/delta.go b/pkg/telemetrymetrics/internal/delta.go new file mode 100644 index 0000000000..5bf0569ce8 --- /dev/null +++ b/pkg/telemetrymetrics/internal/delta.go @@ -0,0 +1 @@ +package internal diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go new file mode 100644 index 0000000000..0a274bb9d9 --- /dev/null +++ b/pkg/telemetrymetrics/statement_builder.go @@ -0,0 +1,455 @@ +package telemetrymetrics + +import ( + "context" + "fmt" + "log/slog" + + "github.com/SigNoz/signoz/pkg/querybuilder" + "github.com/SigNoz/signoz/pkg/types/metrictypes" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" +) + +const ( + 
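+	// RateWithoutNegative and IncreaseWithoutNegative compute per-series deltas
+	// between consecutive points using lagInFrame over the rate_window window,
+	// guarding against counter resets: if the current value is smaller than the
+	// previous one (e.g. points 10, 14, 9 yield a delta of 4 and then a reset),
+	// the raw value is used instead of a negative delta. The %d placeholder is
+	// filled with the query start time, which seeds the previous-timestamp
+	// default for the first point of each series.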
RateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))` + IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))` +) + +type metricQueryStatementBuilder struct { + logger *slog.Logger + metadataStore telemetrytypes.MetadataStore + fm qbtypes.FieldMapper + cb qbtypes.ConditionBuilder + aggExprRewriter qbtypes.AggExprRewriter +} + +var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil) + +func NewMetricQueryStatementBuilder( + logger *slog.Logger, + metadataStore telemetrytypes.MetadataStore, + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, + aggExprRewriter qbtypes.AggExprRewriter, +) *metricQueryStatementBuilder { + return &metricQueryStatementBuilder{ + logger: logger, + metadataStore: metadataStore, + fm: fieldMapper, + cb: conditionBuilder, + aggExprRewriter: aggExprRewriter, + } +} + +func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector { + var keySelectors []*telemetrytypes.FieldKeySelector + if query.Filter != nil && query.Filter.Expression != "" { + whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression) + keySelectors = append(keySelectors, whereClauseSelectors...) + } + for idx := range query.GroupBy { + groupBy := query.GroupBy[idx] + selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name) + keySelectors = append(keySelectors, selectors...) 
+	}
+	for idx := range query.Order {
+		keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
+			Name:          query.Order[idx].Key.Name,
+			Signal:        telemetrytypes.SignalMetrics,
+			FieldContext:  query.Order[idx].Key.FieldContext,
+			FieldDataType: query.Order[idx].Key.FieldDataType,
+		})
+	}
+	for idx := range keySelectors {
+		keySelectors[idx].Signal = telemetrytypes.SignalMetrics
+	}
+	return keySelectors
+}
+
+func (b *metricQueryStatementBuilder) Build(
+	ctx context.Context,
+	start uint64,
+	end uint64,
+	_ qbtypes.RequestType,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+) (*qbtypes.Statement, error) {
+	keySelectors := getKeySelectors(query)
+	keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
+	if err != nil {
+		return nil, err
+	}
+
+	return b.buildPipelineStatement(ctx, start, end, query, keys)
+}
+
+// canShortCircuitDelta reports whether a delta query can skip the per-series
+// temporal aggregation CTE and aggregate in a single pass (fast path, no
+// fingerprint grouping)
+func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
+	if q.Aggregations[0].Temporality != metrictypes.Delta {
+		return false
+	}
+
+	ta := q.Aggregations[0].TimeAggregation
+	sa := q.Aggregations[0].SpaceAggregation
+
+	if (ta == metrictypes.TimeAggregationRate || ta == metrictypes.TimeAggregationIncrease) && sa == metrictypes.SpaceAggregationSum {
+		return true
+	}
+	if ta == metrictypes.TimeAggregationSum && sa == metrictypes.SpaceAggregationSum {
+		return true
+	}
+	if ta == metrictypes.TimeAggregationMin && sa == metrictypes.SpaceAggregationMin {
+		return true
+	}
+	if ta == metrictypes.TimeAggregationMax && sa == metrictypes.SpaceAggregationMax {
+		return true
+	}
+	if q.Aggregations[0].Type == metrictypes.ExpHistogramType && sa.IsPercentile() {
+		return true
+	}
+	return false
+}
+
+func (b *metricQueryStatementBuilder) buildPipelineStatement(
+	ctx context.Context,
+	start, end uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
+) (*qbtypes.Statement, error) {
+	var (
+		cteFragments []string
+		cteArgs      [][]any
+	)
+
+	origSpaceAgg := query.Aggregations[0].SpaceAggregation
+	origTimeAgg := query.Aggregations[0].TimeAggregation
+	origGroupBy := query.GroupBy
+
+	// exp histograms compute percentiles from their sketches (see
+	// buildTemporalAggDeltaFastPath); the `le` bucket rewrite below applies
+	// only to classic bucketed histograms
+	if query.Aggregations[0].SpaceAggregation.IsPercentile() && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
+		// 1. add `le` to the group by if it doesn't already exist
+		leExists := false
+		for _, g := range query.GroupBy {
+			if g.TelemetryFieldKey.Name == "le" {
+				leExists = true
+				break
+			}
+		}
+		if !leExists {
+			query.GroupBy = append(query.GroupBy, qbtypes.GroupByKey{
+				TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "le"},
+			})
+		}
+
+		// 2. force sum for both time and space aggregation; the quantile is
+		// computed from the `le` buckets in the final select
+		query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationSum
+		query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
+	}
+
+	// 1. time_series_cte
+	if frag, args, err := b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
+		return nil, err
+	} else if frag != "" {
+		cteFragments = append(cteFragments, frag)
+		cteArgs = append(cteArgs, args)
+	}
+
+	if b.canShortCircuitDelta(query) {
+		// 2. spatial_aggregation_cte directly
+		if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query); err != nil {
+			return nil, err
+		} else if frag != "" {
+			cteFragments = append(cteFragments, frag)
+			cteArgs = append(cteArgs, args)
+		}
+	} else {
+		// 2. temporal_aggregation_cte
+		if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys); err != nil {
+			return nil, err
+		} else if frag != "" {
+			cteFragments = append(cteFragments, frag)
+			cteArgs = append(cteArgs, args)
+		}
+
+		// 3. spatial_aggregation_cte
+		if frag, args, err := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); err != nil {
+			return nil, err
+		} else if frag != "" {
+			cteFragments = append(cteFragments, frag)
+			cteArgs = append(cteArgs, args)
+		}
+	}
+
+	// reset the query to the original state
+	query.Aggregations[0].SpaceAggregation = origSpaceAgg
+	query.Aggregations[0].TimeAggregation = origTimeAgg
+	query.GroupBy = origGroupBy
+
+	// 4. final SELECT
+	return b.buildFinalSelect(cteFragments, cteArgs, query)
+}
+
+func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
+	start, end uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+) (string, []any, error) {
+	stepSec := int64(query.StepInterval.Seconds())
+
+	sb := sqlbuilder.NewSelectBuilder()
+
+	sb.SelectMore(fmt.Sprintf(
+		"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
+		stepSec,
+	))
+	for _, g := range query.GroupBy {
+		sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+	}
+
+	aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
+		aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
+	}
+
+	if query.Aggregations[0].SpaceAggregation.IsPercentile() {
+		aggCol = fmt.Sprintf("quantilesDDMerge(0.01, %f)(sketch)[1]", query.Aggregations[0].SpaceAggregation.Percentile())
+	}
+
+	sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
+
+	tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
+	sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+	sb.Where(
+		sb.In("metric_name", query.Aggregations[0].MetricName),
+		sb.GTE("unix_milli", start),
+		sb.LT("unix_milli", end),
+	)
+	sb.GroupBy("ALL")
+
+	q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
+}
+
+func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
+	ctx context.Context,
+	start, end uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
+) (string, []any, error) {
+	sb := sqlbuilder.NewSelectBuilder()
+
+	// the filter is optional; guard against a nil filter before building
+	// the where clause
+	var filterWhere *sqlbuilder.WhereClause
+	if query.Filter != nil && query.Filter.Expression != "" {
+		var err error
+		filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
+			FieldMapper:      b.fm,
+			ConditionBuilder: b.cb,
+			FieldKeys:        keys,
+			FullTextColumn:   &telemetrytypes.TelemetryFieldKey{Name: "labels"},
+		})
+		if err != nil {
+			return "", nil, err
+		}
+	}
+
+	start, end, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
+	sb.From(fmt.Sprintf("%s.%s", DBName, tbl))
+
+	sb.Select("fingerprint")
+	for _, g := range query.GroupBy {
+		col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
+		if err != nil {
+			return "", nil, err
+		}
+		sb.SelectMore(col)
+	}
+
+	sb.Where(
+		sb.In("metric_name", query.Aggregations[0].MetricName),
+		sb.GTE("unix_milli", start),
+		sb.LTE("unix_milli", end),
+		sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()),
+		sb.EQ("__normalized", false), // TODO configurable
+	)
+	if filterWhere != nil {
+		sb.AddWhereClause(filterWhere)
+	}
+
+	sb.GroupBy("ALL")
+
+	q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	return fmt.Sprintf("__time_series_cte AS (%s)", q), args, nil
+}
+
+func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
+	ctx context.Context,
+	start, end uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+	_ map[string][]*telemetrytypes.TelemetryFieldKey,
+) (string, []any, error) {
+	if query.Aggregations[0].Temporality == metrictypes.Delta {
+		return b.buildTemporalAggDelta(start, end, query)
+	}
+	return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query)
+}
+
+func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
+	start, end uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+) (string, []any, error) {
+	stepSec := int64(query.StepInterval.Seconds())
+
+	sb := sqlbuilder.NewSelectBuilder()
+
+	sb.Select("fingerprint")
+	sb.SelectMore(fmt.Sprintf(
+		"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
+		stepSec,
+	))
+	for _, g := range query.GroupBy {
+		sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+	}
+
+	aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
+		aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
+	}
+
+	sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
+
+	tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
+	sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+	sb.Where(
+		sb.In("metric_name", query.Aggregations[0].MetricName),
+		sb.GTE("unix_milli", start),
+		sb.LT("unix_milli", end),
+	)
+	sb.GroupBy("ALL")
+	sb.OrderBy("fingerprint", "ts")
+
+	q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
+}
+
+func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
+	ctx context.Context,
+	start, end uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+) (string, []any, error) {
+	stepSec := int64(query.StepInterval.Seconds())
+
+	baseSb := sqlbuilder.NewSelectBuilder()
+	baseSb.Select("fingerprint")
+	baseSb.SelectMore(fmt.Sprintf(
+		"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
+		stepSec,
+	))
+	for _, g := range query.GroupBy {
+		baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+	}
+
+	aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
+
+	tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
+	baseSb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+	baseSb.Where(
+		baseSb.In("metric_name", query.Aggregations[0].MetricName),
+		baseSb.GTE("unix_milli", start),
+		baseSb.LT("unix_milli", end),
+	)
+	baseSb.GroupBy("ALL")
+	baseSb.OrderBy("fingerprint", "ts")
+
+	innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+	switch query.Aggregations[0].TimeAggregation {
+	case metrictypes.TimeAggregationRate:
+		rateExpr := fmt.Sprintf(RateWithoutNegative, start)
+		wrapped := sqlbuilder.NewSelectBuilder()
+		wrapped.Select("ts")
+		for _, g := range query.GroupBy {
+			wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+		}
+		wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
+		wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
+		q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
+		return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
+
+	case metrictypes.TimeAggregationIncrease:
+		incExpr := fmt.Sprintf(IncreaseWithoutNegative, start, start)
+		wrapped := sqlbuilder.NewSelectBuilder()
+		wrapped.Select("ts")
+		for _, g := range query.GroupBy {
+			wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+		}
+		wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
+		// the increase expression references rate_window, so the window must
+		// be declared under that name here as well
+		wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
+		q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
+		return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
+	default:
+		return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil
+	}
+}
+
+func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
+	_ context.Context,
+	_ uint64,
+	_ uint64,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+	_ map[string][]*telemetrytypes.TelemetryFieldKey,
+) (string, []any, error) {
+	sb := sqlbuilder.NewSelectBuilder()
+
+	sb.Select("ts")
+	for _, g := range query.GroupBy {
+		sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+	}
+	sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
+	sb.From("__temporal_aggregation_cte")
+	sb.Where(sb.EQ("isNaN(per_series_value)", 0))
+	if query.Aggregations[0].ValueFilter != nil {
+		sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value))
+	}
+	sb.GroupBy("ALL")
+
+	q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
+}
+
+func (b *metricQueryStatementBuilder) buildFinalSelect(
+	cteFragments []string,
+	cteArgs [][]any,
+	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+) (*qbtypes.Statement, error) {
+	combined := querybuilder.CombineCTEs(cteFragments)
+
+	var args []any
+	for _, a := range cteArgs {
+		args = append(args, a...)
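+		// note: fragments and their args were appended in the same order,
+		// so the flattened args line up with the placeholders in the
+		// combined WITH clause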
+	}
+
+	sb := sqlbuilder.NewSelectBuilder()
+
+	var quantile float64
+	if query.Aggregations[0].SpaceAggregation.IsPercentile() {
+		quantile = query.Aggregations[0].SpaceAggregation.Percentile()
+	}
+
+	if quantile != 0 && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
+		sb.Select("ts")
+		for _, g := range query.GroupBy {
+			sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+		}
+		sb.SelectMore(fmt.Sprintf(
+			"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
+			quantile,
+		))
+		sb.From("__spatial_aggregation_cte")
+		sb.GroupBy("ALL")
+	} else {
+		sb.Select("*")
+		sb.From("__spatial_aggregation_cte")
+	}
+
+	q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
+}
diff --git a/pkg/telemetrymetrics/stmt_builder_test.go b/pkg/telemetrymetrics/stmt_builder_test.go
new file mode 100644
index 0000000000..bae8d9765b
--- /dev/null
+++ b/pkg/telemetrymetrics/stmt_builder_test.go
@@ -0,0 +1,129 @@
+package telemetrymetrics
+
+import (
+	"context"
+	"log/slog"
+	"testing"
+	"time"
+
+	"github.com/SigNoz/signoz/pkg/querybuilder"
+	"github.com/SigNoz/signoz/pkg/types/metrictypes"
+	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
+	"github.com/stretchr/testify/require"
+)
+
+func TestStatementBuilder(t *testing.T) {
+	cases := []struct {
+		name        string
+		requestType qbtypes.RequestType
+		query       qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
+		expected    qbtypes.Statement
+		expectedErr error
+	}{
+		{
+			name:        "cumulative_rate_sum",
+			requestType: qbtypes.RequestTypeTimeSeries,
+			query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+				Signal:       telemetrytypes.SignalMetrics,
+				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
+				Aggregations: []qbtypes.MetricAggregation{
+					{
+						MetricName:       "signoz_calls_total",
+						Type:             metrictypes.SumType,
+						Temporality:      metrictypes.Cumulative,
+						TimeAggregation:  metrictypes.TimeAggregationRate,
+						SpaceAggregation: metrictypes.SpaceAggregationSum,
+					},
+				},
+				Filter: &qbtypes.Filter{
+					Expression: "service.name = 'cartservice'",
+				},
+				Limit: 10,
+				GroupBy: []qbtypes.GroupByKey{
+					{
+						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+							Name: "service.name",
+						},
+					},
+				},
+			},
+			expected: qbtypes.Statement{
+				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
+				Args:  []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
+			},
+			expectedErr: nil,
+		},
+		{
+			name:        "delta_rate_sum",
+			requestType: qbtypes.RequestTypeTimeSeries,
+			query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+				Signal:       telemetrytypes.SignalMetrics,
+				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
+				Aggregations: []qbtypes.MetricAggregation{
+					{
+						MetricName:       "signoz_calls_total",
+						Type:             metrictypes.SumType,
+						Temporality:      metrictypes.Delta,
+						TimeAggregation:  metrictypes.TimeAggregationRate,
+						SpaceAggregation: metrictypes.SpaceAggregationSum,
+					},
+				},
+				Filter: &qbtypes.Filter{
+					Expression: "service.name = 'cartservice'",
+				},
+				Limit: 10,
+				GroupBy: []qbtypes.GroupByKey{
+					{
+						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+							Name: "service.name",
+						},
+					},
+				},
+			},
+			expected: qbtypes.Statement{
+				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
+				Args:  []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
+			},
+			expectedErr: nil,
+		},
+	}
+
+	fm := NewFieldMapper()
+	cb := NewConditionBuilder(fm)
+	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
+	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
+
+	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
+
+	statementBuilder := NewMetricQueryStatementBuilder(
+		slog.Default(),
+		mockMetadataStore,
+		fm,
+		cb,
+		aggExprRewriter,
+	)
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query)
+
+			if c.expectedErr != nil {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), c.expectedErr.Error())
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, c.expected.Query, q.Query)
+				require.Equal(t, c.expected.Args, q.Args)
+				require.Equal(t, c.expected.Warnings, q.Warnings)
+			}
+		})
+	}
+}
diff --git a/pkg/telemetrymetrics/tables.go b/pkg/telemetrymetrics/tables.go
index d8a5039f60..e0afc8e02b 100644
--- a/pkg/telemetrymetrics/tables.go
+++ b/pkg/telemetrymetrics/tables.go
@@ -1,5 +1,11 @@
 package telemetrymetrics
 
+import (
+	"time"
+
+	"github.com/SigNoz/signoz/pkg/types/metrictypes"
+)
+
 const (
 	DBName             = "signoz_metrics"
 	SamplesV4TableName = "distributed_samples_v4"
@@ -21,3 +27,242 @@ const (
 	AttributesMetadataTableName      = "distributed_metadata"
 	AttributesMetadataLocalTableName = "metadata"
 )
+
+var (
+	oneHourInMilliseconds  = uint64(time.Hour.Milliseconds() * 1)
+	sixHoursInMilliseconds = uint64(time.Hour.Milliseconds() * 6)
+	oneDayInMilliseconds   = uint64(time.Hour.Milliseconds() * 24)
+	oneWeekInMilliseconds  = uint64(oneDayInMilliseconds * 7)
+)
+
+func WhichTSTableToUse(
+	start, end uint64,
+	tableHints *metrictypes.MetricTableHints,
+) (uint64, uint64, string) {
+	// if we have a table hint, it overrides the default table selection logic
+	if tableHints != nil {
+		if tableHints.TimeSeriesTableName != "" {
+			switch tableHints.TimeSeriesTableName {
+			case TimeseriesV4LocalTableName:
+				// adjust the start time to the nearest 1 hour
+				start = start - (start % (oneHourInMilliseconds))
+			case TimeseriesV46hrsLocalTableName:
+				// adjust the start time to the nearest 6 hours
+				start = start - (start % (sixHoursInMilliseconds))
+			case TimeseriesV41dayLocalTableName:
+				// adjust the start time to the nearest 1 day
+				start = start - (start % (oneDayInMilliseconds))
+			case TimeseriesV41weekLocalTableName:
+				// adjust the start time to the nearest 1 week
+				start = start - (start % (oneWeekInMilliseconds))
+			}
+			return start, end, tableHints.TimeSeriesTableName
+		}
+	}
+
+	// if the time range is less than 6 hours, use the `time_series_v4` table;
+	// if it is between 6 hours and 1 day, use the `time_series_v4_6hrs` table;
+	// if it is between 1 day and 1 week, use the `time_series_v4_1day` table;
+	// otherwise use the `time_series_v4_1week` table
+	var tableName string
+	if end-start < sixHoursInMilliseconds {
+		// adjust the start time to the nearest 1 hour
+		start = start - (start % (oneHourInMilliseconds))
+		tableName = TimeseriesV4LocalTableName
+	} else if end-start < oneDayInMilliseconds {
+		// adjust the start time to the nearest 6 hours
+		start = start - (start % (sixHoursInMilliseconds))
+		tableName = TimeseriesV46hrsLocalTableName
+	} else if end-start < oneWeekInMilliseconds {
+		// adjust the start time to the nearest 1 day
+		start = start - (start % (oneDayInMilliseconds))
+		tableName = TimeseriesV41dayLocalTableName
+	} else {
+		// adjust the start time to the nearest 1 week
+		start = start - (start % (oneWeekInMilliseconds))
+		tableName = TimeseriesV41weekLocalTableName
+	}
+
+	return start, end, tableName
+}
+
+// start and end are in milliseconds
+// we have three tables for samples
+// 1. distributed_samples_v4
+// 2. distributed_samples_v4_agg_5m - for queries with a time range >= 1 day and < 1 week
+// 3. distributed_samples_v4_agg_30m - for queries with a time range >= 1 week
+// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
+func WhichSamplesTableToUse(
+	start, end uint64,
+	metricType metrictypes.Type,
+	timeAggregation metrictypes.TimeAggregation,
+	tableHints *metrictypes.MetricTableHints,
+) string {
+
+	// if we have a table hint, it overrides the default table selection logic
+	if tableHints != nil {
+		if tableHints.SamplesTableName != "" {
+			return tableHints.SamplesTableName
+		}
+	}
+
+	// we don't have any aggregated table for sketches (yet)
+	if metricType == metrictypes.ExpHistogramType {
+		return ExpHistogramLocalTableName
+	}
+
+	// if the time aggregation is count_distinct, we need to use the distributed_samples_v4 table
+	// because the aggregated tables don't support count_distinct
+	if timeAggregation == metrictypes.TimeAggregationCountDistinct {
+		return SamplesV4TableName
+	}
+
+	if end-start < oneDayInMilliseconds {
+		return SamplesV4TableName
+	} else if end-start < oneWeekInMilliseconds {
+		return SamplesV4Agg5mTableName
+	} else {
+		return SamplesV4Agg30mTableName
+	}
+}
+
+func AggregationColumnForSamplesTable(
+	start, end uint64,
+	metricType metrictypes.Type,
+	temporality metrictypes.Temporality,
+	timeAggregation metrictypes.TimeAggregation,
+	tableHints *metrictypes.MetricTableHints,
+) string {
+	tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
+	var aggregationColumn string
+	switch temporality {
+	case metrictypes.Delta:
+		// for delta metrics, we only support `RATE`/`INCREASE`, both of which are sums;
+		// although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics,
+		// we keep them here so that the query is still valid
+		switch tableName {
+		case SamplesV4TableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(value)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(value)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "avg(value)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(value)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(value)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "count(value)"
+			case metrictypes.TimeAggregationCountDistinct:
+				aggregationColumn = "countDistinct(value)"
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
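+				// for delta metrics the per-window sum of the raw points is
+				// already the increase; when a rate is requested, the
+				// statement builder divides this sum by the step interval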
+				aggregationColumn = "sum(value)"
+			}
+		case SamplesV4Agg5mTableName, SamplesV4Agg30mTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(last)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(sum)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "sum(sum) / sum(count)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(min)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(max)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "sum(count)"
+			// count_distinct is not supported in aggregated tables
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "sum(sum)"
+			}
+		}
+	case metrictypes.Cumulative:
+		// for cumulative metrics, we only support `RATE`/`INCREASE`; the max value in each
+		// window is taken here, and the increase/rate is computed later by differencing
+		// consecutive windows (see RateWithoutNegative/IncreaseWithoutNegative)
+		switch tableName {
+		case SamplesV4TableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(value)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(value)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "avg(value)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(value)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(value)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "count(value)"
+			case metrictypes.TimeAggregationCountDistinct:
+				aggregationColumn = "countDistinct(value)"
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "max(value)"
+			}
+		case SamplesV4Agg5mTableName, SamplesV4Agg30mTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(last)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(sum)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "sum(sum) / sum(count)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(min)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(max)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "sum(count)"
+			// count_distinct is not supported in aggregated tables
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "max(max)"
+			}
+		}
+	case metrictypes.Unspecified:
+		switch tableName {
+		case SamplesV4TableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(value)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(value)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "avg(value)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(value)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(value)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "count(value)"
+			case metrictypes.TimeAggregationCountDistinct:
+				aggregationColumn = "countDistinct(value)"
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
+				aggregationColumn = "sum(value)"
+			}
+		case SamplesV4Agg5mTableName, SamplesV4Agg30mTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(last)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(sum)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "sum(sum) / sum(count)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(min)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(max)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "sum(count)"
+			// count_distinct is not supported in aggregated tables
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
+				aggregationColumn = "sum(sum)"
+			}
+		}
+	}
+	return aggregationColumn
+}
diff --git a/pkg/telemetrymetrics/test_data.go b/pkg/telemetrymetrics/test_data.go
new file mode 100644
index 0000000000..dfc9e7ecb1
--- /dev/null
+++ b/pkg/telemetrymetrics/test_data.go
@@ -0,0 +1,30 @@
+package telemetrymetrics
+
+import (
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
+func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
+	keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
+		"service.name": {
+			{
+				Name:          "service.name",
+				FieldContext:  telemetrytypes.FieldContextResource,
+				FieldDataType: telemetrytypes.FieldDataTypeString,
+			},
+		},
+		"http.request.method": {
+			{
+				Name:          "http.request.method",
+				FieldContext:  telemetrytypes.FieldContextAttribute,
+				FieldDataType: telemetrytypes.FieldDataTypeString,
+			},
+		},
+	}
+	for _, keys := range keysMap {
+		for _, key := range keys {
+			key.Signal = telemetrytypes.SignalMetrics
+		}
+	}
+	return keysMap
+}
diff --git a/pkg/types/metrictypes/metrictypes.go b/pkg/types/metrictypes/metrictypes.go
index a92ac3528e..cd2aa6da84 100644
--- a/pkg/types/metrictypes/metrictypes.go
+++ b/pkg/types/metrictypes/metrictypes.go
@@ -65,3 +65,49 @@ var (
 	SpaceAggregationPercentile95 = SpaceAggregation{valuer.NewString("p95")}
 	SpaceAggregationPercentile99 = SpaceAggregation{valuer.NewString("p99")}
 )
+
+func (s SpaceAggregation) IsPercentile() bool {
+	return s == SpaceAggregationPercentile50 ||
+		s == SpaceAggregationPercentile75 ||
+		s == SpaceAggregationPercentile90 ||
+		s == SpaceAggregationPercentile95 ||
+		s == SpaceAggregationPercentile99
+}
+
+func (s SpaceAggregation) Percentile() float64 {
+	switch s {
+	case SpaceAggregationPercentile50:
+		return 0.5
+	case SpaceAggregationPercentile75:
+		return 0.75
+	case SpaceAggregationPercentile90:
+		return 0.9
+	case SpaceAggregationPercentile95:
+		return 0.95
+	case SpaceAggregationPercentile99:
+		return 0.99
+	default:
+		return 0
+	}
+}
+
+// MetricTableHints is a struct that contains tables to use instead of the tables
+// derived from the start and end time
+type MetricTableHints struct {
+	TimeSeriesTableName string
+	SamplesTableName    string
+}
+
+// Certain OTEL metrics encode the state in the value of the metric, which is in
+// general a bad modelling choice (presumably from some vendor) that makes it hard
+// to write aggregation queries. It should have been a key with a 0/1 value;
+// example: pod_status encoded as the values 0, 1, 2, 3, 4, 5.
+//
+// Such metrics are better modelled as pod_status with a state label (running,
+// pending, terminated, etc.) whose value is 0 or 1: if the value is 1 and the
+// state is pending, the pod is pending.
+//
+// However, some metrics in the wild do this, and we need to support them.
+// This is a workaround for those metrics.
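+//
+// For example, under a hypothetical encoding where a pod_status sample value
+// of 2 means "running", a MetricValueFilter{Value: 2} restricts the spatial
+// aggregation to the points that represent running pods.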
+type MetricValueFilter struct { + Value float64 +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go index eec4248ec7..96025c14b3 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go @@ -152,12 +152,18 @@ type LogAggregation struct { type MetricAggregation struct { // metric to query MetricName string `json:"metricName"` + // type of the metric + Type metrictypes.Type `json:"type"` // temporality to apply to the query Temporality metrictypes.Temporality `json:"temporality"` // time aggregation to apply to the query TimeAggregation metrictypes.TimeAggregation `json:"timeAggregation"` // space aggregation to apply to the query SpaceAggregation metrictypes.SpaceAggregation `json:"spaceAggregation"` + // table hints to use for the query + TableHints *metrictypes.MetricTableHints `json:"-"` + // value filter to apply to the query + ValueFilter *metrictypes.MetricValueFilter `json:"-"` } type Filter struct {