chore: add logs statement builder base (#8024)

Srikanth Chekuri 2025-05-27 19:21:38 +05:30 committed by GitHub
parent b921a2280b
commit 62810428d8
3 changed files with 614 additions and 0 deletions


@@ -0,0 +1,55 @@
package telemetrylogs

import (
	"context"

	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
)

type FilterCompilerOpts struct {
	FieldMapper        qbtypes.FieldMapper
	ConditionBuilder   qbtypes.ConditionBuilder
	MetadataStore      telemetrytypes.MetadataStore
	FullTextColumn     *telemetrytypes.TelemetryFieldKey
	JsonBodyPrefix     string
	JsonKeyToKey       qbtypes.JsonKeyToFieldFunc
	SkipResourceFilter bool
}

type filterCompiler struct {
	opts FilterCompilerOpts
}

func NewFilterCompiler(opts FilterCompilerOpts) *filterCompiler {
	return &filterCompiler{
		opts: opts,
	}
}

func (c *filterCompiler) Compile(ctx context.Context, expr string) (*sqlbuilder.WhereClause, []string, error) {
	selectors := querybuilder.QueryStringToKeysSelectors(expr)

	keys, err := c.opts.MetadataStore.GetKeysMulti(ctx, selectors)
	if err != nil {
		return nil, nil, err
	}

	filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{
		FieldMapper:        c.opts.FieldMapper,
		ConditionBuilder:   c.opts.ConditionBuilder,
		FieldKeys:          keys,
		FullTextColumn:     c.opts.FullTextColumn,
		JsonBodyPrefix:     c.opts.JsonBodyPrefix,
		JsonKeyToKey:       c.opts.JsonKeyToKey,
		SkipResourceFilter: c.opts.SkipResourceFilter,
	})
	if err != nil {
		return nil, nil, err
	}

	return filterWhereClause, warnings, nil
}
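
For orientation, a minimal sketch of how a caller might wire up and invoke this compiler; the wiring mirrors the test at the end of this commit (NewFieldMapper, NewConditionBuilder, and the mock metadata store come from there), and exampleCompile itself is a hypothetical helper, not part of the change:

func exampleCompile(ctx context.Context) error {
	fm := NewFieldMapper()
	cb := NewConditionBuilder(fm)
	store := telemetrytypestest.NewMockMetadataStore() // mock store from the test helpers

	compiler := NewFilterCompiler(FilterCompilerOpts{
		FieldMapper:      fm,
		ConditionBuilder: cb,
		MetadataStore:    store,
	})

	// Compile resolves the referenced keys via the metadata store, then
	// renders the expression into a WHERE clause plus any warnings.
	where, warnings, err := compiler.Compile(ctx, "service.name = 'cartservice'")
	if err != nil {
		return err
	}
	_ = where    // *sqlbuilder.WhereClause, consumable via sb.AddWhereClause
	_ = warnings // non-fatal notes surfaced to the caller
	return nil
}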


@@ -0,0 +1,442 @@
package telemetrylogs

import (
	"context"
	"fmt"
	"strings"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
)

var (
	ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation")
)
type LogQueryStatementBuilderOpts struct {
	MetadataStore             telemetrytypes.MetadataStore
	FieldMapper               qbtypes.FieldMapper
	ConditionBuilder          qbtypes.ConditionBuilder
	ResourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
	Compiler                  qbtypes.FilterCompiler
	AggExprRewriter           qbtypes.AggExprRewriter
}

type logQueryStatementBuilder struct {
	opts            LogQueryStatementBuilderOpts
	fm              qbtypes.FieldMapper
	cb              qbtypes.ConditionBuilder
	compiler        qbtypes.FilterCompiler
	aggExprRewriter qbtypes.AggExprRewriter
}

var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)

func NewLogQueryStatementBuilder(opts LogQueryStatementBuilderOpts) *logQueryStatementBuilder {
	return &logQueryStatementBuilder{
		opts:            opts,
		fm:              opts.FieldMapper,
		cb:              opts.ConditionBuilder,
		compiler:        opts.Compiler,
		aggExprRewriter: opts.AggExprRewriter,
	}
}
// Build builds a SQL query for logs based on the given parameters
func (b *logQueryStatementBuilder) Build(
	ctx context.Context,
	start uint64,
	end uint64,
	requestType qbtypes.RequestType,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
) (*qbtypes.Statement, error) {
	start = querybuilder.ToNanoSecs(start)
	end = querybuilder.ToNanoSecs(end)

	keySelectors := getKeySelectors(query)

	keys, err := b.opts.MetadataStore.GetKeysMulti(ctx, keySelectors)
	if err != nil {
		return nil, err
	}

	// Create SQL builder
	q := sqlbuilder.ClickHouse.NewSelectBuilder()

	switch requestType {
	case qbtypes.RequestTypeRaw:
		return b.buildListQuery(ctx, q, query, start, end, keys)
	case qbtypes.RequestTypeTimeSeries:
		return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys)
	case qbtypes.RequestTypeScalar:
		return b.buildScalarQuery(ctx, q, query, start, end, keys, false)
	}

	return nil, fmt.Errorf("unsupported request type: %s", requestType)
}
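
The start/end normalization above matters when reading the test fixture later: Build is called with epoch milliseconds while the generated arguments carry nanoseconds. A small illustration, assuming ToNanoSecs scales by input magnitude (which is what the fixture values imply); exampleToNanoSecs is a hypothetical helper:

func exampleToNanoSecs() {
	start := querybuilder.ToNanoSecs(1747947419000) // epoch millis in
	end := querybuilder.ToNanoSecs(1747983448000)
	fmt.Println(start, end) // 1747947419000000000 1747983448000000000
}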
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
	var keySelectors []*telemetrytypes.FieldKeySelector

	for idx := range query.Aggregations {
		aggExpr := query.Aggregations[idx]
		selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
		keySelectors = append(keySelectors, selectors...)
	}

	whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
	keySelectors = append(keySelectors, whereClauseSelectors...)

	return keySelectors
}
// buildListQuery builds a query for list panel type
func (b *logQueryStatementBuilder) buildListQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
	start, end uint64,
	_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (*qbtypes.Statement, error) {
	var (
		cteFragments []string
		cteArgs      [][]any
	)

	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil {
		return nil, err
	} else if frag != "" {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	// Select default columns
	sb.Select(
		"timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string",
	)

	// From table
	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))

	// Add filter conditions
	warnings, err := b.addFilterCondition(ctx, sb, start, end, query)
	if err != nil {
		return nil, err
	}

	// Add order by
	for _, orderBy := range query.Order {
		sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction))
	}

	// Add limit and offset
	if query.Limit > 0 {
		sb.Limit(query.Limit)
	}

	if query.Offset > 0 {
		sb.Offset(query.Offset)
	}

	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)

	finalSQL := combineCTEs(cteFragments) + mainSQL
	finalArgs := prependArgs(cteArgs, mainArgs)

	return &qbtypes.Statement{
		Query:    finalSQL,
		Args:     finalArgs,
		Warnings: warnings,
	}, nil
}
func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
) (*qbtypes.Statement, error) {
	var (
		cteFragments []string
		cteArgs      [][]any
	)

	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil {
		return nil, err
	} else if frag != "" {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	sb.SelectMore(fmt.Sprintf(
		"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
		int64(query.StepInterval.Seconds()),
	))

	// Keep original column expressions so we can build the tuple
	fieldNames := make([]string, 0, len(query.GroupBy))
	for _, gb := range query.GroupBy {
		colExpr, err := b.fm.ColumnExpressionFor(ctx, &gb.TelemetryFieldKey, keys)
		if err != nil {
			return nil, err
		}
		sb.SelectMore(colExpr)
		fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
	}

	// Aggregations
	allAggChArgs := make([]any, 0)
	for i, agg := range query.Aggregations {
		rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, agg.Expression)
		if err != nil {
			return nil, err
		}
		allAggChArgs = append(allAggChArgs, chArgs...)
		sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
	}

	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))

	warnings, err := b.addFilterCondition(ctx, sb, start, end, query)
	if err != nil {
		return nil, err
	}

	var finalSQL string
	var finalArgs []any

	if query.Limit > 0 {
		// build the scalar "top/bottom-N" query in its own builder.
		cteSB := sqlbuilder.ClickHouse.NewSelectBuilder()
		cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true)
		if err != nil {
			return nil, err
		}

		cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
		cteArgs = append(cteArgs, cteStmt.Args)

		// Constrain the main query to the rows that appear in the CTE.
		tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
		sb.Where(fmt.Sprintf("%s IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))

		// Group by all dimensions
		sb.GroupBy("ALL")
		if query.Having != nil && query.Having.Expression != "" {
			sb.Having(query.Having.Expression)
		}

		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)

		// Stitch it all together: WITH … SELECT …
		finalSQL = combineCTEs(cteFragments) + mainSQL
		finalArgs = prependArgs(cteArgs, mainArgs)
	} else {
		sb.GroupBy("ALL")
		if query.Having != nil && query.Having.Expression != "" {
			sb.Having(query.Having.Expression)
		}

		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)

		// Stitch it all together: WITH … SELECT …
		finalSQL = combineCTEs(cteFragments) + mainSQL
		finalArgs = prependArgs(cteArgs, mainArgs)
	}

	return &qbtypes.Statement{
		Query:    finalSQL,
		Args:     finalArgs,
		Warnings: warnings,
	}, nil
}
// buildScalarQuery builds a query for scalar panel type
func (b *logQueryStatementBuilder) buildScalarQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
	skipResourceCTE bool,
) (*qbtypes.Statement, error) {
	var (
		cteFragments []string
		cteArgs      [][]any
	)

	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end); err != nil {
		return nil, err
	} else if frag != "" && !skipResourceCTE {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	allAggChArgs := []any{}

	// Add group by columns
	for _, groupBy := range query.GroupBy {
		colExpr, err := b.fm.ColumnExpressionFor(ctx, &groupBy.TelemetryFieldKey, keys)
		if err != nil {
			return nil, err
		}
		sb.SelectMore(colExpr)
	}

	// Add aggregations
	if len(query.Aggregations) > 0 {
		for idx := range query.Aggregations {
			aggExpr := query.Aggregations[idx]
			rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, aggExpr.Expression)
			if err != nil {
				return nil, err
			}
			allAggChArgs = append(allAggChArgs, chArgs...)
			sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
		}
	}

	// From table
	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))

	// Add filter conditions
	warnings, err := b.addFilterCondition(ctx, sb, start, end, query)
	if err != nil {
		return nil, err
	}

	// Group by dimensions
	sb.GroupBy("ALL")

	// Add having clause if needed
	if query.Having != nil && query.Having.Expression != "" {
		sb.Having(query.Having.Expression)
	}

	// Add order by
	for _, orderBy := range query.Order {
		idx, ok := aggOrderBy(orderBy, query)
		if ok {
			sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction))
		} else {
			sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction))
		}
	}

	// if there is no order by, default to ordering by __result_0 descending
	if len(query.Order) == 0 {
		sb.OrderBy("__result_0 DESC")
	}

	// Add limit and offset
	if query.Limit > 0 {
		sb.Limit(query.Limit)
	}

	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)

	finalSQL := combineCTEs(cteFragments) + mainSQL
	finalArgs := prependArgs(cteArgs, mainArgs)

	return &qbtypes.Statement{
		Query:    finalSQL,
		Args:     finalArgs,
		Warnings: warnings,
	}, nil
}
// addFilterCondition builds the SQL condition from the filter expression
// and adds it, along with the time-range filters, to the select builder.
func (b *logQueryStatementBuilder) addFilterCondition(ctx context.Context, sb *sqlbuilder.SelectBuilder, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]string, error) {
	// add filter expression
	filterWhereClause, warnings, err := b.compiler.Compile(ctx, query.Filter.Expression)
	if err != nil {
		return nil, err
	}

	if filterWhereClause != nil {
		sb.AddWhereClause(filterWhereClause)
	}

	// add time filter; bucket bounds are in seconds, with a 30-minute lookback on the start
	startBucket := start/1000000000 - 1800
	endBucket := end / 1000000000

	sb.Where(sb.GE("timestamp", start), sb.LE("timestamp", end), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))

	return warnings, nil
}
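
The bucket bounds are derived from the nanosecond range: the end bucket is plain seconds, while the start bucket is pulled back 1800 seconds (30 minutes) so that rows whose bucket opened before the query window are still scanned. Worked numbers, taken from the expected Args in the test below (exampleBucketBounds is an illustrative helper, not part of the change):

func exampleBucketBounds() {
	var start, end uint64 = 1747947419000000000, 1747983448000000000
	startBucket := start/1000000000 - 1800 // 1747945619
	endBucket := end / 1000000000          // 1747983448
	fmt.Println(startBucket, endBucket)
}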
// combineCTEs takes any number of individual CTE fragments like
//
//	"__resource_filter AS (...)", "__limit_cte AS (...)"
//
// and renders the final `WITH …` clause.
func combineCTEs(ctes []string) string {
	if len(ctes) == 0 {
		return ""
	}
	return "WITH " + strings.Join(ctes, ", ") + " "
}
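
For instance, the two fragments produced on the limited time-series path combine into a single clause (a runnable sketch against this package; exampleCombineCTEs is illustrative):

func exampleCombineCTEs() {
	ctes := []string{"__resource_filter AS (...)", "__limit_cte AS (...)"}
	fmt.Printf("%q\n", combineCTEs(ctes))
	// "WITH __resource_filter AS (...), __limit_cte AS (...) "
	// The trailing space lets the main SELECT be appended directly.
}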
// prependArgs ensures CTE arguments appear before main-query arguments
// in the final slice so their ordinal positions match the SQL string.
func prependArgs(cteArgs [][]any, mainArgs []any) []any {
	out := make([]any, 0, len(mainArgs)+len(cteArgs))
	for _, a := range cteArgs { // CTEs first, in declaration order
		out = append(out, a...)
	}
	return append(out, mainArgs...)
}
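
And the matching argument order: each CTE's args are flattened first, in declaration order, then the main query's. A hypothetical illustration:

func examplePrependArgs() {
	cteArgs := [][]any{{"cartservice"}, {10}} // one slice per CTE
	mainArgs := []any{uint64(1747945619)}
	fmt.Println(prependArgs(cteArgs, mainArgs)) // [cartservice 10 1747945619]
}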
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
	for i, agg := range q.Aggregations {
		if k.Key.Name == agg.Alias ||
			k.Key.Name == agg.Expression ||
			k.Key.Name == fmt.Sprintf("%d", i) {
			return i, true
		}
	}
	return 0, false
}
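
An order-by key can name an aggregation three ways: by its alias, by the raw expression, or by its index rendered as a string. A sketch of how a caller consumes the result (the parameters stand in for real query values; exampleAggOrderBy is illustrative):

func exampleAggOrderBy(ob qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) {
	// With q.Aggregations = [{Expression: "count()"}], a key named
	// "count()" (or the alias, or "0") resolves to index 0.
	if idx, ok := aggOrderBy(ob, q); ok {
		fmt.Printf("ORDER BY __result_%d %s\n", idx, ob.Direction)
	}
}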
func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
	start, end uint64,
) (cteSQL string, cteArgs []any, err error) {
	stmt, err := b.buildResourceFilterCTE(ctx, query, start, end)
	if err != nil {
		return "", nil, err
	}

	sb.Where("resource_fingerprint IN (SELECT fingerprint FROM __resource_filter)")

	return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
}

func (b *logQueryStatementBuilder) buildResourceFilterCTE(
	ctx context.Context,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
	start, end uint64,
) (*qbtypes.Statement, error) {
	return b.opts.ResourceFilterStmtBuilder.Build(
		ctx,
		start,
		end,
		qbtypes.RequestTypeRaw,
		query,
	)
}


@@ -0,0 +1,117 @@
package telemetrylogs

import (
	"context"
	"testing"
	"time"

	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
	"github.com/stretchr/testify/require"
)

func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.LogAggregation], error) {
	fm := resourcefilter.NewFieldMapper()
	cb := resourcefilter.NewConditionBuilder(fm)
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
	compiler := resourcefilter.NewFilterCompiler(resourcefilter.FilterCompilerOpts{
		FieldMapper:      fm,
		ConditionBuilder: cb,
		MetadataStore:    mockMetadataStore,
	})

	return resourcefilter.NewLogResourceFilterStatementBuilder(resourcefilter.ResourceFilterStatementBuilderOpts{
		FieldMapper:      fm,
		ConditionBuilder: cb,
		Compiler:         compiler,
	}), nil
}

func TestStatementBuilder(t *testing.T) {
	cases := []struct {
		name        string
		requestType qbtypes.RequestType
		query       qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
		expected    qbtypes.Statement
		expectedErr error
	}{
		{
			name:        "test",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count()",
					},
				},
				Filter: &qbtypes.Filter{
					Expression: "service.name = 'cartservice'",
				},
				Limit: 10,
				GroupBy: []qbtypes.GroupByKey{
					{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: "service.name",
						},
					},
				},
			},
			expected: qbtypes.Statement{
				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
				Args:  []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448), 10, uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448)},
			},
			expectedErr: nil,
		},
	}

	fm := NewFieldMapper()
	cb := NewConditionBuilder(fm)
	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
	compiler := NewFilterCompiler(FilterCompilerOpts{
		FieldMapper:        fm,
		ConditionBuilder:   cb,
		MetadataStore:      mockMetadataStore,
		SkipResourceFilter: true,
	})
	aggExprRewriter := querybuilder.NewAggExprRewriter(querybuilder.AggExprRewriterOptions{
		FieldMapper:      fm,
		ConditionBuilder: cb,
		MetadataStore:    mockMetadataStore,
	})

	resourceFilterStmtBuilder, err := resourceFilterStmtBuilder()
	require.NoError(t, err)

	statementBuilder := NewLogQueryStatementBuilder(LogQueryStatementBuilderOpts{
		FieldMapper:               fm,
		ConditionBuilder:          cb,
		Compiler:                  compiler,
		MetadataStore:             mockMetadataStore,
		AggExprRewriter:           aggExprRewriter,
		ResourceFilterStmtBuilder: resourceFilterStmtBuilder,
	})

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query)

			if c.expectedErr != nil {
				require.Error(t, err)
				require.Contains(t, err.Error(), c.expectedErr.Error())
			} else {
				require.NoError(t, err)
				require.Equal(t, c.expected.Query, q.Query)
				require.Equal(t, c.expected.Args, q.Args)
				require.Equal(t, c.expected.Warnings, q.Warnings)
			}
		})
	}
}