diff --git a/pkg/telemetrylogs/filter_compiler.go b/pkg/telemetrylogs/filter_compiler.go
new file mode 100644
index 0000000000..69dc90bd52
--- /dev/null
+++ b/pkg/telemetrylogs/filter_compiler.go
@@ -0,0 +1,55 @@
+package telemetrylogs
+
+import (
+	"context"
+
+	"github.com/SigNoz/signoz/pkg/querybuilder"
+	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+	"github.com/huandu/go-sqlbuilder"
+)
+
+type FilterCompilerOpts struct {
+	FieldMapper        qbtypes.FieldMapper
+	ConditionBuilder   qbtypes.ConditionBuilder
+	MetadataStore      telemetrytypes.MetadataStore
+	FullTextColumn     *telemetrytypes.TelemetryFieldKey
+	JsonBodyPrefix     string
+	JsonKeyToKey       qbtypes.JsonKeyToFieldFunc
+	SkipResourceFilter bool
+}
+
+type filterCompiler struct {
+	opts FilterCompilerOpts
+}
+
+func NewFilterCompiler(opts FilterCompilerOpts) *filterCompiler {
+	return &filterCompiler{
+		opts: opts,
+	}
+}
+
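+// Compile resolves the field keys referenced in expr against the metadata
+// store and compiles the expression into a WHERE clause, returning the
+// clause together with any warnings produced during parsing.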
+func (c *filterCompiler) Compile(ctx context.Context, expr string) (*sqlbuilder.WhereClause, []string, error) {
+	selectors := querybuilder.QueryStringToKeysSelectors(expr)
+
+	keys, err := c.opts.MetadataStore.GetKeysMulti(ctx, selectors)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{
+		FieldMapper:        c.opts.FieldMapper,
+		ConditionBuilder:   c.opts.ConditionBuilder,
+		FieldKeys:          keys,
+		FullTextColumn:     c.opts.FullTextColumn,
+		JsonBodyPrefix:     c.opts.JsonBodyPrefix,
+		JsonKeyToKey:       c.opts.JsonKeyToKey,
+		SkipResourceFilter: c.opts.SkipResourceFilter,
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return filterWhereClause, warnings, nil
+}
diff --git a/pkg/telemetrylogs/statement_builder.go b/pkg/telemetrylogs/statement_builder.go
new file mode 100644
index 0000000000..1806624a63
--- /dev/null
+++ b/pkg/telemetrylogs/statement_builder.go
@@ -0,0 +1,442 @@
+package telemetrylogs
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/SigNoz/signoz/pkg/errors"
+	"github.com/SigNoz/signoz/pkg/querybuilder"
+	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+	"github.com/huandu/go-sqlbuilder"
+)
+
+var (
+	ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation")
+)
+
+type LogQueryStatementBuilderOpts struct {
+	MetadataStore             telemetrytypes.MetadataStore
+	FieldMapper               qbtypes.FieldMapper
+	ConditionBuilder          qbtypes.ConditionBuilder
+	ResourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
+	Compiler                  qbtypes.FilterCompiler
+	AggExprRewriter           qbtypes.AggExprRewriter
+}
+
+type logQueryStatementBuilder struct {
+	opts            LogQueryStatementBuilderOpts
+	fm              qbtypes.FieldMapper
+	cb              qbtypes.ConditionBuilder
+	compiler        qbtypes.FilterCompiler
+	aggExprRewriter qbtypes.AggExprRewriter
+}
+
+var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
+
+func NewLogQueryStatementBuilder(opts LogQueryStatementBuilderOpts) *logQueryStatementBuilder {
+	return &logQueryStatementBuilder{
+		opts:            opts,
+		fm:              opts.FieldMapper,
+		cb:              opts.ConditionBuilder,
+		compiler:        opts.Compiler,
+		aggExprRewriter: opts.AggExprRewriter,
+	}
+}
+
+// Build builds a SQL query for logs based on the given parameters.
+func (b *logQueryStatementBuilder) Build(
+	ctx context.Context,
+	start uint64,
+	end uint64,
+	requestType qbtypes.RequestType,
+	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
+) (*qbtypes.Statement, error) {
+	start = querybuilder.ToNanoSecs(start)
+	end = querybuilder.ToNanoSecs(end)
+
+	keySelectors := getKeySelectors(query)
+	keys, err := b.opts.MetadataStore.GetKeysMulti(ctx, keySelectors)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create SQL builder
+	q := sqlbuilder.ClickHouse.NewSelectBuilder()
+
+	switch requestType {
+	case qbtypes.RequestTypeRaw:
+		return b.buildListQuery(ctx, q, query, start, end, keys)
+	case qbtypes.RequestTypeTimeSeries:
+		return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys)
+	case qbtypes.RequestTypeScalar:
+		return b.buildScalarQuery(ctx, q, query, start, end, keys, false)
+	}
+
+	return nil, fmt.Errorf("unsupported request type: %s", requestType)
+}
+
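+// getKeySelectors collects the field-key selectors referenced by the
+// aggregation expressions and the filter expression so that metadata for
+// all of them can be fetched with a single GetKeysMulti call.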
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts", + int64(query.StepInterval.Seconds()), + )) + + // Keep original column expressions so we can build the tuple + fieldNames := make([]string, 0, len(query.GroupBy)) + for _, gb := range query.GroupBy { + colExpr, err := b.fm.ColumnExpressionFor(ctx, &gb.TelemetryFieldKey, keys) + if err != nil { + return nil, err + } + sb.SelectMore(colExpr) + fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name)) + } + + // Aggregations + allAggChArgs := make([]any, 0) + for i, agg := range query.Aggregations { + rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, agg.Expression) + if err != nil { + return nil, err + } + allAggChArgs = append(allAggChArgs, chArgs...) + sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i)) + } + + sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName)) + warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + if err != nil { + return nil, err + } + + var finalSQL string + var finalArgs []any + + if query.Limit > 0 { + // build the scalar “top/bottom-N” query in its own builder. + cteSB := sqlbuilder.ClickHouse.NewSelectBuilder() + cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true) + if err != nil { + return nil, err + } + + cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query)) + cteArgs = append(cteArgs, cteStmt.Args) + + // Constrain the main query to the rows that appear in the CTE. + tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", ")) + sb.Where(fmt.Sprintf("%s IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", "))) + + // Group by all dimensions + sb.GroupBy("ALL") + if query.Having != nil && query.Having.Expression != "" { + sb.Having(query.Having.Expression) + } + + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + + // Stitch it all together: WITH … SELECT … + finalSQL = combineCTEs(cteFragments) + mainSQL + finalArgs = prependArgs(cteArgs, mainArgs) + + } else { + sb.GroupBy("ALL") + if query.Having != nil && query.Having.Expression != "" { + sb.Having(query.Having.Expression) + } + + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) 
+
+		// Group by all dimensions
+		sb.GroupBy("ALL")
+		if query.Having != nil && query.Having.Expression != "" {
+			sb.Having(query.Having.Expression)
+		}
+
+		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)
+
+		// Stitch it all together: WITH … SELECT …
+		finalSQL = combineCTEs(cteFragments) + mainSQL
+		finalArgs = prependArgs(cteArgs, mainArgs)
+
+	} else {
+		sb.GroupBy("ALL")
+		if query.Having != nil && query.Having.Expression != "" {
+			sb.Having(query.Having.Expression)
+		}
+
+		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)
+
+		// Stitch it all together: WITH … SELECT …
+		finalSQL = combineCTEs(cteFragments) + mainSQL
+		finalArgs = prependArgs(cteArgs, mainArgs)
+	}
+
+	return &qbtypes.Statement{
+		Query:    finalSQL,
+		Args:     finalArgs,
+		Warnings: warnings,
+	}, nil
+}
+
+// buildScalarQuery builds a query for scalar panel type
+func (b *logQueryStatementBuilder) buildScalarQuery(
+	ctx context.Context,
+	sb *sqlbuilder.SelectBuilder,
+	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
+	start, end uint64,
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
+	skipResourceCTE bool,
+) (*qbtypes.Statement, error) {
+
+	var (
+		cteFragments []string
+		cteArgs      [][]any
+	)
+
+	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil {
+		return nil, err
+	} else if frag != "" && !skipResourceCTE {
+		cteFragments = append(cteFragments, frag)
+		cteArgs = append(cteArgs, args)
+	}
+
+	allAggChArgs := []any{}
+
+	// Add group by columns
+	for _, groupBy := range query.GroupBy {
+		colExpr, err := b.fm.ColumnExpressionFor(ctx, &groupBy.TelemetryFieldKey, keys)
+		if err != nil {
+			return nil, err
+		}
+		sb.SelectMore(colExpr)
+	}
+
+	// Add aggregation
+	if len(query.Aggregations) > 0 {
+		for idx := range query.Aggregations {
+			aggExpr := query.Aggregations[idx]
+			rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, aggExpr.Expression)
+			if err != nil {
+				return nil, err
+			}
+			allAggChArgs = append(allAggChArgs, chArgs...)
+			sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
+		}
+	}
+
+	// From table
+	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
+
+	// Add filter conditions
+	warnings, err := b.addFilterCondition(ctx, sb, start, end, query)
+	if err != nil {
+		return nil, err
+	}
+
+	// Group by dimensions
+	sb.GroupBy("ALL")
+
+	// Add having clause if needed
+	if query.Having != nil && query.Having.Expression != "" {
+		sb.Having(query.Having.Expression)
+	}
+
+	// Add order by
+	for _, orderBy := range query.Order {
+		idx, ok := aggOrderBy(orderBy, query)
+		if ok {
+			sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction))
+		} else {
+			sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction))
+		}
+	}
+
+	// if there is no order by, then use __result_0 as the order by
+	if len(query.Order) == 0 {
+		sb.OrderBy("__result_0 DESC")
+	}
+
+	// Add limit and offset
+	if query.Limit > 0 {
+		sb.Limit(query.Limit)
+	}
+
+	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)
+
+	finalSQL := combineCTEs(cteFragments) + mainSQL
+	finalArgs := prependArgs(cteArgs, mainArgs)
+
+	return &qbtypes.Statement{
+		Query:    finalSQL,
+		Args:     finalArgs,
+		Warnings: warnings,
+	}, nil
+}
+
+// addFilterCondition compiles the filter expression (if any) into a WHERE
+// clause, adds it to the builder, and appends the time-range conditions.
+func (b *logQueryStatementBuilder) addFilterCondition(ctx context.Context, sb *sqlbuilder.SelectBuilder, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]string, error) {
+	var warnings []string
+
+	// add filter expression; skip when no filter is set to avoid a nil
+	// pointer dereference on query.Filter
+	if query.Filter != nil && query.Filter.Expression != "" {
+		filterWhereClause, filterWarnings, err := b.compiler.Compile(ctx, query.Filter.Expression)
+		if err != nil {
+			return nil, err
+		}
+		warnings = filterWarnings
+
+		if filterWhereClause != nil {
+			sb.AddWhereClause(filterWhereClause)
+		}
+	}
+
+	// add time filter; ts_bucket_start is coarser than timestamp, so the
+	// bucket scan starts 1800s before the window to include rows whose
+	// bucket began before `start`
+	startBucket := start/1000000000 - 1800
+	endBucket := end / 1000000000
+
+	sb.Where(sb.GE("timestamp", start), sb.LE("timestamp", end), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))
+
+	return warnings, nil
+}
+
+// combineCTEs takes any number of individual CTE fragments like
+//
+//	"__resource_filter AS (...)", "__limit_cte AS (...)"
+//
+// and renders the final `WITH …` clause.
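+//
+// For the fragments above the result is
+// "WITH __resource_filter AS (...), __limit_cte AS (...) "; note the
+// trailing space, which lets the caller concatenate the main SELECT directly.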
+func combineCTEs(ctes []string) string {
+	if len(ctes) == 0 {
+		return ""
+	}
+	return "WITH " + strings.Join(ctes, ", ") + " "
+}
+
+// prependArgs ensures CTE arguments appear before main-query arguments
+// in the final slice so their ordinal positions match the SQL string.
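+// For example, cteArgs [[a, b], [c]] and mainArgs [d] yield [a, b, c, d],
+// binding placeholders in CTE declaration order ahead of the main query's.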
+func prependArgs(cteArgs [][]any, mainArgs []any) []any {
+	out := make([]any, 0, len(mainArgs)+len(cteArgs))
+	for _, a := range cteArgs { // CTEs first, in declaration order
+		out = append(out, a...)
+	}
+	return append(out, mainArgs...)
+}
+
+// aggOrderBy reports which aggregation an ORDER BY key refers to, matching
+// the aggregation's alias, its expression, or its numeric index.
+func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
+	for i, agg := range q.Aggregations {
+		if k.Key.Name == agg.Alias ||
+			k.Key.Name == agg.Expression ||
+			k.Key.Name == fmt.Sprintf("%d", i) {
+			return i, true
+		}
+	}
+	return 0, false
+}
+
+// maybeAttachResourceFilter builds the resource-filter CTE, constrains the
+// main query to the fingerprints it selects, and returns the CTE fragment
+// together with its arguments; the caller decides whether to prepend the
+// fragment to the final statement.
+func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
+	ctx context.Context,
+	sb *sqlbuilder.SelectBuilder,
+	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
+	start, end uint64,
+) (cteSQL string, cteArgs []any, err error) {
+
+	stmt, err := b.buildResourceFilterCTE(ctx, query, start, end)
+	if err != nil {
+		return "", nil, err
+	}
+
+	sb.Where("resource_fingerprint IN (SELECT fingerprint FROM __resource_filter)")
+
+	return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
+}
+
+func (b *logQueryStatementBuilder) buildResourceFilterCTE(
+	ctx context.Context,
+	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
+	start, end uint64,
+) (*qbtypes.Statement, error) {
+
+	return b.opts.ResourceFilterStmtBuilder.Build(
+		ctx,
+		start,
+		end,
+		qbtypes.RequestTypeRaw,
+		query,
+	)
+}
diff --git a/pkg/telemetrylogs/stmt_builder_test.go b/pkg/telemetrylogs/stmt_builder_test.go
new file mode 100644
index 0000000000..418b2ce529
--- /dev/null
+++ b/pkg/telemetrylogs/stmt_builder_test.go
@@ -0,0 +1,117 @@
+package telemetrylogs
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/SigNoz/signoz/pkg/querybuilder"
+	"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
+	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
+	"github.com/stretchr/testify/require"
+)
+
+func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.LogAggregation], error) {
+	fm := resourcefilter.NewFieldMapper()
+	cb := resourcefilter.NewConditionBuilder(fm)
+	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
+	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
+	compiler := resourcefilter.NewFilterCompiler(resourcefilter.FilterCompilerOpts{
+		FieldMapper:      fm,
+		ConditionBuilder: cb,
+		MetadataStore:    mockMetadataStore,
+	})
+
+	return resourcefilter.NewLogResourceFilterStatementBuilder(resourcefilter.ResourceFilterStatementBuilderOpts{
+		FieldMapper:      fm,
+		ConditionBuilder: cb,
+		Compiler:         compiler,
+	}), nil
+}
+
+func TestStatementBuilder(t *testing.T) {
+	cases := []struct {
+		name        string
+		requestType qbtypes.RequestType
+		query       qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
+		expected    qbtypes.Statement
+		expectedErr error
+	}{
+		{
+			name:        "test",
+			requestType: qbtypes.RequestTypeTimeSeries,
+			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
+				Signal:       telemetrytypes.SignalLogs,
+				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
+				Aggregations: []qbtypes.LogAggregation{
+					{
+						Expression: "count()",
+					},
+				},
+				Filter: &qbtypes.Filter{
+					Expression: "service.name = 'cartservice'",
+				},
+				Limit: 10,
+				GroupBy: []qbtypes.GroupByKey{
+					{
+						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+							Name: "service.name",
+						},
+					},
+				},
+			},
+			expected: qbtypes.Statement{
+				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
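+				// Args bind CTE-first: the resource-filter CTE's args, then
+				// the __limit_cte args, then the outer query's (see prependArgs).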
+				Args:  []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448), 10, uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448)},
+			},
+			expectedErr: nil,
+		},
+	}
+
+	fm := NewFieldMapper()
+	cb := NewConditionBuilder(fm)
+	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
+	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
+	compiler := NewFilterCompiler(FilterCompilerOpts{
+		FieldMapper:        fm,
+		ConditionBuilder:   cb,
+		MetadataStore:      mockMetadataStore,
+		SkipResourceFilter: true,
+	})
+	aggExprRewriter := querybuilder.NewAggExprRewriter(querybuilder.AggExprRewriterOptions{
+		FieldMapper:      fm,
+		ConditionBuilder: cb,
+		MetadataStore:    mockMetadataStore,
+	})
+
+	resourceFilterStmtBuilder, err := resourceFilterStmtBuilder()
+	require.NoError(t, err)
+
+	statementBuilder := NewLogQueryStatementBuilder(LogQueryStatementBuilderOpts{
+		FieldMapper:               fm,
+		ConditionBuilder:          cb,
+		Compiler:                  compiler,
+		MetadataStore:             mockMetadataStore,
+		AggExprRewriter:           aggExprRewriter,
+		ResourceFilterStmtBuilder: resourceFilterStmtBuilder,
+	})
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query)
+
+			if c.expectedErr != nil {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), c.expectedErr.Error())
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, c.expected.Query, q.Query)
+				require.Equal(t, c.expected.Args, q.Args)
+				require.Equal(t, c.expected.Warnings, q.Warnings)
+			}
+		})
+	}
+}