mirror of https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz (synced 2025-08-14 13:46:00 +08:00)
chore: cte ch version compatibility
This commit is contained in:
parent
9fd7a1807a
commit
fe77bddb7e
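
In short (reading from the hunks and the updated test expectations below): the statement builder previously emitted the label-filtered series set as a named CTE, __time_series_cte, which the temporal/spatial aggregation CTEs then joined by name. The commit message points at ClickHouse version compatibility; apparently not every supported ClickHouse version resolves a WITH-clause CTE referenced from inside another CTE, so the builder now inlines the same SELECT as a derived table aliased filtered_time_series directly in each JOIN and threads its bound arguments along with it. Abridged statement shapes (the full strings are in the test hunks at the end):

before: WITH __time_series_cte AS (SELECT fingerprint, … FROM signoz_metrics.time_series_v4_6hrs WHERE …), __spatial_aggregation_cte AS (SELECT … FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE …) SELECT * FROM __spatial_aggregation_cte

after: WITH __spatial_aggregation_cte AS (SELECT … FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, … FROM signoz_metrics.time_series_v4_6hrs WHERE …) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE …) SELECT * FROM __spatial_aggregation_cte

Alongside that, the percentile path switches its time aggregation from sum to rate (visible below as sum(value)/30 in the delta-histogram expectation), and the final percentile SELECT groups by the explicit group-by keys plus ts rather than GROUP BY ALL.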
@@ -169,22 +169,23 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
         }

         // make the time aggregation rate and space aggregation sum
-        query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationSum
+        query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationRate
         query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
     }

+    var timeSeriesCTE string
+    var timeSeriesCTEArgs []any
+    var err error
+
     // time_series_cte
     // this is applicable for all the queries
-    if frag, args, err := b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
+    if timeSeriesCTE, timeSeriesCTEArgs, err = b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
         return nil, err
-    } else if frag != "" {
-        cteFragments = append(cteFragments, frag)
-        cteArgs = append(cteArgs, args)
     }

     if b.canShortCircuitDelta(query) {
         // spatial_aggregation_cte directly for certain delta queries
-        if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query); err != nil {
+        if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
             return nil, err
         } else if frag != "" {
             cteFragments = append(cteFragments, frag)
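
The structural effect of this hunk: the fragment returned by buildTimeSeriesCTE is no longer appended to cteFragments/cteArgs; it is captured in timeSeriesCTE/timeSeriesCTEArgs and handed to the downstream builders, which splice it into their JOINs. A minimal sketch of how the remaining fragments are presumably stitched into the final statement (combineCTEs is illustrative, not the project's actual helper):

package main

import (
    "fmt"
    "strings"
)

// combineCTEs sketches the assumed composition step:
// "WITH a AS (...), b AS (...) <final select>". After this commit the
// time-series fragment is no longer in this list; its text travels inside
// each aggregation CTE's JOIN instead.
func combineCTEs(cteFragments []string, finalSelect string) string {
    if len(cteFragments) == 0 {
        return finalSelect
    }
    return "WITH " + strings.Join(cteFragments, ", ") + " " + finalSelect
}

func main() {
    fmt.Println(combineCTEs(
        []string{"__temporal_aggregation_cte AS (...)", "__spatial_aggregation_cte AS (...)"},
        "SELECT * FROM __spatial_aggregation_cte",
    ))
}

With the time-series fragment gone from that list, the WITH clause of the generated statement carries only the aggregation CTEs, which is exactly what the updated test expectations show.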
@@ -192,7 +193,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
         }
     } else {
         // temporal_aggregation_cte
-        if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys); err != nil {
+        if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
             return nil, err
         } else if frag != "" {
             cteFragments = append(cteFragments, frag)

@@ -220,6 +221,8 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
 func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
     start, end uint64,
     query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+    timeSeriesCTE string,
+    timeSeriesCTEArgs []any,
 ) (string, []any, error) {
     stepSec := int64(query.StepInterval.Seconds())

@@ -250,7 +253,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(

     tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
     sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
-    sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+    sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
     sb.Where(
         sb.In("metric_name", query.Aggregations[0].MetricName),
         sb.GTE("unix_milli", start),

@@ -258,7 +261,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
     )
     sb.GroupBy("ALL")

-    q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+    q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
     return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
 }

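
The fast path also shows the argument-threading pattern repeated through the rest of the commit: the inlined subquery text, with its ? placeholders, lands in the JOIN clause ahead of the outer builder's own placeholders, so its arguments are passed to BuildWithFlavor as initial args and come out first in the returned slice. The Args in the updated test expectations keep their previous order for this reason: the time-series filter arguments precede the samples-scan arguments. A small self-contained sketch of the pattern, assuming the same huandu/go-sqlbuilder API used above (table names and values are illustrative only):

package main

import (
    "fmt"

    "github.com/huandu/go-sqlbuilder"
)

func main() {
    // Build the filtered time-series subquery on its own; its SQL text keeps
    // the ? placeholders and its args come back separately.
    inner := sqlbuilder.NewSelectBuilder()
    inner.Select("fingerprint").From("signoz_metrics.time_series_v4_6hrs")
    inner.Where(inner.In("metric_name", "signoz_calls_total"))
    innerSQL, innerArgs := inner.BuildWithFlavor(sqlbuilder.ClickHouse)

    // Inline that text as a derived table in the JOIN of the outer query.
    outer := sqlbuilder.NewSelectBuilder()
    outer.Select("count()").From("signoz_metrics.distributed_samples_v4 AS points")
    outer.JoinWithOption(sqlbuilder.InnerJoin,
        fmt.Sprintf("(%s) AS filtered_time_series", innerSQL),
        "points.fingerprint = filtered_time_series.fingerprint")
    outer.Where(outer.GTE("unix_milli", 1747947419000))

    // Passing innerArgs as initial args keeps them ahead of the outer builder's
    // own args, matching the order of the placeholders in the final SQL text.
    q, args := outer.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
    fmt.Println(q)
    fmt.Println(args)
}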
@@ -270,14 +273,19 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
 ) (string, []any, error) {
     sb := sqlbuilder.NewSelectBuilder()

-    filterWhere, _, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
-        FieldMapper:      b.fm,
-        ConditionBuilder: b.cb,
-        FieldKeys:        keys,
-        FullTextColumn:   &telemetrytypes.TelemetryFieldKey{Name: "labels"},
-    })
-    if err != nil {
-        return "", nil, err
+    var filterWhere *sqlbuilder.WhereClause
+    var err error
+
+    if query.Filter != nil && query.Filter.Expression != "" {
+        filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
+            FieldMapper:      b.fm,
+            ConditionBuilder: b.cb,
+            FieldKeys:        keys,
+            FullTextColumn:   &telemetrytypes.TelemetryFieldKey{Name: "labels"},
+        })
+        if err != nil {
+            return "", nil, err
+        }
     }

     start, end, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
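
Besides the change to the fragment it returns (next hunk: "(%s) AS filtered_time_series" instead of "__time_series_cte AS (%s)"), buildTimeSeriesCTE now guards the filter handling: filterWhere and err are declared up front, and PrepareWhereClause runs only when query.Filter is non-nil and its Expression is non-empty, where the old code accessed query.Filter.Expression unconditionally.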
@@ -314,7 +322,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
     sb.GroupBy("ALL")

     q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
-    return fmt.Sprintf("__time_series_cte AS (%s)", q), args, nil
+    return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
 }

 func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(

@@ -322,17 +330,21 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
     start, end uint64,
     query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
     _ map[string][]*telemetrytypes.TelemetryFieldKey,
+    timeSeriesCTE string,
+    timeSeriesCTEArgs []any,
 ) (string, []any, error) {
     if query.Aggregations[0].Temporality == metrictypes.Delta {
-        return b.buildTemporalAggDelta(ctx, start, end, query)
+        return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
     }
-    return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query)
+    return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
 }

 func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
     _ context.Context,
     start, end uint64,
     query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+    timeSeriesCTE string,
+    timeSeriesCTEArgs []any,
 ) (string, []any, error) {
     stepSec := int64(query.StepInterval.Seconds())

@@ -356,7 +368,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(

     tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
     sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
-    sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+    sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
     sb.Where(
         sb.In("metric_name", query.Aggregations[0].MetricName),
         sb.GTE("unix_milli", start),

@@ -365,7 +377,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
     sb.GroupBy("ALL")
     sb.OrderBy("fingerprint", "ts")

-    q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+    q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
     return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
 }

@@ -373,6 +385,8 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
     _ context.Context,
     start, end uint64,
     query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+    timeSeriesCTE string,
+    timeSeriesCTEArgs []any,
 ) (string, []any, error) {
     stepSec := int64(query.StepInterval.Seconds())

@@ -391,7 +405,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(

     tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
     baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
-    baseSb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
+    baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
     baseSb.Where(
         baseSb.In("metric_name", query.Aggregations[0].MetricName),
         baseSb.GTE("unix_milli", start),

@@ -400,7 +414,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
     baseSb.GroupBy("ALL")
     baseSb.OrderBy("fingerprint", "ts")

-    innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse)
+    innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)

     switch query.Aggregations[0].TimeAggregation {
     case metrictypes.TimeAggregationRate:
@@ -485,7 +499,10 @@ func (b *metricQueryStatementBuilder) buildFinalSelect(
             quantile,
         ))
         sb.From("__spatial_aggregation_cte")
-        sb.GroupBy("ALL")
+        for _, g := range query.GroupBy {
+            sb.GroupBy(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+        }
+        sb.GroupBy("ts")
     } else {
         sb.Select("*")
         sb.From("__spatial_aggregation_cte")
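
In buildFinalSelect the quantile branch now groups by each requested group-by key plus ts instead of GROUP BY ALL; the percentile expectations below accordingly end with GROUP BY `service.name`, ts. The remaining hunks update the expected statements in TestStatementBuilder for the inlined filtered_time_series subquery and add a test_histogram_percentile case covering the cumulative-histogram percentile path: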
@@ -50,7 +50,7 @@ func TestStatementBuilder(t *testing.T) {
                 },
             },
             expected: qbtypes.Statement{
-                Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+                Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
                 Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000), 0},
             },
             expectedErr: nil,

@@ -83,7 +83,7 @@ func TestStatementBuilder(t *testing.T) {
                 },
             },
             expected: qbtypes.Statement{
-                Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+                Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
                 Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000)},
             },
             expectedErr: nil,

@@ -115,7 +115,7 @@ func TestStatementBuilder(t *testing.T) {
                 },
             },
             expected: qbtypes.Statement{
-                Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value) AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY ALL",
+                Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts",
                 Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_latency", uint64(1747947419000), uint64(1747983448000)},
             },
             expectedErr: nil,

@@ -148,11 +148,40 @@ func TestStatementBuilder(t *testing.T) {
                 },
             },
             expected: qbtypes.Statement{
-                Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL), __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+                Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
                 Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0},
             },
             expectedErr: nil,
         },
+        {
+            name: "test_histogram_percentile",
+            requestType: qbtypes.RequestTypeTimeSeries,
+            query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+                Signal: telemetrytypes.SignalMetrics,
+                StepInterval: qbtypes.Step{Duration: 30 * time.Second},
+                Aggregations: []qbtypes.MetricAggregation{
+                    {
+                        MetricName: "http_server_duration_bucket",
+                        Type: metrictypes.HistogramType,
+                        Temporality: metrictypes.Cumulative,
+                        SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
+                    },
+                },
+                Limit: 10,
+                GroupBy: []qbtypes.GroupByKey{
+                    {
+                        TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+                            Name: "service.name",
+                        },
+                    },
+                },
+            },
+            expected: qbtypes.Statement{
+                Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts",
+                Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "http_server_duration_bucket", uint64(1747947419000), uint64(1747983448000), 0},
+            },
+            expectedErr: nil,
+        },
     }

     fm := NewFieldMapper()