diff --git a/pkg/querybuilder/agg_rewrite.go b/pkg/querybuilder/agg_rewrite.go index ccf0d99040..052831d316 100644 --- a/pkg/querybuilder/agg_rewrite.go +++ b/pkg/querybuilder/agg_rewrite.go @@ -14,13 +14,13 @@ import ( ) type AggExprRewriterOptions struct { - FieldKeys map[string][]*telemetrytypes.TelemetryFieldKey + MetadataStore telemetrytypes.MetadataStore FullTextColumn *telemetrytypes.TelemetryFieldKey FieldMapper qbtypes.FieldMapper ConditionBuilder qbtypes.ConditionBuilder + FilterCompiler qbtypes.FilterCompiler JsonBodyPrefix string JsonKeyToKey qbtypes.JsonKeyToFieldFunc - RateInterval uint64 } type aggExprRewriter struct { @@ -34,7 +34,13 @@ func NewAggExprRewriter(opts AggExprRewriterOptions) *aggExprRewriter { // Rewrite parses the given aggregation expression, maps the column, and condition to // valid data source column and condition expression, and returns the rewritten expression // and the args if the parametric aggregation function is used. -func (r *aggExprRewriter) Rewrite(expr string) (string, []any, error) { +func (r *aggExprRewriter) Rewrite(ctx context.Context, expr string, opts ...qbtypes.RewriteOption) (string, []any, error) { + + rctx := &qbtypes.RewriteCtx{} + for _, opt := range opts { + opt(rctx) + } + wrapped := fmt.Sprintf("SELECT %s", expr) p := chparser.NewParser(wrapped) stmts, err := p.ParseStmts() @@ -56,7 +62,14 @@ func (r *aggExprRewriter) Rewrite(expr string) (string, []any, error) { return "", nil, errors.NewInternalf(errors.CodeInternal, "no SELECT items for %q", expr) } - visitor := newExprVisitor(r.opts.FieldKeys, + selectors := QueryStringToKeysSelectors(expr) + + keys, err := r.opts.MetadataStore.GetKeysMulti(ctx, selectors) + if err != nil { + return "", nil, err + } + + visitor := newExprVisitor(keys, r.opts.FullTextColumn, r.opts.FieldMapper, r.opts.ConditionBuilder, @@ -67,26 +80,28 @@ func (r *aggExprRewriter) Rewrite(expr string) (string, []any, error) { if err := sel.SelectItems[0].Accept(visitor); err != nil { return "", nil, err } - // If nothing changed, return original - if !visitor.Modified { - return expr, nil, nil - } if visitor.isRate { - return fmt.Sprintf("%s/%d", sel.SelectItems[0].String(), r.opts.RateInterval), visitor.chArgs, nil + return fmt.Sprintf("%s/%d", sel.SelectItems[0].String(), rctx.RateInterval), visitor.chArgs, nil } return sel.SelectItems[0].String(), visitor.chArgs, nil } // RewriteMultiple rewrites a slice of expressions. func (r *aggExprRewriter) RewriteMultiple( + ctx context.Context, exprs []string, + opts ...qbtypes.RewriteOption, ) ([]string, [][]any, error) { + rctx := &qbtypes.RewriteCtx{} + for _, opt := range opts { + opt(rctx) + } out := make([]string, len(exprs)) var errs []error var chArgsList [][]any for i, e := range exprs { - w, chArgs, err := r.Rewrite(e) + w, chArgs, err := r.Rewrite(ctx, e, opts...) if err != nil { errs = append(errs, err) out[i] = e diff --git a/pkg/querybuilder/query_to_keys.go b/pkg/querybuilder/query_to_keys.go index ad97d10bf9..bdcb4c52cf 100644 --- a/pkg/querybuilder/query_to_keys.go +++ b/pkg/querybuilder/query_to_keys.go @@ -25,7 +25,7 @@ import ( // FieldDataType: telemetrytypes.FieldDataTypeUnspecified, // }, // } -func QueryStringToKeysSelectors(query string) ([]*telemetrytypes.FieldKeySelector, error) { +func QueryStringToKeysSelectors(query string) []*telemetrytypes.FieldKeySelector { lexer := grammar.NewFilterQueryLexer(antlr.NewInputStream(query)) keys := []*telemetrytypes.FieldKeySelector{} for { @@ -45,5 +45,5 @@ func QueryStringToKeysSelectors(query string) ([]*telemetrytypes.FieldKeySelecto } } - return keys, nil + return keys } diff --git a/pkg/querybuilder/query_to_keys_test.go b/pkg/querybuilder/query_to_keys_test.go index 8bf065f010..0a453088d1 100644 --- a/pkg/querybuilder/query_to_keys_test.go +++ b/pkg/querybuilder/query_to_keys_test.go @@ -76,10 +76,7 @@ func TestQueryToKeys(t *testing.T) { } for _, testCase := range testCases { - keys, err := QueryStringToKeysSelectors(testCase.query) - if err != nil { - t.Fatalf("Error: %v", err) - } + keys := QueryStringToKeysSelectors(testCase.query) if len(keys) != len(testCase.expectedKeys) { t.Fatalf("Expected %d keys, got %d", len(testCase.expectedKeys), len(keys)) } diff --git a/pkg/querybuilder/resourcefilter/condition_builder.go b/pkg/querybuilder/resourcefilter/condition_builder.go new file mode 100644 index 0000000000..779748e08f --- /dev/null +++ b/pkg/querybuilder/resourcefilter/condition_builder.go @@ -0,0 +1,188 @@ +package resourcefilter + +import ( + "context" + "fmt" + + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" +) + +type defaultConditionBuilder struct { + fm qbtypes.FieldMapper +} + +var _ qbtypes.ConditionBuilder = (*defaultConditionBuilder)(nil) + +func NewConditionBuilder(fm qbtypes.FieldMapper) *defaultConditionBuilder { + return &defaultConditionBuilder{fm: fm} +} + +func valueForIndexFilter(key *telemetrytypes.TelemetryFieldKey, value any) any { + switch v := value.(type) { + case string: + return fmt.Sprintf(`%%%s%%%s%%`, key.Name, v) + case []any: + values := make([]string, 0, len(v)) + for _, v := range v { + values = append(values, fmt.Sprintf(`%%%s%%%s%%`, key.Name, v)) + } + return values + } + return value +} + +func keyIndexFilter(key *telemetrytypes.TelemetryFieldKey) any { + return fmt.Sprintf(`%%%s%%`, key.Name) +} + +func (b *defaultConditionBuilder) ConditionFor( + ctx context.Context, + key *telemetrytypes.TelemetryFieldKey, + op qbtypes.FilterOperator, + value any, + sb *sqlbuilder.SelectBuilder, +) (string, error) { + + if key.FieldContext != telemetrytypes.FieldContextResource { + return "", nil + } + + column, err := b.fm.ColumnFor(ctx, key) + if err != nil { + return "", err + } + + keyIdxFilter := sb.Like(column.Name, keyIndexFilter(key)) + valueForIndexFilter := valueForIndexFilter(key, value) + + fieldName, err := b.fm.FieldFor(ctx, key) + if err != nil { + return "", err + } + + switch op { + case qbtypes.FilterOperatorEqual: + return sb.And( + sb.E(fieldName, value), + keyIdxFilter, + sb.Like(column.Name, valueForIndexFilter), + ), nil + case qbtypes.FilterOperatorNotEqual: + return sb.And( + sb.NE(fieldName, value), + sb.NotLike(column.Name, valueForIndexFilter), + ), nil + case qbtypes.FilterOperatorGreaterThan: + return sb.And(sb.GT(fieldName, value), keyIdxFilter), nil + case qbtypes.FilterOperatorGreaterThanOrEq: + return sb.And(sb.GE(fieldName, value), keyIdxFilter), nil + case qbtypes.FilterOperatorLessThan: + return sb.And(sb.LT(fieldName, value), keyIdxFilter), nil + case qbtypes.FilterOperatorLessThanOrEq: + return sb.And(sb.LE(fieldName, value), keyIdxFilter), nil + + case qbtypes.FilterOperatorLike, qbtypes.FilterOperatorILike: + return sb.And( + sb.ILike(fieldName, value), + keyIdxFilter, + sb.ILike(column.Name, valueForIndexFilter), + ), nil + case qbtypes.FilterOperatorNotLike, qbtypes.FilterOperatorNotILike: + return sb.And( + sb.NotILike(fieldName, value), + sb.NotILike(column.Name, valueForIndexFilter), + ), nil + + case qbtypes.FilterOperatorBetween: + values, ok := value.([]any) + if !ok { + return "", qbtypes.ErrBetweenValues + } + if len(values) != 2 { + return "", qbtypes.ErrBetweenValues + } + return sb.And(keyIdxFilter, sb.Between(fieldName, values[0], values[1])), nil + case qbtypes.FilterOperatorNotBetween: + values, ok := value.([]any) + if !ok { + return "", qbtypes.ErrBetweenValues + } + if len(values) != 2 { + return "", qbtypes.ErrBetweenValues + } + return sb.And(sb.NotBetween(fieldName, values[0], values[1])), nil + + case qbtypes.FilterOperatorIn: + values, ok := value.([]any) + if !ok { + return "", qbtypes.ErrInValues + } + inConditions := make([]string, 0, len(values)) + for _, v := range values { + inConditions = append(inConditions, sb.E(fieldName, v)) + } + mainCondition := sb.Or(inConditions...) + valConditions := make([]string, 0, len(values)) + if valuesForIndexFilter, ok := valueForIndexFilter.([]string); ok { + for _, v := range valuesForIndexFilter { + valConditions = append(valConditions, sb.Like(column.Name, v)) + } + } + mainCondition = sb.And(mainCondition, keyIdxFilter, sb.Or(valConditions...)) + + return mainCondition, nil + case qbtypes.FilterOperatorNotIn: + values, ok := value.([]any) + if !ok { + return "", qbtypes.ErrInValues + } + notInConditions := make([]string, 0, len(values)) + for _, v := range values { + notInConditions = append(notInConditions, sb.NE(fieldName, v)) + } + mainCondition := sb.And(notInConditions...) + valConditions := make([]string, 0, len(values)) + if valuesForIndexFilter, ok := valueForIndexFilter.([]string); ok { + for _, v := range valuesForIndexFilter { + valConditions = append(valConditions, sb.NotLike(column.Name, v)) + } + } + mainCondition = sb.And(mainCondition, sb.And(valConditions...)) + return mainCondition, nil + + case qbtypes.FilterOperatorExists: + return sb.And( + sb.E(fmt.Sprintf("simpleJSONHas(%s, '%s')", column.Name, key.Name), true), + keyIdxFilter, + ), nil + case qbtypes.FilterOperatorNotExists: + return sb.And( + sb.NE(fmt.Sprintf("simpleJSONHas(%s, '%s')", column.Name, key.Name), true), + ), nil + + case qbtypes.FilterOperatorRegexp: + return sb.And( + fmt.Sprintf("match(%s, %s)", fieldName, sb.Var(value)), + keyIdxFilter, + ), nil + case qbtypes.FilterOperatorNotRegexp: + return sb.And( + fmt.Sprintf("NOT match(%s, %s)", fieldName, sb.Var(value)), + ), nil + + case qbtypes.FilterOperatorContains: + return sb.And( + sb.ILike(fieldName, fmt.Sprintf(`%%%s%%`, value)), + keyIdxFilter, + sb.ILike(column.Name, valueForIndexFilter), + ), nil + case qbtypes.FilterOperatorNotContains: + return sb.And( + sb.NotILike(fieldName, fmt.Sprintf(`%%%s%%`, value)), + sb.NotILike(column.Name, valueForIndexFilter), + ), nil + } + return "", qbtypes.ErrUnsupportedOperator +} diff --git a/pkg/querybuilder/resourcefilter/condition_builder_test.go b/pkg/querybuilder/resourcefilter/condition_builder_test.go new file mode 100644 index 0000000000..b59cf9316f --- /dev/null +++ b/pkg/querybuilder/resourcefilter/condition_builder_test.go @@ -0,0 +1,154 @@ +package resourcefilter + +import ( + "context" + "testing" + + "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConditionBuilder(t *testing.T) { + + testCases := []struct { + name string + key *telemetrytypes.TelemetryFieldKey + op querybuildertypesv5.FilterOperator + value any + expected string + expectedArgs []any + expectedErr error + }{ + { + name: "string_equal", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorEqual, + value: "watch", + expected: "simpleJSONExtractString(labels, 'k8s.namespace.name') = ? AND labels LIKE ? AND labels LIKE ?", + expectedArgs: []any{"watch", "%k8s.namespace.name%", `%k8s.namespace.name%watch%`}, + }, + { + name: "string_not_equal", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorNotEqual, + value: "redis", + expected: "simpleJSONExtractString(labels, 'k8s.namespace.name') <> ? AND labels NOT LIKE ?", + expectedArgs: []any{"redis", `%k8s.namespace.name%redis%`}, + }, + { + name: "string_like", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorLike, + value: "_mango%", + expected: "LOWER(simpleJSONExtractString(labels, 'k8s.namespace.name')) LIKE LOWER(?) AND labels LIKE ? AND LOWER(labels) LIKE LOWER(?)", + expectedArgs: []any{"_mango%", "%k8s.namespace.name%", `%k8s.namespace.name%_mango%%`}, + }, + { + name: "string_not_like", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorNotLike, + value: "_mango%", + expected: "LOWER(simpleJSONExtractString(labels, 'k8s.namespace.name')) NOT LIKE LOWER(?) AND LOWER(labels) NOT LIKE LOWER(?)", + expectedArgs: []any{"_mango%", `%k8s.namespace.name%_mango%%`}, + }, + { + name: "string_contains", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorContains, + value: "banana", + expected: "LOWER(simpleJSONExtractString(labels, 'k8s.namespace.name')) LIKE LOWER(?) AND labels LIKE ? AND LOWER(labels) LIKE LOWER(?)", + expectedArgs: []any{"%banana%", "%k8s.namespace.name%", `%k8s.namespace.name%banana%`}, + }, + { + name: "string_not_contains", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorNotContains, + value: "banana", + expected: "LOWER(simpleJSONExtractString(labels, 'k8s.namespace.name')) NOT LIKE LOWER(?) AND LOWER(labels) NOT LIKE LOWER(?)", + expectedArgs: []any{"%banana%", `%k8s.namespace.name%banana%`}, + }, + { + name: "string_in", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorIn, + value: []any{"watch", "redis"}, + expected: "(simpleJSONExtractString(labels, 'k8s.namespace.name') = ? OR simpleJSONExtractString(labels, 'k8s.namespace.name') = ?) AND labels LIKE ? AND (labels LIKE ? OR labels LIKE ?)", + expectedArgs: []any{"watch", "redis", "%k8s.namespace.name%", "%k8s.namespace.name%watch%", "%k8s.namespace.name%redis%"}, + }, + { + name: "string_not_in", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorNotIn, + value: []any{"watch", "redis"}, + expected: "(simpleJSONExtractString(labels, 'k8s.namespace.name') <> ? AND simpleJSONExtractString(labels, 'k8s.namespace.name') <> ?) AND (labels NOT LIKE ? AND labels NOT LIKE ?)", + expectedArgs: []any{"watch", "redis", "%k8s.namespace.name%watch%", "%k8s.namespace.name%redis%"}, + }, + { + name: "string_exists", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorExists, + expected: "simpleJSONHas(labels, 'k8s.namespace.name') = ? AND labels LIKE ?", + expectedArgs: []any{true, "%k8s.namespace.name%"}, + }, + { + name: "string_not_exists", + key: &telemetrytypes.TelemetryFieldKey{ + Name: "k8s.namespace.name", + FieldContext: telemetrytypes.FieldContextResource, + }, + op: querybuildertypesv5.FilterOperatorNotExists, + expected: "simpleJSONHas(labels, 'k8s.namespace.name') <> ?", + expectedArgs: []any{true}, + }, + } + + fm := NewFieldMapper() + conditionBuilder := NewConditionBuilder(fm) + + for _, tc := range testCases { + sb := sqlbuilder.NewSelectBuilder() + t.Run(tc.name, func(t *testing.T) { + cond, err := conditionBuilder.ConditionFor(context.Background(), tc.key, tc.op, tc.value, sb) + sb.Where(cond) + + if tc.expectedErr != nil { + assert.Error(t, err) + } else { + require.NoError(t, err) + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + assert.Contains(t, sql, tc.expected) + assert.Equal(t, tc.expectedArgs, args) + } + }) + } +} diff --git a/pkg/querybuilder/resourcefilter/field_mapper.go b/pkg/querybuilder/resourcefilter/field_mapper.go new file mode 100644 index 0000000000..73e0e7dd31 --- /dev/null +++ b/pkg/querybuilder/resourcefilter/field_mapper.go @@ -0,0 +1,72 @@ +package resourcefilter + +import ( + "context" + "fmt" + + schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" +) + +var ( + resourceColumns = map[string]*schema.Column{ + "labels": {Name: "labels", Type: schema.ColumnTypeString}, + "fingerprint": {Name: "fingerprint", Type: schema.ColumnTypeString}, + "seen_at_ts_bucket_start": {Name: "seen_at_ts_bucket_start", Type: schema.ColumnTypeInt64}, + } +) + +type defaultFieldMapper struct{} + +var _ qbtypes.FieldMapper = (*defaultFieldMapper)(nil) + +func NewFieldMapper() *defaultFieldMapper { + return &defaultFieldMapper{} +} + +func (m *defaultFieldMapper) getColumn( + _ context.Context, + key *telemetrytypes.TelemetryFieldKey, +) (*schema.Column, error) { + if key.FieldContext == telemetrytypes.FieldContextResource { + return resourceColumns["labels"], nil + } + if col, ok := resourceColumns[key.Name]; ok { + return col, nil + } + return nil, qbtypes.ErrColumnNotFound +} + +func (m *defaultFieldMapper) ColumnFor( + ctx context.Context, + key *telemetrytypes.TelemetryFieldKey, +) (*schema.Column, error) { + return m.getColumn(ctx, key) +} + +func (m *defaultFieldMapper) FieldFor( + ctx context.Context, + key *telemetrytypes.TelemetryFieldKey, +) (string, error) { + column, err := m.getColumn(ctx, key) + if err != nil { + return "", err + } + if key.FieldContext == telemetrytypes.FieldContextResource { + return fmt.Sprintf("simpleJSONExtractString(%s, '%s')", column.Name, key.Name), nil + } + return column.Name, nil +} + +func (m *defaultFieldMapper) ColumnExpressionFor( + ctx context.Context, + key *telemetrytypes.TelemetryFieldKey, + _ map[string][]*telemetrytypes.TelemetryFieldKey, +) (string, error) { + colName, err := m.FieldFor(ctx, key) + if err != nil { + return "", err + } + return fmt.Sprintf("%s AS `%s`", colName, key.Name), nil +} diff --git a/pkg/querybuilder/resourcefilter/filter_compiler.go b/pkg/querybuilder/resourcefilter/filter_compiler.go new file mode 100644 index 0000000000..bf75b24bf0 --- /dev/null +++ b/pkg/querybuilder/resourcefilter/filter_compiler.go @@ -0,0 +1,47 @@ +package resourcefilter + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" +) + +type FilterCompilerOpts struct { + FieldMapper qbtypes.FieldMapper + ConditionBuilder qbtypes.ConditionBuilder + MetadataStore telemetrytypes.MetadataStore +} + +type filterCompiler struct { + opts FilterCompilerOpts +} + +func NewFilterCompiler(opts FilterCompilerOpts) *filterCompiler { + return &filterCompiler{ + opts: opts, + } +} + +func (c *filterCompiler) Compile(ctx context.Context, expr string) (*sqlbuilder.WhereClause, []string, error) { + selectors := querybuilder.QueryStringToKeysSelectors(expr) + + keys, err := c.opts.MetadataStore.GetKeysMulti(ctx, selectors) + if err != nil { + return nil, nil, err + } + + filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{ + FieldMapper: c.opts.FieldMapper, + ConditionBuilder: c.opts.ConditionBuilder, + FieldKeys: keys, + }) + + if err != nil { + return nil, nil, err + } + + return filterWhereClause, warnings, nil +} diff --git a/pkg/querybuilder/resourcefilter/statement_builder.go b/pkg/querybuilder/resourcefilter/statement_builder.go new file mode 100644 index 0000000000..f4afcb2e6e --- /dev/null +++ b/pkg/querybuilder/resourcefilter/statement_builder.go @@ -0,0 +1,133 @@ +package resourcefilter + +import ( + "context" + "fmt" + + "github.com/SigNoz/signoz/pkg/errors" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" +) + +type ResourceFilterStatementBuilderOpts struct { + FieldMapper qbtypes.FieldMapper + ConditionBuilder qbtypes.ConditionBuilder + Compiler qbtypes.FilterCompiler +} + +var ( + ErrUnsupportedSignal = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported signal type") +) + +// Configuration for different signal types +type signalConfig struct { + dbName string + tableName string +} + +var signalConfigs = map[telemetrytypes.Signal]signalConfig{ + telemetrytypes.SignalTraces: { + dbName: TracesDBName, + tableName: TraceResourceV3TableName, + }, + telemetrytypes.SignalLogs: { + dbName: LogsDBName, + tableName: LogsResourceV2TableName, + }, +} + +// Generic resource filter statement builder +type resourceFilterStatementBuilder[T any] struct { + opts ResourceFilterStatementBuilderOpts + signal telemetrytypes.Signal +} + +// Ensure interface compliance at compile time +var ( + _ qbtypes.StatementBuilder[qbtypes.TraceAggregation] = (*resourceFilterStatementBuilder[qbtypes.TraceAggregation])(nil) + _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*resourceFilterStatementBuilder[qbtypes.LogAggregation])(nil) +) + +// Constructor functions +func NewTraceResourceFilterStatementBuilder(opts ResourceFilterStatementBuilderOpts) *resourceFilterStatementBuilder[qbtypes.TraceAggregation] { + return &resourceFilterStatementBuilder[qbtypes.TraceAggregation]{ + opts: opts, + signal: telemetrytypes.SignalTraces, + } +} + +func NewLogResourceFilterStatementBuilder(opts ResourceFilterStatementBuilderOpts) *resourceFilterStatementBuilder[qbtypes.LogAggregation] { + return &resourceFilterStatementBuilder[qbtypes.LogAggregation]{ + opts: opts, + signal: telemetrytypes.SignalLogs, + } +} + +// Build builds a SQL query based on the given parameters +func (b *resourceFilterStatementBuilder[T]) Build( + ctx context.Context, + start uint64, + end uint64, + requestType qbtypes.RequestType, + query qbtypes.QueryBuilderQuery[T], +) (*qbtypes.Statement, error) { + config, exists := signalConfigs[b.signal] + if !exists { + return nil, fmt.Errorf("%w: %s", ErrUnsupportedSignal, b.signal) + } + + q := sqlbuilder.ClickHouse.NewSelectBuilder() + q.Select("fingerprint") + q.From(fmt.Sprintf("%s.%s", config.dbName, config.tableName)) + + if err := b.addConditions(ctx, q, start, end, query); err != nil { + return nil, err + } + + stmt, args := q.Build() + return &qbtypes.Statement{ + Query: stmt, + Args: args, + }, nil +} + +// addConditions adds both filter and time conditions to the query +func (b *resourceFilterStatementBuilder[T]) addConditions( + ctx context.Context, + sb *sqlbuilder.SelectBuilder, + start, end uint64, + query qbtypes.QueryBuilderQuery[T], +) error { + // Add filter condition if present + if query.Filter != nil && query.Filter.Expression != "" { + filterWhereClause, _, err := b.opts.Compiler.Compile(ctx, query.Filter.Expression) + if err != nil { + return err + } + if filterWhereClause != nil { + sb.AddWhereClause(filterWhereClause) + } + } + + // Add time filter + b.addTimeFilter(sb, start, end) + return nil +} + +// addTimeFilter adds time-based filtering conditions +func (b *resourceFilterStatementBuilder[T]) addTimeFilter(sb *sqlbuilder.SelectBuilder, start, end uint64) { + // Convert nanoseconds to seconds and adjust start bucket + const ( + nsToSeconds = 1000000000 + bucketAdjustment = 1800 // 30 minutes + ) + + startBucket := start/nsToSeconds - bucketAdjustment + endBucket := end / nsToSeconds + + sb.Where( + sb.GE("seen_at_ts_bucket_start", startBucket), + sb.LE("seen_at_ts_bucket_start", endBucket), + ) +} diff --git a/pkg/querybuilder/resourcefilter/tables.go b/pkg/querybuilder/resourcefilter/tables.go new file mode 100644 index 0000000000..bcc8133341 --- /dev/null +++ b/pkg/querybuilder/resourcefilter/tables.go @@ -0,0 +1,8 @@ +package resourcefilter + +const ( + TracesDBName = "signoz_traces" + TraceResourceV3TableName = "distributed_traces_v3_resource" + LogsDBName = "signoz_logs" + LogsResourceV2TableName = "distributed_logs_v2_resource" +) diff --git a/pkg/querybuilder/time.go b/pkg/querybuilder/time.go new file mode 100644 index 0000000000..4c9fe46f04 --- /dev/null +++ b/pkg/querybuilder/time.go @@ -0,0 +1,18 @@ +package querybuilder + +import "math" + +// ToNanoSecs takes epoch and returns it in ns +func ToNanoSecs(epoch uint64) uint64 { + temp := epoch + count := 0 + if epoch == 0 { + count = 1 + } else { + for epoch != 0 { + epoch /= 10 + count++ + } + } + return temp * uint64(math.Pow(10, float64(19-count))) +} diff --git a/pkg/querybuilder/time_test.go b/pkg/querybuilder/time_test.go new file mode 100644 index 0000000000..7617d93fa4 --- /dev/null +++ b/pkg/querybuilder/time_test.go @@ -0,0 +1,62 @@ +package querybuilder + +import "testing" + +func TestToNanoSecs(t *testing.T) { + tests := []struct { + name string + epoch uint64 + expected uint64 + }{ + { + name: "10-digit Unix timestamp (seconds) - 2023-01-01 00:00:00 UTC", + epoch: 1672531200, // January 1, 2023 00:00:00 UTC + expected: 1672531200000000000, // 1672531200 * 10^9 + }, + { + name: "13-digit Unix timestamp (milliseconds) - 2023-01-01 00:00:00 UTC", + epoch: 1672531200000, // January 1, 2023 00:00:00.000 UTC + expected: 1672531200000000000, // 1672531200000 * 10^6 + }, + { + name: "16-digit Unix timestamp (microseconds) - 2023-01-01 00:00:00 UTC", + epoch: 1672531200000000, // January 1, 2023 00:00:00.000000 UTC + expected: 1672531200000000000, // 1672531200000000 * 10^3 + }, + { + name: "19-digit Unix timestamp (nanoseconds) - 2023-01-01 00:00:00 UTC", + epoch: 1672531200000000000, // January 1, 2023 00:00:00.000000000 UTC + expected: 1672531200000000000, // 1672531200000000000 * 10^0 + }, + { + name: "Unix epoch start - 1970-01-01 00:00:00 UTC", + epoch: 0, + expected: 0, + }, + { + name: "Recent timestamp - 2024-05-25 12:00:00 UTC", + epoch: 1716638400, // May 25, 2024 12:00:00 UTC + expected: 1716638400000000000, // 1716638400 * 10^9 + }, + + { + name: "Large valid timestamp - 2025-05-15 10:30:45 UTC", + epoch: 1747204245, // May 15, 2025 10:30:45 UTC + expected: 1747204245000000000, // 1747204245 * 10^9 + }, + { + name: "18-digit microsecond timestamp", + epoch: 1672531200123456, // Jan 1, 2023 with microseconds + expected: 1672531200123456000, // 1672531200123456 * 10^3 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ToNanoSecs(tt.epoch) + if result != tt.expected { + t.Errorf("ToNanoSecs(%d) = %d, want %d", tt.epoch, result, tt.expected) + } + }) + } +} diff --git a/pkg/telemetrytraces/filter_compiler.go b/pkg/telemetrytraces/filter_compiler.go new file mode 100644 index 0000000000..a91e231264 --- /dev/null +++ b/pkg/telemetrytraces/filter_compiler.go @@ -0,0 +1,55 @@ +package telemetrytraces + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" +) + +type FilterCompilerOpts struct { + FieldMapper qbtypes.FieldMapper + ConditionBuilder qbtypes.ConditionBuilder + MetadataStore telemetrytypes.MetadataStore + FullTextColumn *telemetrytypes.TelemetryFieldKey + JsonBodyPrefix string + JsonKeyToKey qbtypes.JsonKeyToFieldFunc + SkipResourceFilter bool +} + +type filterCompiler struct { + opts FilterCompilerOpts +} + +func NewFilterCompiler(opts FilterCompilerOpts) *filterCompiler { + return &filterCompiler{ + opts: opts, + } +} + +func (c *filterCompiler) Compile(ctx context.Context, expr string) (*sqlbuilder.WhereClause, []string, error) { + selectors := querybuilder.QueryStringToKeysSelectors(expr) + + keys, err := c.opts.MetadataStore.GetKeysMulti(ctx, selectors) + if err != nil { + return nil, nil, err + } + + filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{ + FieldMapper: c.opts.FieldMapper, + ConditionBuilder: c.opts.ConditionBuilder, + FieldKeys: keys, + FullTextColumn: c.opts.FullTextColumn, + JsonBodyPrefix: c.opts.JsonBodyPrefix, + JsonKeyToKey: c.opts.JsonKeyToKey, + SkipResourceFilter: c.opts.SkipResourceFilter, + }) + + if err != nil { + return nil, nil, err + } + + return filterWhereClause, warnings, nil +} diff --git a/pkg/telemetrytraces/statement_builder.go b/pkg/telemetrytraces/statement_builder.go new file mode 100644 index 0000000000..c6313f820c --- /dev/null +++ b/pkg/telemetrytraces/statement_builder.go @@ -0,0 +1,456 @@ +package telemetrytraces + +import ( + "context" + "fmt" + "strings" + + "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" +) + +var ( + ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation") +) + +type TraceQueryStatementBuilderOpts struct { + MetadataStore telemetrytypes.MetadataStore + FieldMapper qbtypes.FieldMapper + ConditionBuilder qbtypes.ConditionBuilder + ResourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] + Compiler qbtypes.FilterCompiler + AggExprRewriter qbtypes.AggExprRewriter +} + +type traceQueryStatementBuilder struct { + opts TraceQueryStatementBuilderOpts + fm qbtypes.FieldMapper + cb qbtypes.ConditionBuilder + compiler qbtypes.FilterCompiler + aggExprRewriter qbtypes.AggExprRewriter +} + +var _ qbtypes.StatementBuilder[qbtypes.TraceAggregation] = (*traceQueryStatementBuilder)(nil) + +func NewTraceQueryStatementBuilder(opts TraceQueryStatementBuilderOpts) *traceQueryStatementBuilder { + return &traceQueryStatementBuilder{ + opts: opts, + fm: opts.FieldMapper, + cb: opts.ConditionBuilder, + compiler: opts.Compiler, + aggExprRewriter: opts.AggExprRewriter, + } +} + +// Build builds a SQL query for traces based on the given parameters +func (b *traceQueryStatementBuilder) Build( + ctx context.Context, + start uint64, + end uint64, + requestType qbtypes.RequestType, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], +) (*qbtypes.Statement, error) { + + start = querybuilder.ToNanoSecs(start) + end = querybuilder.ToNanoSecs(end) + + keySelectors := getKeySelectors(query) + keys, err := b.opts.MetadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + // Create SQL builder + q := sqlbuilder.ClickHouse.NewSelectBuilder() + + switch requestType { + case qbtypes.RequestTypeRaw: + return b.buildListQuery(ctx, q, query, start, end, keys) + case qbtypes.RequestTypeTimeSeries: + return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys) + case qbtypes.RequestTypeScalar: + return b.buildScalarQuery(ctx, q, query, start, end, keys, false) + } + + return nil, fmt.Errorf("unsupported request type: %s", requestType) +} + +func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) []*telemetrytypes.FieldKeySelector { + var keySelectors []*telemetrytypes.FieldKeySelector + + for idx := range query.Aggregations { + aggExpr := query.Aggregations[idx] + selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression) + keySelectors = append(keySelectors, selectors...) + } + + whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression) + keySelectors = append(keySelectors, whereClauseSelectors...) + + return keySelectors +} + +// buildListQuery builds a query for list panel type +func (b *traceQueryStatementBuilder) buildListQuery( + ctx context.Context, + sb *sqlbuilder.SelectBuilder, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], + start, end uint64, + keys map[string][]*telemetrytypes.TelemetryFieldKey, +) (*qbtypes.Statement, error) { + + var ( + cteFragments []string + cteArgs [][]any + ) + + if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil { + return nil, err + } else if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + + // Select default columns + sb.Select( + "timestamp", + "trace_id", + "span_id", + "name", + "resource_string_service$$name", + "duration_nano", + "response_status_code", + ) + + for _, field := range query.SelectFields { + colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys) + if err != nil { + return nil, err + } + sb.SelectMore(colExpr) + } + + // From table + sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) + + // Add filter conditions + warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + if err != nil { + return nil, err + } + + // Add order by + for _, orderBy := range query.Order { + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction)) + } + + // Add limit and offset + if query.Limit > 0 { + sb.Limit(query.Limit) + } + + if query.Offset > 0 { + sb.Offset(query.Offset) + } + + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + + finalSQL := combineCTEs(cteFragments) + mainSQL + finalArgs := prependArgs(cteArgs, mainArgs) + + return &qbtypes.Statement{ + Query: finalSQL, + Args: finalArgs, + Warnings: warnings, + }, nil +} + +func (b *traceQueryStatementBuilder) buildTimeSeriesQuery( + ctx context.Context, + sb *sqlbuilder.SelectBuilder, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], + start, end uint64, + keys map[string][]*telemetrytypes.TelemetryFieldKey, +) (*qbtypes.Statement, error) { + + var ( + cteFragments []string + cteArgs [][]any + ) + + if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil { + return nil, err + } else if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + + sb.SelectMore(fmt.Sprintf( + "toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts", + int64(query.StepInterval.Seconds()), + )) + + // Keep original column expressions so we can build the tuple + fieldNames := make([]string, 0, len(query.GroupBy)) + for _, gb := range query.GroupBy { + colExpr, err := b.fm.ColumnExpressionFor(ctx, &gb.TelemetryFieldKey, keys) + if err != nil { + return nil, err + } + sb.SelectMore(colExpr) + fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name)) + } + + // Aggregations + allAggChArgs := make([]any, 0) + for i, agg := range query.Aggregations { + rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, agg.Expression) + if err != nil { + return nil, err + } + allAggChArgs = append(allAggChArgs, chArgs...) + sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i)) + } + + sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) + warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + if err != nil { + return nil, err + } + + var finalSQL string + var finalArgs []any + + if query.Limit > 0 { + // build the scalar “top/bottom-N” query in its own builder. + cteSB := sqlbuilder.ClickHouse.NewSelectBuilder() + cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true) + if err != nil { + return nil, err + } + + cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query)) + cteArgs = append(cteArgs, cteStmt.Args) + + // Constrain the main query to the rows that appear in the CTE. + tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", ")) + sb.Where(fmt.Sprintf("%s IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", "))) + + // Group by all dimensions + sb.GroupBy("ALL") + if query.Having != nil && query.Having.Expression != "" { + sb.Having(query.Having.Expression) + } + + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + + // Stitch it all together: WITH … SELECT … + finalSQL = combineCTEs(cteFragments) + mainSQL + finalArgs = prependArgs(cteArgs, mainArgs) + + } else { + sb.GroupBy("ALL") + if query.Having != nil && query.Having.Expression != "" { + sb.Having(query.Having.Expression) + } + + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + + // Stitch it all together: WITH … SELECT … + finalSQL = combineCTEs(cteFragments) + mainSQL + finalArgs = prependArgs(cteArgs, mainArgs) + } + + return &qbtypes.Statement{ + Query: finalSQL, + Args: finalArgs, + Warnings: warnings, + }, nil +} + +// buildScalarQuery builds a query for scalar panel type +func (b *traceQueryStatementBuilder) buildScalarQuery( + ctx context.Context, + sb *sqlbuilder.SelectBuilder, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], + start, end uint64, + keys map[string][]*telemetrytypes.TelemetryFieldKey, + skipResourceCTE bool, +) (*qbtypes.Statement, error) { + + var ( + cteFragments []string + cteArgs [][]any + ) + + if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil { + return nil, err + } else if frag != "" && !skipResourceCTE { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + + allAggChArgs := []any{} + + // Add group by columns + for _, groupBy := range query.GroupBy { + colExpr, err := b.fm.ColumnExpressionFor(ctx, &groupBy.TelemetryFieldKey, keys) + if err != nil { + return nil, err + } + sb.SelectMore(colExpr) + } + + // Add aggregation + if len(query.Aggregations) > 0 { + for idx := range query.Aggregations { + aggExpr := query.Aggregations[idx] + rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, aggExpr.Expression) + if err != nil { + return nil, err + } + allAggChArgs = append(allAggChArgs, chArgs...) + sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx)) + } + } + + // From table + sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) + + // Add filter conditions + warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + if err != nil { + return nil, err + } + + // Group by dimensions + sb.GroupBy("ALL") + + // Add having clause if needed + if query.Having != nil && query.Having.Expression != "" { + sb.Having(query.Having.Expression) + } + + // Add order by + for _, orderBy := range query.Order { + idx, ok := aggOrderBy(orderBy, query) + if ok { + sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction)) + } else { + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction)) + } + } + + // if there is no order by, then use the __result_0 as the order by + if len(query.Order) == 0 { + sb.OrderBy("__result_0 DESC") + } + + // Add limit and offset + if query.Limit > 0 { + sb.Limit(query.Limit) + } + + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + + finalSQL := combineCTEs(cteFragments) + mainSQL + finalArgs := prependArgs(cteArgs, mainArgs) + + return &qbtypes.Statement{ + Query: finalSQL, + Args: finalArgs, + Warnings: warnings, + }, nil +} + +// buildFilterCondition builds SQL condition from filter expression +func (b *traceQueryStatementBuilder) addFilterCondition(ctx context.Context, sb *sqlbuilder.SelectBuilder, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) ([]string, error) { + + // add filter expression + + filterWhereClause, warnings, err := b.compiler.Compile(ctx, query.Filter.Expression) + + if err != nil { + return nil, err + } + + if filterWhereClause != nil { + sb.AddWhereClause(filterWhereClause) + } + + // add time filter + startBucket := start/1000000000 - 1800 + endBucket := end / 1000000000 + + sb.Where(sb.GE("timestamp", start), sb.LE("timestamp", end), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket)) + + return warnings, nil +} + +// combineCTEs takes any number of individual CTE fragments like +// +// "__resource_filter AS (...)", "__limit_cte AS (...)" +// +// and renders the final `WITH …` clause. +func combineCTEs(ctes []string) string { + if len(ctes) == 0 { + return "" + } + return "WITH " + strings.Join(ctes, ", ") + " " +} + +// prependArgs ensures CTE arguments appear before main-query arguments +// in the final slice so their ordinal positions match the SQL string. +func prependArgs(cteArgs [][]any, mainArgs []any) []any { + out := make([]any, 0, len(mainArgs)+len(cteArgs)) + for _, a := range cteArgs { // CTEs first, in declaration order + out = append(out, a...) + } + return append(out, mainArgs...) +} + +func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) (int, bool) { + for i, agg := range q.Aggregations { + if k.Key.Name == agg.Alias || + k.Key.Name == agg.Expression || + k.Key.Name == fmt.Sprintf("%d", i) { + return i, true + } + } + return 0, false +} + +func (b *traceQueryStatementBuilder) maybeAttachResourceFilter( + ctx context.Context, + sb *sqlbuilder.SelectBuilder, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], + start, end uint64, +) (cteSQL string, cteArgs []any, err error) { + + stmt, err := b.buildResourceFilterCTE(ctx, query, start, end) + if err != nil { + return "", nil, err + } + + sb.Where("resource_fingerprint IN (SELECT fingerprint FROM __resource_filter)") + + return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil +} + +func (b *traceQueryStatementBuilder) buildResourceFilterCTE( + ctx context.Context, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], + start, end uint64, +) (*qbtypes.Statement, error) { + + return b.opts.ResourceFilterStmtBuilder.Build( + ctx, + start, + end, + qbtypes.RequestTypeRaw, + query, + ) +} diff --git a/pkg/telemetrytraces/stmt_builder_test.go b/pkg/telemetrytraces/stmt_builder_test.go new file mode 100644 index 0000000000..864838cf30 --- /dev/null +++ b/pkg/telemetrytraces/stmt_builder_test.go @@ -0,0 +1,117 @@ +package telemetrytraces + +import ( + "context" + "testing" + "time" + + "github.com/SigNoz/signoz/pkg/querybuilder" + "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest" + "github.com/stretchr/testify/require" +) + +func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.TraceAggregation], error) { + fm := resourcefilter.NewFieldMapper() + cb := resourcefilter.NewConditionBuilder(fm) + mockMetadataStore := telemetrytypestest.NewMockMetadataStore() + mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() + compiler := resourcefilter.NewFilterCompiler(resourcefilter.FilterCompilerOpts{ + FieldMapper: fm, + ConditionBuilder: cb, + MetadataStore: mockMetadataStore, + }) + + return resourcefilter.NewTraceResourceFilterStatementBuilder(resourcefilter.ResourceFilterStatementBuilderOpts{ + FieldMapper: fm, + ConditionBuilder: cb, + Compiler: compiler, + }), nil +} + +func TestStatementBuilder(t *testing.T) { + cases := []struct { + name string + requestType qbtypes.RequestType + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] + expected qbtypes.Statement + expectedErr error + }{ + { + name: "test", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Signal: telemetrytypes.SignalTraces, + StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + Aggregations: []qbtypes.TraceAggregation{ + { + Expression: "count()", + }, + }, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'redis-manual'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL", + Args: []any{"redis-manual", "%service.name%", "%service.name%redis-manual%", uint64(1747945619), uint64(1747983448), uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448), 10, uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448)}, + }, + expectedErr: nil, + }, + } + + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) + mockMetadataStore := telemetrytypestest.NewMockMetadataStore() + mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() + compiler := NewFilterCompiler(FilterCompilerOpts{ + FieldMapper: fm, + ConditionBuilder: cb, + MetadataStore: mockMetadataStore, + SkipResourceFilter: true, + }) + aggExprRewriter := querybuilder.NewAggExprRewriter(querybuilder.AggExprRewriterOptions{ + FieldMapper: fm, + ConditionBuilder: cb, + MetadataStore: mockMetadataStore, + }) + + resourceFilterStmtBuilder, err := resourceFilterStmtBuilder() + require.NoError(t, err) + + statementBuilder := NewTraceQueryStatementBuilder(TraceQueryStatementBuilderOpts{ + FieldMapper: fm, + ConditionBuilder: cb, + Compiler: compiler, + MetadataStore: mockMetadataStore, + AggExprRewriter: aggExprRewriter, + ResourceFilterStmtBuilder: resourceFilterStmtBuilder, + }) + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + + q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query) + + if c.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, c.expected.Query, q.Query) + require.Equal(t, c.expected.Args, q.Args) + require.Equal(t, c.expected.Warnings, q.Warnings) + } + }) + } +} diff --git a/pkg/telemetrytraces/test_data.go b/pkg/telemetrytraces/test_data.go new file mode 100644 index 0000000000..051af18cc7 --- /dev/null +++ b/pkg/telemetrytraces/test_data.go @@ -0,0 +1,38 @@ +package telemetrytraces + +import ( + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" +) + +func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey { + return map[string][]*telemetrytypes.TelemetryFieldKey{ + "service.name": { + { + Name: "service.name", + FieldContext: telemetrytypes.FieldContextResource, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + "http.request.method": { + { + Name: "http.request.method", + FieldContext: telemetrytypes.FieldContextAttribute, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + "http.response.status_code": { + { + Name: "http.status_code", + FieldContext: telemetrytypes.FieldContextAttribute, + FieldDataType: telemetrytypes.FieldDataTypeInt64, + }, + }, + "kind_string": { + { + Name: "kind_string", + FieldContext: telemetrytypes.FieldContextSpan, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + } +}