From fe77bddb7e39ea81c2d7b24b90672a0f645dd840 Mon Sep 17 00:00:00 2001
From: srikanthccv
Date: Mon, 2 Jun 2025 15:15:50 +0530
Subject: [PATCH] chore: cte clickhouse version compatibility

---
 pkg/telemetrymetrics/statement_builder.go | 67 ++++++++++++++---------
 pkg/telemetrymetrics/stmt_builder_test.go | 37 +++++++++++--
 2 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go
index d57d08e242..6e364ec346 100644
--- a/pkg/telemetrymetrics/statement_builder.go
+++ b/pkg/telemetrymetrics/statement_builder.go
@@ -169,22 +169,23 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
 		}
 
 		// make the time aggregation rate and space aggregation sum
-		query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationSum
+		query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationRate
 		query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
 	}
 
+	var timeSeriesCTE string
+	var timeSeriesCTEArgs []any
+	var err error
+
 	// time_series_cte
 	// this is applicable for all the queries
-	if frag, args, err := b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
+	if timeSeriesCTE, timeSeriesCTEArgs, err = b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
 		return nil, err
-	} else if frag != "" {
-		cteFragments = append(cteFragments, frag)
-		cteArgs = append(cteArgs, args)
 	}
 
 	if b.canShortCircuitDelta(query) {
 		// spatial_aggregation_cte directly for certain delta queries
-		if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query); err != nil {
+		if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
 			return nil, err
 		} else if frag != "" {
 			cteFragments = append(cteFragments, frag)
@@ -192,7 +193,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
 		}
 	} else {
 		// temporal_aggregation_cte
-		if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys); err != nil {
+		if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
 			return nil, err
 		} else if frag != "" {
 			cteFragments = append(cteFragments, frag)
@@ -220,6 +221,8 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
 func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
 	start, end uint64,
 	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+	timeSeriesCTE string,
+	timeSeriesCTEArgs []any,
 ) (string, []any, error) {
 	stepSec := int64(query.StepInterval.Seconds())
 
@@ -250,7 +253,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
 
 	tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
 	sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
-	sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+	sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
 	sb.Where(
 		sb.In("metric_name", query.Aggregations[0].MetricName),
 		sb.GTE("unix_milli", start),
 	)
 	sb.GroupBy("ALL")
 
-	q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil } @@ -270,14 +273,19 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE( ) (string, []any, error) { sb := sqlbuilder.NewSelectBuilder() - filterWhere, _, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ - FieldMapper: b.fm, - ConditionBuilder: b.cb, - FieldKeys: keys, - FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, - }) - if err != nil { - return "", nil, err + var filterWhere *sqlbuilder.WhereClause + var err error + + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + }) + if err != nil { + return "", nil, err + } } start, end, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints) @@ -314,7 +322,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE( sb.GroupBy("ALL") q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) - return fmt.Sprintf("__time_series_cte AS (%s)", q), args, nil + return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil } func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE( @@ -322,17 +330,21 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE( start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], _ map[string][]*telemetrytypes.TelemetryFieldKey, + timeSeriesCTE string, + timeSeriesCTEArgs []any, ) (string, []any, error) { if query.Aggregations[0].Temporality == metrictypes.Delta { - return b.buildTemporalAggDelta(ctx, start, end, query) + return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs) } - return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query) + return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs) } func (b *metricQueryStatementBuilder) buildTemporalAggDelta( _ context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + timeSeriesCTE string, + timeSeriesCTEArgs []any, ) (string, []any, error) { stepSec := int64(query.StepInterval.Seconds()) @@ -356,7 +368,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta( tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) - sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint") + sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint") sb.Where( sb.In("metric_name", query.Aggregations[0].MetricName), sb.GTE("unix_milli", start), @@ -365,7 +377,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta( sb.GroupBy("ALL") sb.OrderBy("fingerprint", "ts") - q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...) 
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil } @@ -373,6 +385,8 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( _ context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + timeSeriesCTE string, + timeSeriesCTEArgs []any, ) (string, []any, error) { stepSec := int64(query.StepInterval.Seconds()) @@ -391,7 +405,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) - baseSb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint") + baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint") baseSb.Where( baseSb.In("metric_name", query.Aggregations[0].MetricName), baseSb.GTE("unix_milli", start), @@ -400,7 +414,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( baseSb.GroupBy("ALL") baseSb.OrderBy("fingerprint", "ts") - innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse) + innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...) switch query.Aggregations[0].TimeAggregation { case metrictypes.TimeAggregationRate: @@ -485,7 +499,10 @@ func (b *metricQueryStatementBuilder) buildFinalSelect( quantile, )) sb.From("__spatial_aggregation_cte") - sb.GroupBy("ALL") + for _, g := range query.GroupBy { + sb.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + sb.GroupBy("ts") } else { sb.Select("*") sb.From("__spatial_aggregation_cte") diff --git a/pkg/telemetrymetrics/stmt_builder_test.go b/pkg/telemetrymetrics/stmt_builder_test.go index d6122c5ae8..68ed605cbc 100644 --- a/pkg/telemetrymetrics/stmt_builder_test.go +++ b/pkg/telemetrymetrics/stmt_builder_test.go @@ -50,7 +50,7 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? 
GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", + Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000), 0}, }, expectedErr: nil, @@ -83,7 +83,7 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", + Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? 
GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000)}, }, expectedErr: nil, @@ -115,7 +115,7 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value) AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY ALL", + Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts", Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_latency", uint64(1747947419000), uint64(1747983448000)}, }, expectedErr: nil, @@ -148,11 +148,40 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL), __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? 
GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", + Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0}, }, expectedErr: nil, }, + { + name: "test_histogram_percentile", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + Aggregations: []qbtypes.MetricAggregation{ + { + MetricName: "http_server_duration_bucket", + Type: metrictypes.HistogramType, + Temporality: metrictypes.Cumulative, + SpaceAggregation: metrictypes.SpaceAggregationPercentile95, + }, + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? 
GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts", + Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "http_server_duration_bucket", uint64(1747947419000), uint64(1747983448000), 0}, + }, + expectedErr: nil, + }, } fm := NewFieldMapper()