chore: metric statement builder

srikanthccv 2025-05-30 15:24:05 +05:30
parent 9e27b844db
commit 16fe71c3e5
11 changed files with 929 additions and 2 deletions

View File

@ -2,9 +2,10 @@ package helpers
import (
"fmt"
"strings"

v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
)

View File

@ -1,6 +1,7 @@
package telemetrymetrics
var IntrinsicFields = []string{
"__normalized",
"temporality",
"metric_name",
"type",

View File

@ -3,6 +3,7 @@ package telemetrymetrics
import (
"context"
"fmt"
"slices"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@ -44,12 +45,18 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
switch key.FieldContext {
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextScope, telemetrytypes.FieldContextAttribute:
return timeSeriesV4Columns["labels"], nil
case telemetrytypes.FieldContextMetric:
col, ok := timeSeriesV4Columns[key.Name]
if !ok {
return nil, qbtypes.ErrColumnNotFound
}
return col, nil
case telemetrytypes.FieldContextUnspecified:
col, ok := timeSeriesV4Columns[key.Name]
if !ok {
return timeSeriesV4Columns["labels"], nil
}
return col, nil
}
return nil, qbtypes.ErrColumnNotFound
@ -66,6 +73,11 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
return fmt.Sprintf("JSONExtractString(%s, '%s')", column.Name, key.Name), nil
case telemetrytypes.FieldContextMetric:
return column.Name, nil
case telemetrytypes.FieldContextUnspecified:
if slices.Contains(IntrinsicFields, key.Name) {
return column.Name, nil
}
return fmt.Sprintf("JSONExtractString(%s, '%s')", column.Name, key.Name), nil
}
return column.Name, nil
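The net effect for unspecified-context keys: a name listed in IntrinsicFields resolves to its real column, anything else is read out of the labels JSON. Illustratively (assuming "temporality" is intrinsic and "service.name" is not, per the list earlier in this commit):
// FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{Name: "temporality"}) -> "temporality"
// FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{Name: "service.name"}) -> "JSONExtractString(labels, 'service.name')"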

View File

@ -0,0 +1 @@
package internal

View File

@ -0,0 +1 @@
package internal

View File

@ -0,0 +1,455 @@
package telemetrymetrics
import (
"context"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
const (
RateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
)
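Both expressions guard against counter resets: when a point is smaller than its predecessor, the raw value is used instead of a negative delta, and the lagInFrame defaults anchor the first point of each series at the query start time (the %d placeholder). A minimal Go sketch of the per-point logic the rate expression encodes (names here are illustrative, not part of the commit):
// rateWithoutNegative mirrors RateWithoutNegative for one pair of adjacent
// points in a series: on a counter reset (negative delta) it falls back to
// the current value, otherwise it returns the delta divided by elapsed seconds.
func rateWithoutNegative(prevValue, curValue, prevTs, curTs float64) float64 {
delta := curValue - prevValue
if delta < 0 { // counter reset
return curValue
}
return delta / (curTs - prevTs)
}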
type metricQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
aggExprRewriter qbtypes.AggExprRewriter
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil)
func NewMetricQueryStatementBuilder(
logger *slog.Logger,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
aggExprRewriter qbtypes.AggExprRewriter,
) *metricQueryStatementBuilder {
return &metricQueryStatementBuilder{
logger: logger,
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
aggExprRewriter: aggExprRewriter,
}
}
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
if query.Filter != nil && query.Filter.Expression != "" {
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
keySelectors = append(keySelectors, whereClauseSelectors...)
}
for idx := range query.GroupBy {
groupBy := query.GroupBy[idx]
selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name)
keySelectors = append(keySelectors, selectors...)
}
for idx := range query.Order {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: query.Order[idx].Key.Name,
Signal: telemetrytypes.SignalMetrics,
FieldContext: query.Order[idx].Key.FieldContext,
FieldDataType: query.Order[idx].Key.FieldDataType,
})
}
for idx := range keySelectors {
keySelectors[idx].Signal = telemetrytypes.SignalMetrics
}
return keySelectors
}
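As an illustration, a query that filters on service.name and also groups by it yields two selectors for that key, one from the filter expression and one from the group-by, both pinned to the metrics signal:
// filter: "service.name = 'cartservice'", group by: service.name
// getKeySelectors(query) -> two selectors with Name == "service.name",
// each with Signal == telemetrytypes.SignalMetrics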
func (b *metricQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
_ qbtypes.RequestType,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (*qbtypes.Statement, error) {
keySelectors := getKeySelectors(query)
keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
return b.buildPipelineStatement(ctx, start, end, query, keys)
}
// Fast path (delta): aggregate samples in a single pass, with no per-fingerprint temporal CTE
func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
if q.Aggregations[0].Temporality != metrictypes.Delta {
return false
}
ta := q.Aggregations[0].TimeAggregation
sa := q.Aggregations[0].SpaceAggregation
if (ta == metrictypes.TimeAggregationRate || ta == metrictypes.TimeAggregationIncrease) && sa == metrictypes.SpaceAggregationSum {
return true
}
if ta == metrictypes.TimeAggregationSum && sa == metrictypes.SpaceAggregationSum {
return true
}
if ta == metrictypes.TimeAggregationMin && sa == metrictypes.SpaceAggregationMin {
return true
}
if ta == metrictypes.TimeAggregationMax && sa == metrictypes.SpaceAggregationMax {
return true
}
if q.Aggregations[0].Type == metrictypes.ExpHistogramType && sa.IsPercentile() {
return true
}
return false
}
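For instance, a delta sum/sum query qualifies for the fast path, while the same query with cumulative temporality does not (a sketch using this package's types):
q := qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Aggregations: []qbtypes.MetricAggregation{{
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationSum,
SpaceAggregation: metrictypes.SpaceAggregationSum,
}},
}
// b.canShortCircuitDelta(q) == true; with metrictypes.Cumulative it returns
// false and the builder emits the temporal + spatial CTE pair instead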
func (b *metricQueryStatementBuilder) buildPipelineStatement(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
origSpaceAgg := query.Aggregations[0].SpaceAggregation
origTimeAgg := query.Aggregations[0].TimeAggregation
origGroupBy := query.GroupBy
if query.Aggregations[0].SpaceAggregation.IsPercentile() {
// 1. add `le` to the group by if it isn't already present
leExists := false
for _, g := range query.GroupBy {
if g.TelemetryFieldKey.Name == "le" {
leExists = true
break
}
}
if !leExists {
query.GroupBy = append(query.GroupBy, qbtypes.GroupByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "le"},
})
}
// 2. aggregate the per-bucket counts: sum over time and sum over space
query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationSum
query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
}
// 1. time_series_cte
if frag, args, err := b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
if b.canShortCircuitDelta(query) {
// 2. spatial_aggregation_cte directly
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
} else {
// 2. temporal_aggregation_cte
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
// 3. spatial_aggregation_cte
if frag, args, err := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
}
// reset the query to the original state
query.Aggregations[0].SpaceAggregation = origSpaceAgg
query.Aggregations[0].TimeAggregation = origTimeAgg
query.GroupBy = origGroupBy
// 4. final SELECT
return b.buildFinalSelect(cteFragments, cteArgs, query)
}
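On the non-short-circuit path the assembled statement therefore has a three-CTE shape along these lines (an illustrative skeleton, not the builder's verbatim output):
WITH __time_series_cte AS (
SELECT fingerprint, <group-by columns> FROM <time series table>
WHERE metric_name IN (...) AND unix_milli >= ? AND unix_milli <= ? AND <filter>
), __temporal_aggregation_cte AS (
SELECT fingerprint, ts, <group-by columns>, <time agg> AS per_series_value
FROM <samples table> AS points
INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint
), __spatial_aggregation_cte AS (
SELECT ts, <group-by columns>, <space agg>(per_series_value) AS value
FROM __temporal_aggregation_cte GROUP BY ALL
)
SELECT * FROM __spatial_aggregation_cte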
func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (string, []any, error) {
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
if query.Aggregations[0].SpaceAggregation.IsPercentile() {
aggCol = fmt.Sprintf("quantilesDDMerge(0.01, %f)(sketch)[1]", query.Aggregations[0].SpaceAggregation.Percentile())
}
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
sb.GroupBy("ALL")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
}
func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any, error) {
sb := sqlbuilder.NewSelectBuilder()
// the filter is optional; guard against a nil Filter before touching the expression
var filterWhere *sqlbuilder.WhereClause
if query.Filter != nil && query.Filter.Expression != "" {
var err error
filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
})
if err != nil {
return "", nil, err
}
}
start, end, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s", DBName, tbl))
sb.Select("fingerprint")
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
sb.SelectMore(col)
}
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LTE("unix_milli", end),
sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()),
sb.EQ("__normalized", false), // TODO configurable
)
if filterWhere != nil {
sb.AddWhereClause(filterWhere)
}
sb.GroupBy("ALL")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__time_series_cte AS (%s)", q), args, nil
}
func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any, error) {
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(start, end, query)
}
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query)
}
func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (string, []any, error) {
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
sb.GroupBy("ALL")
sb.OrderBy("fingerprint", "ts")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (string, []any, error) {
stepSec := int64(query.StepInterval.Seconds())
baseSb := sqlbuilder.NewSelectBuilder()
baseSb.Select("fingerprint")
baseSb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
baseSb.JoinWithOption(sqlbuilder.InnerJoin, "__time_series_cte", "points.fingerprint = __time_series_cte.fingerprint")
baseSb.Where(
baseSb.In("metric_name", query.Aggregations[0].MetricName),
baseSb.GTE("unix_milli", start),
baseSb.LT("unix_milli", end),
)
baseSb.GroupBy("ALL")
baseSb.OrderBy("fingerprint", "ts")
innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse)
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithoutNegative, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(IncreaseWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
wrapped.From(fmt.Sprintf("(%s) WINDOW increase_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
default:
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil
}
}
func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,
_ uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
sb.From("__temporal_aggregation_cte")
sb.Where(sb.EQ("isNaN(per_series_value)", 0))
if query.Aggregations[0].ValueFilter != nil {
sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value))
}
sb.GroupBy("ALL")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args, nil
}
func (b *metricQueryStatementBuilder) buildFinalSelect(
cteFragments []string,
cteArgs [][]any,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (*qbtypes.Statement, error) {
combined := querybuilder.CombineCTEs(cteFragments)
var args []any
for _, a := range cteArgs {
args = append(args, a...)
}
sb := sqlbuilder.NewSelectBuilder()
var quantile float64
if query.Aggregations[0].SpaceAggregation.IsPercentile() {
quantile = query.Aggregations[0].SpaceAggregation.Percentile()
}
if quantile != 0 && query.Aggregations[0].Type != metrictypes.ExpHistogramType {
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.SelectMore(fmt.Sprintf(
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
quantile,
))
sb.From("__spatial_aggregation_cte")
sb.GroupBy("ALL")
} else {
sb.Select("*")
sb.From("__spatial_aggregation_cte")
}
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
}
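For a bucketed-histogram percentile, say a p95 grouped by service.name, the outer query produced here looks roughly like (illustrative):
SELECT ts, `service.name`,
histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value
FROM __spatial_aggregation_cte GROUP BY ALL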

View File

@ -0,0 +1,129 @@
package telemetrymetrics
import (
"context"
"fmt"
"log/slog"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func TestStatementBuilder(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
expectedErr error
}{
{
name: "cumulative rate sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Cumulative,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
},
{
name: "delta rate sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
statementBuilder := NewMetricQueryStatementBuilder(
slog.Default(),
mockMetadataStore,
fm,
cb,
aggExprRewriter,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@ -1,5 +1,11 @@
package telemetrymetrics
import (
"time"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
const (
DBName = "signoz_metrics"
SamplesV4TableName = "distributed_samples_v4"
@ -21,3 +27,242 @@ const (
AttributesMetadataTableName = "distributed_metadata"
AttributesMetadataLocalTableName = "metadata"
)
var (
oneHourInMilliseconds = uint64(time.Hour.Milliseconds())
sixHoursInMilliseconds = uint64(time.Hour.Milliseconds() * 6)
oneDayInMilliseconds = uint64(time.Hour.Milliseconds() * 24)
oneWeekInMilliseconds = uint64(oneDayInMilliseconds * 7)
)
func WhichTSTableToUse(
start, end uint64,
tableHints *metrictypes.MetricTableHints,
) (uint64, uint64, string) {
// a table hint, when present, overrides the default table selection logic
if tableHints != nil {
if tableHints.TimeSeriesTableName != "" {
switch tableHints.TimeSeriesTableName {
case TimeseriesV4LocalTableName:
// adjust the start time to nearest 1 hour
start = start - (start % (oneHourInMilliseconds))
case TimeseriesV46hrsLocalTableName:
// adjust the start time to nearest 6 hours
start = start - (start % (sixHoursInMilliseconds))
case TimeseriesV41dayLocalTableName:
// adjust the start time to nearest 1 day
start = start - (start % (oneDayInMilliseconds))
case TimeseriesV41weekLocalTableName:
// adjust the start time to nearest 1 week
start = start - (start % (oneWeekInMilliseconds))
}
return start, end, tableHints.TimeSeriesTableName
}
}
// If time range is less than 6 hours, we need to use the `time_series_v4` table
// else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table
// else if time range is less than 1 week and greater than 1 day, we need to use the `time_series_v4_1day` table
// else we need to use the `time_series_v4_1week` table
var tableName string
if end-start < sixHoursInMilliseconds {
// adjust the start time to nearest 1 hour
start = start - (start % (oneHourInMilliseconds))
tableName = TimeseriesV4LocalTableName
} else if end-start < oneDayInMilliseconds {
// adjust the start time to nearest 6 hours
start = start - (start % (sixHoursInMilliseconds))
tableName = TimeseriesV46hrsLocalTableName
} else if end-start < oneWeekInMilliseconds {
// adjust the start time to nearest 1 day
start = start - (start % (oneDayInMilliseconds))
tableName = TimeseriesV41dayLocalTableName
} else {
// adjust the start time to nearest 1 week
start = start - (start % (oneWeekInMilliseconds))
tableName = TimeseriesV41weekLocalTableName
}
return start, end, tableName
}
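A worked example of the rollup selection and start alignment (the timestamp is arbitrary):
start := uint64(1_747_947_419_000) // milliseconds
end := start + 3*oneDayInMilliseconds
adjStart, _, tbl := WhichTSTableToUse(start, end, nil)
// a three-day range falls in [1 day, 1 week), so tbl is
// TimeseriesV41dayLocalTableName and adjStart is start floored
// to the day boundary: start - (start % oneDayInMilliseconds)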
// start and end are in milliseconds
// we have three tables for samples
// 1. distributed_samples_v4
// 2. distributed_samples_v4_agg_5m - for queries with time range above or equal to 1 day and less than 1 week
// 3. distributed_samples_v4_agg_30m - for queries with time range above or equal to 1 week
// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
func WhichSamplesTableToUse(
start, end uint64,
metricType metrictypes.Type,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
// a table hint, when present, overrides the default table selection logic
if tableHints != nil {
if tableHints.SamplesTableName != "" {
return tableHints.SamplesTableName
}
}
// we don't have any aggregated table for sketches (yet)
if metricType == metrictypes.ExpHistogramType {
return ExpHistogramLocalTableName
}
// if the time aggregation is count_distinct, we need to use the distributed_samples_v4 table
// because the aggregated tables don't support count_distinct
if timeAggregation == metrictypes.TimeAggregationCountDistinct {
return SamplesV4TableName
}
if end-start < oneDayInMilliseconds {
return SamplesV4TableName
} else if end-start < oneWeekInMilliseconds {
return SamplesV4Agg5mTableName
} else {
return SamplesV4Agg30mTableName
}
}
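And for the samples table, including the count_distinct escape hatch (sketch):
tbl := WhichSamplesTableToUse(start, start+2*oneWeekInMilliseconds, metrictypes.SumType, metrictypes.TimeAggregationCountDistinct, nil)
// SamplesV4TableName despite the two-week range: the 5m/30m rollups cannot
// answer count_distinct; with TimeAggregationSum it would be SamplesV4Agg30mTableName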
func AggregationColumnForSamplesTable(
start, end uint64,
metricType metrictypes.Type,
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
var aggregationColumn string
switch temporality {
case metrictypes.Delta:
// for delta metrics, only `RATE`/`INCREASE` give meaningful results, and both
// reduce to a sum; anyLast, avg, min, max, and count don't make much sense on
// delta samples, but they are kept so that such a query still builds valid SQL
switch tableName {
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(value)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(value)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(value)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "count(value)"
case metrictypes.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(value)"
}
case SamplesV4Agg5mTableName, SamplesV4Agg30mTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(sum)"
}
}
case metrictypes.Cumulative:
// for cumulative metrics, only `RATE`/`INCREASE` give meaningful results: the
// max value in each window is taken, and the window expression later divides
// the difference between successive maxima by the time delta to get the rate
switch tableName {
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(value)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(value)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(value)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "count(value)"
case metrictypes.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(value)"
}
case SamplesV4Agg5mTableName, SamplesV4Agg30mTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(max)"
}
}
case metrictypes.Unspecified:
switch tableName {
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(value)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(value)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(value)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "count(value)"
case metrictypes.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(value)"
}
case SamplesV4Agg5mTableName, SamplesV4Agg30mTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(sum)"
}
}
}
return aggregationColumn
}
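For example, a cumulative rate over a sub-day range reads the raw table and takes the per-window maximum, which RateWithoutNegative later differentiates into a per-second rate (sketch):
col := AggregationColumnForSamplesTable(start, start+oneHourInMilliseconds, metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, nil)
// col == "max(value)"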

View File

@ -0,0 +1,30 @@
package telemetrymetrics
import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
"service.name": {
{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"http.request.method": {
{
Name: "http.request.method",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
}
for _, keys := range keysMap {
for _, key := range keys {
key.Signal = telemetrytypes.SignalMetrics
}
}
return keysMap
}

View File

@ -65,3 +65,49 @@ var (
SpaceAggregationPercentile95 = SpaceAggregation{valuer.NewString("p95")}
SpaceAggregationPercentile99 = SpaceAggregation{valuer.NewString("p99")}
)
func (s SpaceAggregation) IsPercentile() bool {
return s == SpaceAggregationPercentile50 ||
s == SpaceAggregationPercentile75 ||
s == SpaceAggregationPercentile90 ||
s == SpaceAggregationPercentile95 ||
s == SpaceAggregationPercentile99
}
func (s SpaceAggregation) Percentile() float64 {
switch s {
case SpaceAggregationPercentile50:
return 0.5
case SpaceAggregationPercentile75:
return 0.75
case SpaceAggregationPercentile90:
return 0.9
case SpaceAggregationPercentile95:
return 0.95
case SpaceAggregationPercentile99:
return 0.99
default:
return 0
}
}
// MetricTableHints carries explicit table names to use in place of the tables
// otherwise derived from the start and end times
type MetricTableHints struct {
TimeSeriesTableName string
SamplesTableName string
}
// Certain OTEL metrics encode a state in the metric's value (for example,
// pod_status taking values 0..5), which is poor modelling (presumably vendor
// originated) and makes aggregation queries hard to write.
//
// Such metrics are better modelled with the state as an attribute, e.g.
// pod_status{state: running|pending|terminated|...} with a value of 0 or 1;
// a value of 1 with state "pending" then means the pod is pending.
//
// Since metrics like this exist in the wild, we still need to support them.
// MetricValueFilter is the workaround: it restricts a series to the samples
// whose value equals the state of interest.
type MetricValueFilter struct {
Value float64
}
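A hypothetical use, continuing the pod_status example above (the metric name and value encoding are illustrative):
// keep only the samples whose value encodes the state of interest, e.g. 2 == "pending"
agg := qbtypes.MetricAggregation{
MetricName: "pod_status",
ValueFilter: &metrictypes.MetricValueFilter{Value: 2},
}
// buildSpatialAggregationCTE then emits: WHERE per_series_value = 2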

View File

@ -152,12 +152,18 @@ type LogAggregation struct {
type MetricAggregation struct {
// metric to query
MetricName string `json:"metricName"`
// type of the metric
Type metrictypes.Type `json:"type"`
// temporality to apply to the query
Temporality metrictypes.Temporality `json:"temporality"`
// time aggregation to apply to the query
TimeAggregation metrictypes.TimeAggregation `json:"timeAggregation"`
// space aggregation to apply to the query
SpaceAggregation metrictypes.SpaceAggregation `json:"spaceAggregation"`
// table hints to use for the query
TableHints *metrictypes.MetricTableHints `json:"-"`
// value filter to apply to the query
ValueFilter *metrictypes.MetricValueFilter `json:"-"`
}
type Filter struct {