diff --git a/pkg/querier/builder_query.go b/pkg/querier/builder_query.go
new file mode 100644
index 0000000000..dfb041dd65
--- /dev/null
+++ b/pkg/querier/builder_query.go
@@ -0,0 +1,204 @@
+package querier
+
+import (
+    "context"
+    "encoding/base64"
+    "strconv"
+    "strings"
+    "time"
+
+    "github.com/SigNoz/signoz/pkg/telemetrystore"
+    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
+type builderQuery[T any] struct {
+    telemetryStore telemetrystore.TelemetryStore
+    stmtBuilder    qbtypes.StatementBuilder[T]
+    spec           qbtypes.QueryBuilderQuery[T]
+
+    fromMS uint64
+    toMS   uint64
+    kind   qbtypes.RequestType
+}
+
+var _ qbtypes.Query = (*builderQuery[any])(nil)
+
+func newBuilderQuery[T any](
+    telemetryStore telemetrystore.TelemetryStore,
+    stmtBuilder qbtypes.StatementBuilder[T],
+    spec qbtypes.QueryBuilderQuery[T],
+    tr qbtypes.TimeRange,
+    kind qbtypes.RequestType,
+) *builderQuery[T] {
+    return &builderQuery[T]{
+        telemetryStore: telemetryStore,
+        stmtBuilder:    stmtBuilder,
+        spec:           spec,
+        fromMS:         tr.From,
+        toMS:           tr.To,
+        kind:           kind,
+    }
+}
+
+func (q *builderQuery[T]) Fingerprint() string {
+    // TODO: implement this
+    return ""
+}
+
+func (q *builderQuery[T]) Window() (uint64, uint64) {
+    return q.fromMS, q.toMS
+}
+
+// isWindowList reports whether window-based pagination applies: it must be a
+// single query, ordered by timestamp (logs need an id tie-break).
+func (q *builderQuery[T]) isWindowList() bool {
+    if len(q.spec.Order) == 0 {
+        return false
+    }
+
+    // first ORDER BY must be `timestamp`
+    if q.spec.Order[0].Key.Name != "timestamp" {
+        return false
+    }
+
+    if q.spec.Signal == telemetrytypes.SignalLogs {
+        // logs require timestamp,id with identical direction
+        if len(q.spec.Order) != 2 || q.spec.Order[1].Key.Name != "id" ||
+            q.spec.Order[1].Direction != q.spec.Order[0].Direction {
+            return false
+        }
+    }
+    return true
+}
+
+func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error) {
+
+    // can we do window based pagination?
+    if q.kind == qbtypes.RequestTypeRaw && q.isWindowList() {
+        return q.executeWindowList(ctx)
+    }
+
+    stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec)
+    if err != nil {
+        return nil, err
+    }
+
+    chQuery := qbtypes.ClickHouseQuery{
+        Name:  q.spec.Name,
+        Query: stmt.Query,
+    }
+
+    chExec := newchSQLQuery(q.telemetryStore, chQuery, stmt.Args, qbtypes.TimeRange{From: q.fromMS, To: q.toMS}, q.kind)
+    result, err := chExec.Execute(ctx)
+    if err != nil {
+        return nil, err
+    }
+    result.Warnings = stmt.Warnings
+    return result, nil
+}
+
+func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Result, error) {
+    isAsc := len(q.spec.Order) > 0 &&
+        strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"
+
+    // Adjust [fromMS,toMS] window if a cursor was supplied
+    if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
+        if ts, err := decodeCursor(cur); err == nil {
+            if isAsc {
+                if uint64(ts) >= q.fromMS {
+                    q.fromMS = uint64(ts + 1)
+                }
+            } else { // DESC
+                if uint64(ts) <= q.toMS {
+                    q.toMS = uint64(ts - 1)
+                }
+            }
+        }
+    }
+
+    reqLimit := q.spec.Limit
+    if reqLimit == 0 {
+        reqLimit = 10_000 // sane upper-bound default
+    }
+    offsetLeft := q.spec.Offset
+    need := reqLimit + offsetLeft // rows to fetch from ClickHouse
+
+    var rows []*qbtypes.RawRow
+
+    totalRows := uint64(0)
+    totalBytes := uint64(0)
+    start := time.Now()
+
+    for _, r := range makeBuckets(q.fromMS, q.toMS) {
+        q.spec.Offset = 0
+        q.spec.Limit = need
+
+        stmt, err := q.stmtBuilder.Build(ctx, r.fromNS/1e6, r.toNS/1e6, q.kind, q.spec)
+        if err != nil {
+            return nil, err
+        }
+
+        chExec := newchSQLQuery(
+            q.telemetryStore,
+            qbtypes.ClickHouseQuery{Name: q.spec.Name, Query: stmt.Query},
+            stmt.Args,
+            qbtypes.TimeRange{From: q.fromMS, To: q.toMS},
+            q.kind,
+        )
+        res, err := chExec.Execute(ctx)
+        if err != nil {
+            return nil, err
+        }
+        totalRows += res.Stats.RowsScanned
+        totalBytes += res.Stats.BytesScanned
+
+        rawRows := res.Value.(*qbtypes.RawData).Rows
+        need -= len(rawRows)
+
+        for _, rr := range rawRows {
+            if offsetLeft > 0 { // client-requested initial offset
+                offsetLeft--
+                continue
+            }
+            rows = append(rows, rr)
+            if len(rows) >= reqLimit { // page filled
+                break
+            }
+        }
+        if len(rows) >= reqLimit {
+            break
+        }
+    }
+
+    nextCursor := ""
+    if len(rows) == reqLimit {
+        lastTS := rows[len(rows)-1].Timestamp.UnixMilli()
+        nextCursor = encodeCursor(lastTS)
+    }
+
+    return &qbtypes.Result{
+        Type: qbtypes.RequestTypeRaw,
+        Value: &qbtypes.RawData{
+            QueryName:  q.spec.Name,
+            Rows:       rows,
+            NextCursor: nextCursor,
+        },
+        Stats: qbtypes.ExecStats{
+            RowsScanned:  totalRows,
+            BytesScanned: totalBytes,
+            DurationMS:   uint64(time.Since(start).Milliseconds()),
+        },
+    }, nil
+}
+
+func encodeCursor(tsMilli int64) string {
+    return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(tsMilli, 10)))
+}
+
+func decodeCursor(cur string) (int64, error) {
+    b, err := base64.StdEncoding.DecodeString(cur)
+    if err != nil {
+        return 0, err
+    }
+    return strconv.ParseInt(string(b), 10, 64)
+}
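The cursor is just a base64-encoded millisecond timestamp, so a follow-up page re-anchors the time window instead of re-scanning with an OFFSET. A minimal standalone sketch of that round trip, with the two helpers copied from the file above and made-up timestamps:

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
)

// Inlined copies of the helpers from builder_query.go so the sketch runs standalone.
func encodeCursor(tsMilli int64) string {
	return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(tsMilli, 10)))
}

func decodeCursor(cur string) (int64, error) {
	b, err := base64.StdEncoding.DecodeString(cur)
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(string(b), 10, 64)
}

func main() {
	fromMS, toMS := uint64(1_717_000_000_000), uint64(1_717_003_600_000)

	// The server hands back the last row's timestamp as an opaque cursor...
	cur := encodeCursor(1_717_001_234_567)

	// ...and the next DESC page shrinks the window to just past that row,
	// mirroring the adjustment executeWindowList makes.
	if ts, err := decodeCursor(cur); err == nil && uint64(ts) <= toMS {
		toMS = uint64(ts - 1)
	}
	fmt.Println(fromMS, toMS) // 1717000000000 1717001234566
}
```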
diff --git a/pkg/querier/clickhouse_query.go b/pkg/querier/clickhouse_query.go
new file mode 100644
index 0000000000..c5973ede37
--- /dev/null
+++ b/pkg/querier/clickhouse_query.go
@@ -0,0 +1,77 @@
+package querier
+
+import (
+    "context"
+    "time"
+
+    "github.com/ClickHouse/clickhouse-go/v2"
+    "github.com/SigNoz/signoz/pkg/telemetrystore"
+    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+)
+
+type chSQLQuery struct {
+    telemetryStore telemetrystore.TelemetryStore
+
+    query  qbtypes.ClickHouseQuery
+    args   []any
+    fromMS uint64
+    toMS   uint64
+    kind   qbtypes.RequestType
+}
+
+var _ qbtypes.Query = (*chSQLQuery)(nil)
+
+func newchSQLQuery(
+    telemetryStore telemetrystore.TelemetryStore,
+    query qbtypes.ClickHouseQuery,
+    args []any,
+    tr qbtypes.TimeRange,
+    kind qbtypes.RequestType,
+) *chSQLQuery {
+    return &chSQLQuery{
+        telemetryStore: telemetryStore,
+        query:          query,
+        args:           args,
+        fromMS:         tr.From,
+        toMS:           tr.To,
+        kind:           kind,
+    }
+}
+
+// TODO: use the same query hash scheme as ClickHouse
+func (q *chSQLQuery) Fingerprint() string      { return q.query.Query }
+func (q *chSQLQuery) Window() (uint64, uint64) { return q.fromMS, q.toMS }
+
+func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
+
+    totalRows := uint64(0)
+    totalBytes := uint64(0)
+    elapsed := time.Duration(0)
+
+    ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) {
+        totalRows += p.Rows
+        totalBytes += p.Bytes
+        elapsed += p.Elapsed
+    }))
+
+    rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, q.query.Query, q.args...)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    // TODO: map the errors from ClickHouse to our error types
+    payload, err := consume(rows, q.kind)
+    if err != nil {
+        return nil, err
+    }
+    return &qbtypes.Result{
+        Type:  q.kind,
+        Value: payload,
+        Stats: qbtypes.ExecStats{
+            RowsScanned:  totalRows,
+            BytesScanned: totalBytes,
+            DurationMS:   uint64(elapsed.Milliseconds()),
+        },
+    }, nil
+}
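The ExecStats come from the driver's progress callbacks rather than from parsing system tables: clickhouse-go fires the callback as server blocks stream in, so the totals accumulate per packet. A compile-level sketch of the same pattern in isolation (conn and the query string are placeholders; Scan is elided):

```go
package main

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// scannedStats runs a query and accumulates rows/bytes read, the same way
// chSQLQuery.Execute does.
func scannedStats(ctx context.Context, conn driver.Conn, query string) (rows, bytes uint64, err error) {
	ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) {
		rows += p.Rows   // rows read in this progress packet
		bytes += p.Bytes // bytes read in this progress packet
	}))
	rs, err := conn.Query(ctx, query)
	if err != nil {
		return 0, 0, err
	}
	defer rs.Close()
	for rs.Next() {
		// drain the result set; Scan omitted in this sketch
	}
	return rows, bytes, rs.Err()
}

func main() { fmt.Println("see chSQLQuery.Execute for the real wiring") }
```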
diff --git a/pkg/querier/consume.go b/pkg/querier/consume.go
new file mode 100644
index 0000000000..ba51387812
--- /dev/null
+++ b/pkg/querier/consume.go
@@ -0,0 +1,373 @@
+package querier
+
+import (
+    "fmt"
+    "math"
+    "reflect"
+    "regexp"
+    "sort"
+    "strconv"
+    "strings"
+    "time"
+
+    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
+var (
+    aggRe = regexp.MustCompile(`^__result_(\d+)$`)
+)
+
+// consume reads every row and shapes it into the payload expected for the
+// given request type.
+//
+// * Time-series  - []*qbtypes.TimeSeriesData
+// * Scalar       - *qbtypes.ScalarData
+// * Raw          - *qbtypes.RawData
+// * Distribution - []*qbtypes.DistributionData
+func consume(rows driver.Rows, kind qbtypes.RequestType) (any, error) {
+    var (
+        payload any
+        err     error
+    )
+
+    switch kind {
+    case qbtypes.RequestTypeTimeSeries:
+        payload, err = readAsTimeSeries(rows)
+    case qbtypes.RequestTypeScalar:
+        payload, err = readAsScalar(rows)
+    case qbtypes.RequestTypeRaw:
+        payload, err = readAsRaw(rows)
+        // TODO: add support for other request types
+    }
+
+    return payload, err
+}
+
+func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
+
+    colTypes := rows.ColumnTypes()
+    colNames := rows.Columns()
+
+    slots := make([]any, len(colTypes))
+    numericColsCount := 0
+    for i, ct := range colTypes {
+        slots[i] = reflect.New(ct.ScanType()).Interface()
+        if numericKind(ct.ScanType().Kind()) {
+            numericColsCount++
+        }
+    }
+
+    type sKey struct {
+        agg int
+        key string // deterministic join of label values
+    }
+    seriesMap := map[sKey]*qbtypes.TimeSeries{}
+
+    for rows.Next() {
+        if err := rows.Scan(slots...); err != nil {
+            return nil, err
+        }
+
+        var (
+            ts            int64
+            lblVals       []string
+            lblObjs       []*qbtypes.Label
+            aggValues     = map[int]float64{} // all __result_N in this row
+            fallbackValue float64             // value when NO __result_N columns exist
+            fallbackSeen  bool
+        )
+
+        for idx, ptr := range slots {
+            name := colNames[idx]
+
+            switch v := ptr.(type) {
+            case *time.Time:
+                ts = v.UnixMilli()
+
+            case *float64, *float32, *int64, *int32, *uint64, *uint32:
+                val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
+                if m := aggRe.FindStringSubmatch(name); m != nil {
+                    id, _ := strconv.Atoi(m[1])
+                    aggValues[id] = val
+                } else if numericColsCount == 1 { // classic single-value query
+                    fallbackValue = val
+                    fallbackSeen = true
+                } else {
+                    // numeric label
+                    lblVals = append(lblVals, fmt.Sprint(val))
+                    lblObjs = append(lblObjs, &qbtypes.Label{
+                        Key:   telemetrytypes.TelemetryFieldKey{Name: name},
+                        Value: val,
+                    })
+                }
+
+            case **float64, **float32, **int64, **int32, **uint64, **uint32:
+                tempVal := reflect.ValueOf(ptr)
+                if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
+                    val := numericAsFloat(tempVal.Elem().Elem().Interface())
+                    if m := aggRe.FindStringSubmatch(name); m != nil {
+                        id, _ := strconv.Atoi(m[1])
+                        aggValues[id] = val
+                    } else if numericColsCount == 1 { // classic single-value query
+                        fallbackValue = val
+                        fallbackSeen = true
+                    } else {
+                        // numeric label
+                        lblVals = append(lblVals, fmt.Sprint(val))
+                        lblObjs = append(lblObjs, &qbtypes.Label{
+                            Key:   telemetrytypes.TelemetryFieldKey{Name: name},
+                            Value: val,
+                        })
+                    }
+                }
+
+            case *string:
+                lblVals = append(lblVals, *v)
+                lblObjs = append(lblObjs, &qbtypes.Label{
+                    Key:   telemetrytypes.TelemetryFieldKey{Name: name},
+                    Value: *v,
+                })
+
+            case **string:
+                val := *v
+                if val == nil {
+                    var empty string
+                    val = &empty
+                }
+                lblVals = append(lblVals, *val)
+                lblObjs = append(lblObjs, &qbtypes.Label{
+                    Key:   telemetrytypes.TelemetryFieldKey{Name: name},
+                    Value: val,
+                })
+
+            default:
+                continue
+            }
+        }
+
+        // Edge-case: no __result_N columns, but a single numeric column present
+        if len(aggValues) == 0 && fallbackSeen {
+            aggValues[0] = fallbackValue
+        }
+
+        if ts == 0 || len(aggValues) == 0 {
+            continue // nothing useful
+        }
+
+        sort.Strings(lblVals)
+        labelsKey := strings.Join(lblVals, ",")
+
+        // one point per aggregation in this row
+        for aggIdx, val := range aggValues {
+            if math.IsNaN(val) || math.IsInf(val, 0) {
+                continue
+            }
+
+            key := sKey{agg: aggIdx, key: labelsKey}
+
+            series, ok := seriesMap[key]
+            if !ok {
+                series = &qbtypes.TimeSeries{Labels: lblObjs}
+                seriesMap[key] = series
+            }
+            series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
+                Timestamp: ts,
+                Value:     val,
+            })
+        }
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+
+    maxAgg := -1
+    for k := range seriesMap {
+        if k.agg > maxAgg {
+            maxAgg = k.agg
+        }
+    }
+    if maxAgg < 0 {
+        return nil, nil // empty result-set
+    }
+
+    buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
+    for i := range buckets {
+        buckets[i] = &qbtypes.AggregationBucket{
+            Index: i,
+            Alias: "__result_" + strconv.Itoa(i),
+        }
+    }
+    for k, s := range seriesMap {
+        buckets[k.agg].Series = append(buckets[k.agg].Series, s)
+    }
+
+    var nonEmpty []*qbtypes.AggregationBucket
+    for _, b := range buckets {
+        if len(b.Series) > 0 {
+            nonEmpty = append(nonEmpty, b)
+        }
+    }
+
+    return []*qbtypes.TimeSeriesData{{
+        Aggregations: nonEmpty,
+    }}, nil
+}
+
+func numericKind(k reflect.Kind) bool {
+    switch k {
+    case reflect.Float32, reflect.Float64,
+        reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
+        reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+        return true
+    default:
+        return false
+    }
+}
+
+func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) {
+    colNames := rows.Columns()
+    colTypes := rows.ColumnTypes()
+
+    cd := make([]*qbtypes.ColumnDescriptor, len(colNames))
+
+    for i, name := range colNames {
+        colType := qbtypes.ColumnTypeGroup
+        if aggRe.MatchString(name) {
+            colType = qbtypes.ColumnTypeAggregation
+        }
+        cd[i] = &qbtypes.ColumnDescriptor{
+            TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
+            AggregationIndex:  int64(i),
+            Type:              colType,
+        }
+    }
+
+    var data [][]any
+
+    for rows.Next() {
+        scan := make([]any, len(colTypes))
+        for i := range scan {
+            scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
+        }
+        if err := rows.Scan(scan...); err != nil {
+            return nil, err
+        }
+
+        // deref each slot into the output row
+        row := make([]any, len(scan))
+        for i, cell := range scan {
+            valPtr := reflect.ValueOf(cell)
+            if valPtr.Kind() == reflect.Pointer && !valPtr.IsNil() {
+                row[i] = valPtr.Elem().Interface()
+            } else {
+                row[i] = nil // Nullable columns come back as nil pointers
+            }
+        }
+        data = append(data, row)
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+
+    return &qbtypes.ScalarData{
+        Columns: cd,
+        Data:    data,
+    }, nil
+}
+
+func readAsRaw(rows driver.Rows) (*qbtypes.RawData, error) {
+
+    colNames := rows.Columns()
+    colTypes := rows.ColumnTypes()
+    colCnt := len(colNames)
+
+    // Build a template slice of correctly-typed pointers once
+    scanTpl := make([]any, colCnt)
+    for i, ct := range colTypes {
+        scanTpl[i] = reflect.New(ct.ScanType()).Interface()
+    }
+
+    var outRows []*qbtypes.RawRow
+
+    for rows.Next() {
+        // fresh copy of the scan slice (otherwise the driver reuses pointers)
+        scan := make([]any, colCnt)
+        for i := range scanTpl {
+            scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
+        }
+
+        if err := rows.Scan(scan...); err != nil {
+            return nil, err
+        }
+
+        rr := qbtypes.RawRow{
+            Data: make(map[string]*any, colCnt),
+        }
+
+        for i, cellPtr := range scan {
+            name := colNames[i]
+
+            // de-reference the typed pointer to any
+            val := reflect.ValueOf(cellPtr).Elem().Interface()
+
+            // special-case: timestamp column
+            if name == "timestamp" || name == "timestamp_datetime" {
+                switch t := val.(type) {
+                case time.Time:
+                    rr.Timestamp = t
+                case uint64: // epoch-ns stored as integer
+                    rr.Timestamp = time.Unix(0, int64(t))
+                case int64:
+                    rr.Timestamp = time.Unix(0, t)
+                default:
+                    // leave zero time if unrecognised
+                }
+            }
+
+            // store value in map as *any, to match the schema
+            v := any(val)
+            rr.Data[name] = &v
+        }
+        outRows = append(outRows, &rr)
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+
+    return &qbtypes.RawData{
+        Rows: outRows,
+    }, nil
+}
+
+func numericAsFloat(v any) float64 {
+    switch x := v.(type) {
+    case float64:
+        return x
+    case float32:
+        return float64(x)
+    case int64:
+        return float64(x)
+    case int32:
+        return float64(x)
+    case int16:
+        return float64(x)
+    case int8:
+        return float64(x)
+    case int:
+        return float64(x)
+    case uint64:
+        return float64(x)
+    case uint32:
+        return float64(x)
+    case uint16:
+        return float64(x)
+    case uint8:
+        return float64(x)
+    case uint:
+        return float64(x)
+    default:
+        return math.NaN()
+    }
+}
diff --git a/pkg/querier/list_range.go b/pkg/querier/list_range.go
new file mode 100644
index 0000000000..b62f932ba0
--- /dev/null
+++ b/pkg/querier/list_range.go
@@ -0,0 +1,36 @@
+package querier
+
+import "github.com/SigNoz/signoz/pkg/querybuilder"
+
+const hourNanos = int64(3_600_000_000_000) // 1 h in ns
+
+type tsRange struct{ fromNS, toNS uint64 }
+
+// makeBuckets slices the time range into exponentially growing buckets,
+// newest first.
+func makeBuckets(start, end uint64) []tsRange {
+    startNS := querybuilder.ToNanoSecs(start)
+    endNS := querybuilder.ToNanoSecs(end)
+
+    if endNS-startNS <= uint64(hourNanos) {
+        return []tsRange{{fromNS: startNS, toNS: endNS}}
+    }
+
+    var out []tsRange
+    bucket := uint64(hourNanos)
+    curEnd := endNS
+
+    for {
+        curStart := curEnd - bucket
+        if curStart < startNS {
+            curStart = startNS
+        }
+        out = append(out, tsRange{fromNS: curStart, toNS: curEnd})
+
+        if curStart == startNS {
+            break
+        }
+        curEnd = curStart
+        bucket *= 2
+    }
+    return out
+}
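Because the buckets grow exponentially from the end of the window backwards, a list query over a long range probes recent data first and usually never touches the old buckets. A standalone copy of the function (inputs already in nanoseconds here), printed for an illustrative 8-hour window:

```go
package main

import "fmt"

const hourNS = uint64(3_600_000_000_000)

type tsRange struct{ fromNS, toNS uint64 }

// Standalone copy of makeBuckets with nanosecond inputs.
func makeBuckets(startNS, endNS uint64) []tsRange {
	if endNS-startNS <= hourNS {
		return []tsRange{{startNS, endNS}}
	}
	var out []tsRange
	bucket, curEnd := hourNS, endNS
	for {
		curStart := curEnd - bucket
		if curStart < startNS {
			curStart = startNS
		}
		out = append(out, tsRange{curStart, curEnd})
		if curStart == startNS {
			break
		}
		curEnd = curStart
		bucket *= 2 // each older bucket doubles in width
	}
	return out
}

func main() {
	// An 8-hour window starting at hour 100 (offsets are arbitrary).
	for _, b := range makeBuckets(100*hourNS, 108*hourNS) {
		fmt.Printf("%dh..%dh\n", b.fromNS/hourNS, b.toNS/hourNS)
	}
	// Output: 107h..108h, 105h..107h, 101h..105h, 100h..101h
}
```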
diff --git a/pkg/querier/promql_query.go b/pkg/querier/promql_query.go
new file mode 100644
index 0000000000..2934563749
--- /dev/null
+++ b/pkg/querier/promql_query.go
@@ -0,0 +1,41 @@
+package querier
+
+import (
+    "context"
+
+    "github.com/SigNoz/signoz/pkg/prometheus"
+    qbv5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+)
+
+type promqlQuery struct {
+    promEngine  prometheus.Prometheus
+    query       qbv5.PromQuery
+    tr          qbv5.TimeRange
+    requestType qbv5.RequestType
+}
+
+var _ qbv5.Query = (*promqlQuery)(nil)
+
+func newPromqlQuery(
+    promEngine prometheus.Prometheus,
+    query qbv5.PromQuery,
+    tr qbv5.TimeRange,
+    requestType qbv5.RequestType,
+) *promqlQuery {
+    return &promqlQuery{promEngine, query, tr, requestType}
+}
+
+func (q *promqlQuery) Fingerprint() string {
+    // TODO: Implement this
+    return ""
+}
+
+func (q *promqlQuery) Window() (uint64, uint64) {
+    return q.tr.From, q.tr.To
+}
+
+func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
+    // TODO: Implement this
+    //nolint:nilnil
+    return nil, nil
+}
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
new file mode 100644
index 0000000000..070b480ce3
--- /dev/null
+++ b/pkg/querier/querier.go
@@ -0,0 +1,96 @@
+package querier
+
+import (
+    "context"
+
+    "github.com/SigNoz/signoz/pkg/errors"
+    "github.com/SigNoz/signoz/pkg/prometheus"
+    "github.com/SigNoz/signoz/pkg/telemetrystore"
+    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+
+    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+)
+
+type querier struct {
+    telemetryStore    telemetrystore.TelemetryStore
+    metadataStore     telemetrytypes.MetadataStore
+    promEngine        prometheus.Prometheus
+    traceStmtBuilder  qbtypes.StatementBuilder[qbtypes.TraceAggregation]
+    logStmtBuilder    qbtypes.StatementBuilder[qbtypes.LogAggregation]
+    metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
+}
+
+func NewQuerier(
+    telemetryStore telemetrystore.TelemetryStore,
+    metadataStore telemetrytypes.MetadataStore,
+    promEngine prometheus.Prometheus,
+    traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
+    logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
+    metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
+) *querier {
+    return &querier{
+        telemetryStore:    telemetryStore,
+        metadataStore:     metadataStore,
+        promEngine:        promEngine,
+        traceStmtBuilder:  traceStmtBuilder,
+        logStmtBuilder:    logStmtBuilder,
+        metricStmtBuilder: metricStmtBuilder,
+    }
+}
+
+func (q *querier) QueryRange(ctx context.Context, orgID string, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
+
+    queries := make(map[string]qbtypes.Query)
+
+    for _, query := range req.CompositeQuery.Queries {
+        switch query.Type {
+        case qbtypes.QueryTypePromQL:
+            promQuery, ok := query.Spec.(qbtypes.PromQuery)
+            if !ok {
+                return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
+            }
+            promqlQuery := newPromqlQuery(q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
+            queries[query.Name] = promqlQuery
+        case qbtypes.QueryTypeClickHouseSQL:
+            chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
+            if !ok {
+                return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
+            }
+            chSQLQuery := newchSQLQuery(q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
+            queries[query.Name] = chSQLQuery
+        case qbtypes.QueryTypeBuilder:
+            switch spec := query.Spec.(type) {
+            case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
+                bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
+                queries[query.Name] = bq
+
+            case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
+                bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
+                queries[query.Name] = bq
+
+            case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
+                bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
+                queries[query.Name] = bq
+            default:
+                return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
+            }
+        }
+    }
+    return q.run(ctx, orgID, queries, req.RequestType)
+}
+
+func (q *querier) run(ctx context.Context, _ string, qs map[string]qbtypes.Query, kind qbtypes.RequestType) (*qbtypes.QueryRangeResponse, error) {
+    results := make([]*qbtypes.Result, 0, len(qs))
+    for _, query := range qs {
+        // TODO: run in controlled batches
+        result, err := query.Execute(ctx)
+        if err != nil {
+            return nil, err
+        }
+        results = append(results, result)
+    }
+    return &qbtypes.QueryRangeResponse{
+        Type: kind,
+        Data: results,
+    }, nil
+}
diff --git a/pkg/querybuilder/agg_funcs.go b/pkg/querybuilder/agg_funcs.go
index 80279c5a70..4879d923cb 100644
--- a/pkg/querybuilder/agg_funcs.go
+++ b/pkg/querybuilder/agg_funcs.go
@@ -13,6 +13,7 @@ type AggrFunc struct {
 	FuncName       string
 	Aliases        []valuer.String
 	RequireArgs    bool
+	Numeric        bool
 	FuncCombinator bool
 	Rate           bool
 	MinArgs        int
@@ -46,156 +47,156 @@ var (
 	AggrFuncSum = AggrFunc{
 		Name:     valuer.NewString("sum"),
 		FuncName: "sum",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncSumIf = AggrFunc{
 		Name:     valuer.NewString("sumif"),
 		FuncName: "sumIf",
 		Aliases:  []valuer.String{valuer.NewString("sum_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncAvg = AggrFunc{
 		Name:     valuer.NewString("avg"),
 		FuncName: "avg",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncAvgIf = AggrFunc{
 		Name:     valuer.NewString("avgif"),
 		FuncName: "avgIf",
 		Aliases:  []valuer.String{valuer.NewString("avg_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncMin = AggrFunc{
 		Name:     valuer.NewString("min"),
 		FuncName: "min",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncMinIf = AggrFunc{
 		Name:     valuer.NewString("minif"),
 		FuncName: "minIf",
 		Aliases:  []valuer.String{valuer.NewString("min_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncMax = AggrFunc{
 		Name:     valuer.NewString("max"),
 		FuncName: "max",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncMaxIf = AggrFunc{
 		Name:     valuer.NewString("maxif"),
 		FuncName: "maxIf",
 		Aliases:  []valuer.String{valuer.NewString("max_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP05 = AggrFunc{
 		Name:     valuer.NewString("p05"),
 		FuncName: "quantile(0.05)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP05IF = AggrFunc{
 		Name:     valuer.NewString("p05if"),
 		FuncName: "quantileIf(0.05)",
 		Aliases:  []valuer.String{valuer.NewString("p05_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP10 = AggrFunc{
 		Name:     valuer.NewString("p10"),
 		FuncName: "quantile(0.10)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP10IF = AggrFunc{
 		Name:     valuer.NewString("p10if"),
 		FuncName: "quantileIf(0.10)",
 		Aliases:  []valuer.String{valuer.NewString("p10_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP20 = AggrFunc{
 		Name:     valuer.NewString("p20"),
 		FuncName: "quantile(0.20)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP20IF = AggrFunc{
 		Name:     valuer.NewString("p20if"),
 		FuncName: "quantileIf(0.20)",
 		Aliases:  []valuer.String{valuer.NewString("p20_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP25 = AggrFunc{
 		Name:     valuer.NewString("p25"),
 		FuncName: "quantile(0.25)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP25IF = AggrFunc{
 		Name:     valuer.NewString("p25if"),
 		FuncName: "quantileIf(0.25)",
 		Aliases:  []valuer.String{valuer.NewString("p25_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP50 = AggrFunc{
 		Name:     valuer.NewString("p50"),
 		FuncName: "quantile(0.50)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP50IF = AggrFunc{
 		Name:     valuer.NewString("p50if"),
 		FuncName: "quantileIf(0.50)",
 		Aliases:  []valuer.String{valuer.NewString("p50_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP75 = AggrFunc{
 		Name:     valuer.NewString("p75"),
 		FuncName: "quantile(0.75)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP75IF = AggrFunc{
 		Name:     valuer.NewString("p75if"),
 		FuncName: "quantileIf(0.75)",
 		Aliases:  []valuer.String{valuer.NewString("p75_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP90 = AggrFunc{
 		Name:     valuer.NewString("p90"),
 		FuncName: "quantile(0.90)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP90IF = AggrFunc{
 		Name:     valuer.NewString("p90if"),
 		FuncName: "quantileIf(0.90)",
 		Aliases:  []valuer.String{valuer.NewString("p90_if")},
-		RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
+		RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2,
 	}
 	AggrFuncP95 = AggrFunc{
 		Name:     valuer.NewString("p95"),
 		FuncName: "quantile(0.95)",
-		RequireArgs: true, MinArgs: 1, MaxArgs: 1,
+		RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1,
 	}
 	AggrFuncP95IF = AggrFunc{
 		Name:     valuer.NewString("p95if"),
FuncName: "quantileIf(0.95)", Aliases: []valuer.String{valuer.NewString("p95_if")}, - RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2, + RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2, } AggrFuncP99 = AggrFunc{ Name: valuer.NewString("p99"), FuncName: "quantile(0.99)", - RequireArgs: true, MinArgs: 1, MaxArgs: 1, + RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1, } AggrFuncP99IF = AggrFunc{ Name: valuer.NewString("p99if"), FuncName: "quantileIf(0.99)", Aliases: []valuer.String{valuer.NewString("p99_if")}, - RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2, + RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2, } AggrFuncP999 = AggrFunc{ Name: valuer.NewString("p999"), FuncName: "quantile(0.999)", - RequireArgs: true, MinArgs: 1, MaxArgs: 1, + RequireArgs: true, Numeric: true, MinArgs: 1, MaxArgs: 1, } AggrFuncP999IF = AggrFunc{ Name: valuer.NewString("p999if"), FuncName: "quantileIf(0.999)", Aliases: []valuer.String{valuer.NewString("p999_if")}, - RequireArgs: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2, + RequireArgs: true, Numeric: true, FuncCombinator: true, MinArgs: 2, MaxArgs: 2, } AggrFuncRate = AggrFunc{ Name: valuer.NewString("rate"), @@ -211,22 +212,22 @@ var ( AggrFuncRateSum = AggrFunc{ Name: valuer.NewString("rate_sum"), FuncName: "sum", - RequireArgs: true, Rate: true, MinArgs: 1, MaxArgs: 1, + RequireArgs: true, Numeric: true, Rate: true, MinArgs: 1, MaxArgs: 1, } AggrFuncRateAvg = AggrFunc{ Name: valuer.NewString("rate_avg"), FuncName: "avg", - RequireArgs: true, Rate: true, MinArgs: 1, MaxArgs: 1, + RequireArgs: true, Numeric: true, Rate: true, MinArgs: 1, MaxArgs: 1, } AggrFuncRateMin = AggrFunc{ Name: valuer.NewString("rate_min"), FuncName: "min", - RequireArgs: true, Rate: true, MinArgs: 1, MaxArgs: 1, + RequireArgs: true, Numeric: true, Rate: true, MinArgs: 1, MaxArgs: 1, } AggrFuncRateMax = AggrFunc{ Name: valuer.NewString("rate_max"), FuncName: "max", - RequireArgs: true, Rate: true, MinArgs: 1, MaxArgs: 1, + RequireArgs: true, Numeric: true, Rate: true, MinArgs: 1, MaxArgs: 1, } ) diff --git a/pkg/querybuilder/agg_rewrite.go b/pkg/querybuilder/agg_rewrite.go index 052831d316..1b661a3882 100644 --- a/pkg/querybuilder/agg_rewrite.go +++ b/pkg/querybuilder/agg_rewrite.go @@ -13,33 +13,41 @@ import ( "github.com/huandu/go-sqlbuilder" ) -type AggExprRewriterOptions struct { - MetadataStore telemetrytypes.MetadataStore - FullTextColumn *telemetrytypes.TelemetryFieldKey - FieldMapper qbtypes.FieldMapper - ConditionBuilder qbtypes.ConditionBuilder - FilterCompiler qbtypes.FilterCompiler - JsonBodyPrefix string - JsonKeyToKey qbtypes.JsonKeyToFieldFunc -} - type aggExprRewriter struct { - opts AggExprRewriterOptions + fullTextColumn *telemetrytypes.TelemetryFieldKey + fieldMapper qbtypes.FieldMapper + conditionBuilder qbtypes.ConditionBuilder + jsonBodyPrefix string + jsonKeyToKey qbtypes.JsonKeyToFieldFunc } -func NewAggExprRewriter(opts AggExprRewriterOptions) *aggExprRewriter { - return &aggExprRewriter{opts: opts} +var _ qbtypes.AggExprRewriter = (*aggExprRewriter)(nil) + +func NewAggExprRewriter( + fullTextColumn *telemetrytypes.TelemetryFieldKey, + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, + jsonBodyPrefix string, + jsonKeyToKey qbtypes.JsonKeyToFieldFunc, +) *aggExprRewriter { + return &aggExprRewriter{ + fullTextColumn: fullTextColumn, + fieldMapper: fieldMapper, + conditionBuilder: conditionBuilder, + jsonBodyPrefix: 
+		jsonKeyToKey:     jsonKeyToKey,
+	}
 }
 
 // Rewrite parses the given aggregation expression, maps the column, and condition to
 // valid data source column and condition expression, and returns the rewritten expression
 // and the args if the parametric aggregation function is used.
-func (r *aggExprRewriter) Rewrite(ctx context.Context, expr string, opts ...qbtypes.RewriteOption) (string, []any, error) {
-
-	rctx := &qbtypes.RewriteCtx{}
-	for _, opt := range opts {
-		opt(rctx)
-	}
+func (r *aggExprRewriter) Rewrite(
+	ctx context.Context,
+	expr string,
+	rateInterval uint64,
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
+) (string, []any, error) {
 
 	wrapped := fmt.Sprintf("SELECT %s", expr)
 	p := chparser.NewParser(wrapped)
@@ -62,19 +70,12 @@ func (r *aggExprRewriter) Rewrite(ctx context.Context, expr string, opts ...qbty
 		return "", nil, errors.NewInternalf(errors.CodeInternal, "no SELECT items for %q", expr)
 	}
 
-	selectors := QueryStringToKeysSelectors(expr)
-
-	keys, err := r.opts.MetadataStore.GetKeysMulti(ctx, selectors)
-	if err != nil {
-		return "", nil, err
-	}
-
 	visitor := newExprVisitor(keys,
-		r.opts.FullTextColumn,
-		r.opts.FieldMapper,
-		r.opts.ConditionBuilder,
-		r.opts.JsonBodyPrefix,
-		r.opts.JsonKeyToKey,
+		r.fullTextColumn,
+		r.fieldMapper,
+		r.conditionBuilder,
+		r.jsonBodyPrefix,
+		r.jsonKeyToKey,
 	)
 	// Rewrite the first select item (our expression)
 	if err := sel.SelectItems[0].Accept(visitor); err != nil {
@@ -82,26 +83,23 @@ func (r *aggExprRewriter) Rewrite(ctx context.Context, expr string, opts ...qbty
 	}
 
 	if visitor.isRate {
-		return fmt.Sprintf("%s/%d", sel.SelectItems[0].String(), rctx.RateInterval), visitor.chArgs, nil
+		return fmt.Sprintf("%s/%d", sel.SelectItems[0].String(), rateInterval), visitor.chArgs, nil
 	}
 	return sel.SelectItems[0].String(), visitor.chArgs, nil
 }
 
-// RewriteMultiple rewrites a slice of expressions.
-func (r *aggExprRewriter) RewriteMultiple(
+// RewriteMulti rewrites a slice of expressions.
+func (r *aggExprRewriter) RewriteMulti(
 	ctx context.Context,
 	exprs []string,
-	opts ...qbtypes.RewriteOption,
+	rateInterval uint64,
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
 ) ([]string, [][]any, error) {
-	rctx := &qbtypes.RewriteCtx{}
-	for _, opt := range opts {
-		opt(rctx)
-	}
 	out := make([]string, len(exprs))
 	var errs []error
 	var chArgsList [][]any
 	for i, e := range exprs {
-		w, chArgs, err := r.Rewrite(ctx, e, opts...)
+		w, chArgs, err := r.Rewrite(ctx, e, rateInterval, keys)
 		if err != nil {
 			errs = append(errs, err)
 			out[i] = e
@@ -173,6 +171,11 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
 		v.isRate = true
 	}
 
+	dataType := telemetrytypes.FieldDataTypeString
+	if aggFunc.Numeric {
+		dataType = telemetrytypes.FieldDataTypeFloat64
+	}
+
 	// Handle *If functions with predicate + values
 	if aggFunc.FuncCombinator {
 		// Map the predicate (last argument)
@@ -205,11 +208,13 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
 		// Map each value column argument
 		for i := 0; i < len(args)-1; i++ {
 			origVal := args[i].String()
-			colName, err := v.fieldMapper.ColumnExpressionFor(context.Background(), &telemetrytypes.TelemetryFieldKey{Name: origVal}, v.fieldKeys)
+			fieldKey := telemetrytypes.GetFieldKeyFromKeyText(origVal)
+			expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType)
 			if err != nil {
 				return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", origVal)
 			}
-			newVal := colName
+			v.chArgs = append(v.chArgs, exprArgs...)
+			newVal := expr
 			parsedVal, err := parseFragment(newVal)
 			if err != nil {
 				return err
@@ -221,11 +226,13 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
 		// Non-If functions: map every argument as a column/value
 		for i, arg := range args {
 			orig := arg.String()
-			colName, err := v.fieldMapper.ColumnExpressionFor(context.Background(), &telemetrytypes.TelemetryFieldKey{Name: orig}, v.fieldKeys)
+			fieldKey := telemetrytypes.GetFieldKeyFromKeyText(orig)
+			expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType)
 			if err != nil {
 				return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", orig)
 			}
-			newCol := colName
+			v.chArgs = append(v.chArgs, exprArgs...)
+			newCol := expr
 			parsed, err := parseFragment(newCol)
 			if err != nil {
 				return err
diff --git a/pkg/querybuilder/cte.go b/pkg/querybuilder/cte.go
new file mode 100644
index 0000000000..e3da7828c2
--- /dev/null
+++ b/pkg/querybuilder/cte.go
@@ -0,0 +1,27 @@
+package querybuilder
+
+import (
+    "strings"
+)
+
+// CombineCTEs takes any number of individual CTE fragments like
+//
+//	"__resource_filter AS (...)", "__limit_cte AS (...)"
+//
+// and renders the final `WITH …` clause.
+func CombineCTEs(ctes []string) string {
+    if len(ctes) == 0 {
+        return ""
+    }
+    return "WITH " + strings.Join(ctes, ", ") + " "
+}
+
+// PrependArgs ensures CTE arguments appear before main-query arguments
+// in the final slice so their ordinal positions match the SQL string.
+func PrependArgs(cteArgs [][]any, mainArgs []any) []any {
+    out := make([]any, 0, len(mainArgs)+len(cteArgs))
+    for _, a := range cteArgs { // CTEs first, in declaration order
+        out = append(out, a...)
+    }
+    return append(out, mainArgs...)
+}
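Argument ordering is the easy thing to get wrong when stitching CTEs in front of a built statement: the WITH clause's placeholders precede the main query's in the final SQL string, so their args must too. A small self-contained demonstration using inlined copies of the two helpers:

```go
package main

import (
	"fmt"
	"strings"
)

// Inlined copies of the helpers from pkg/querybuilder/cte.go.
func CombineCTEs(ctes []string) string {
	if len(ctes) == 0 {
		return ""
	}
	return "WITH " + strings.Join(ctes, ", ") + " "
}

func PrependArgs(cteArgs [][]any, mainArgs []any) []any {
	out := make([]any, 0, len(mainArgs)+len(cteArgs))
	for _, a := range cteArgs { // CTEs first, in declaration order
		out = append(out, a...)
	}
	return append(out, mainArgs...)
}

func main() {
	// Table and column names below are illustrative, not the real schema.
	ctes := []string{"__resource_filter AS (SELECT fingerprint FROM rf WHERE name = ?)"}
	mainSQL := "SELECT count() FROM logs WHERE fingerprint IN __resource_filter AND ts >= ?"

	sql := CombineCTEs(ctes) + mainSQL
	args := PrependArgs([][]any{{"checkout"}}, []any{uint64(1_717_000_000)})

	fmt.Println(sql)  // WITH __resource_filter AS (...) SELECT ...
	fmt.Println(args) // [checkout 1717000000] — ordinals line up with the ?s
}
```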
diff --git a/pkg/querybuilder/fallback_expr.go b/pkg/querybuilder/fallback_expr.go
new file mode 100644
index 0000000000..3001cc3fcd
--- /dev/null
+++ b/pkg/querybuilder/fallback_expr.go
@@ -0,0 +1,96 @@
+package querybuilder
+
+import (
+    "context"
+    "fmt"
+    "strings"
+
+    "github.com/SigNoz/signoz/pkg/errors"
+    qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+    "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+    "github.com/huandu/go-sqlbuilder"
+    "golang.org/x/exp/maps"
+)
+
+func CollisionHandledFinalExpr(
+    ctx context.Context,
+    field *telemetrytypes.TelemetryFieldKey,
+    fm qbtypes.FieldMapper,
+    cb qbtypes.ConditionBuilder,
+    keys map[string][]*telemetrytypes.TelemetryFieldKey,
+    requiredDataType telemetrytypes.FieldDataType,
+) (string, []any, error) {
+
+    if requiredDataType != telemetrytypes.FieldDataTypeString &&
+        requiredDataType != telemetrytypes.FieldDataTypeFloat64 {
+        return "", nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unsupported data type %s", requiredDataType)
+    }
+
+    var dummyValue any
+    if requiredDataType == telemetrytypes.FieldDataTypeFloat64 {
+        dummyValue = 0.0
+    } else {
+        dummyValue = ""
+    }
+
+    var stmts []string
+    var allArgs []any
+
+    addCondition := func(key *telemetrytypes.TelemetryFieldKey) error {
+        sb := sqlbuilder.NewSelectBuilder()
+        condition, err := cb.ConditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
+        if err != nil {
+            return err
+        }
+        sb.Where(condition)
+
+        expr, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+        expr = strings.TrimPrefix(expr, "WHERE ")
+        stmts = append(stmts, expr)
+        allArgs = append(allArgs, args...)
+        return nil
+    }
+
+    colName, err := fm.FieldFor(ctx, field)
+    if errors.Is(err, qbtypes.ErrColumnNotFound) {
+        // the key didn't have the right context to be added to the query,
+        // so we try to use the context we know of
+        keysForField := keys[field.Name]
+        if len(keysForField) == 0 {
+            // - the context is not provided
+            // - there are no keys for the field
+            // - it is not a static field
+            // - the next best thing to do is see if there is a typo
+            //   and suggest a correction
+            correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
+            if found {
+                // we found a close match, in the error message send the suggestion
+                return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
+            } else {
+                // not even a close match, return an error
+                return "", nil, err
+            }
+        } else {
+            for _, key := range keysForField {
+                err := addCondition(key)
+                if err != nil {
+                    return "", nil, err
+                }
+                colName, _ = fm.FieldFor(ctx, key)
+                colName, _ = telemetrytypes.DataTypeCollisionHandledFieldName(key, dummyValue, colName)
+                stmts = append(stmts, colName)
+            }
+        }
+    } else {
+        err := addCondition(field)
+        if err != nil {
+            return "", nil, err
+        }
+        colName, _ = telemetrytypes.DataTypeCollisionHandledFieldName(field, dummyValue, colName)
+        stmts = append(stmts, colName)
+    }
+
+    multiIfStmt := fmt.Sprintf("multiIf(%s, NULL)", strings.Join(stmts, ", "))
+
+    return multiIfStmt, allArgs, nil
+}
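For intuition about the expression this produces: when a key resolves to several typed variants, each variant contributes an exists-condition/value pair, and the pairs collapse into a single multiIf with a NULL fallback. A toy rendering of that shape (the branch strings are invented, not real field-mapper output):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Each variant of a colliding key yields "condition, value" — the same
	// stmts slice CollisionHandledFinalExpr joins together.
	stmts := []string{
		"mapContains(attributes_number, 'cart.value'), attributes_number['cart.value']",
		"mapContains(attributes_string, 'cart.value'), toFloat64OrNull(attributes_string['cart.value'])",
	}
	fmt.Printf("multiIf(%s, NULL)\n", strings.Join(stmts, ", "))
}
```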
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes" - "github.com/huandu/go-sqlbuilder" -) - -type FilterCompilerOpts struct { - FieldMapper qbtypes.FieldMapper - ConditionBuilder qbtypes.ConditionBuilder - MetadataStore telemetrytypes.MetadataStore -} - -type filterCompiler struct { - opts FilterCompilerOpts -} - -func NewFilterCompiler(opts FilterCompilerOpts) *filterCompiler { - return &filterCompiler{ - opts: opts, - } -} - -func (c *filterCompiler) Compile(ctx context.Context, expr string) (*sqlbuilder.WhereClause, []string, error) { - selectors := querybuilder.QueryStringToKeysSelectors(expr) - - keys, err := c.opts.MetadataStore.GetKeysMulti(ctx, selectors) - if err != nil { - return nil, nil, err - } - - filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{ - FieldMapper: c.opts.FieldMapper, - ConditionBuilder: c.opts.ConditionBuilder, - FieldKeys: keys, - }) - - if err != nil { - return nil, nil, err - } - - return filterWhereClause, warnings, nil -} diff --git a/pkg/querybuilder/resourcefilter/statement_builder.go b/pkg/querybuilder/resourcefilter/statement_builder.go index f4afcb2e6e..559c552f66 100644 --- a/pkg/querybuilder/resourcefilter/statement_builder.go +++ b/pkg/querybuilder/resourcefilter/statement_builder.go @@ -5,17 +5,12 @@ import ( "fmt" "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/querybuilder" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" "github.com/huandu/go-sqlbuilder" ) -type ResourceFilterStatementBuilderOpts struct { - FieldMapper qbtypes.FieldMapper - ConditionBuilder qbtypes.ConditionBuilder - Compiler qbtypes.FilterCompiler -} - var ( ErrUnsupportedSignal = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported signal type") ) @@ -39,8 +34,10 @@ var signalConfigs = map[telemetrytypes.Signal]signalConfig{ // Generic resource filter statement builder type resourceFilterStatementBuilder[T any] struct { - opts ResourceFilterStatementBuilderOpts - signal telemetrytypes.Signal + fieldMapper qbtypes.FieldMapper + conditionBuilder qbtypes.ConditionBuilder + metadataStore telemetrytypes.MetadataStore + signal telemetrytypes.Signal } // Ensure interface compliance at compile time @@ -50,20 +47,47 @@ var ( ) // Constructor functions -func NewTraceResourceFilterStatementBuilder(opts ResourceFilterStatementBuilderOpts) *resourceFilterStatementBuilder[qbtypes.TraceAggregation] { +func NewTraceResourceFilterStatementBuilder( + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, + metadataStore telemetrytypes.MetadataStore, +) *resourceFilterStatementBuilder[qbtypes.TraceAggregation] { return &resourceFilterStatementBuilder[qbtypes.TraceAggregation]{ - opts: opts, - signal: telemetrytypes.SignalTraces, + fieldMapper: fieldMapper, + conditionBuilder: conditionBuilder, + metadataStore: metadataStore, + signal: telemetrytypes.SignalTraces, } } -func NewLogResourceFilterStatementBuilder(opts ResourceFilterStatementBuilderOpts) *resourceFilterStatementBuilder[qbtypes.LogAggregation] { +func NewLogResourceFilterStatementBuilder( + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, + metadataStore telemetrytypes.MetadataStore, +) *resourceFilterStatementBuilder[qbtypes.LogAggregation] { return &resourceFilterStatementBuilder[qbtypes.LogAggregation]{ - opts: opts, - signal: 
-		signal: telemetrytypes.SignalLogs,
+		fieldMapper:      fieldMapper,
+		conditionBuilder: conditionBuilder,
+		metadataStore:    metadataStore,
+		signal:           telemetrytypes.SignalLogs,
 	}
 }
 
+func (b *resourceFilterStatementBuilder[T]) getKeySelectors(query qbtypes.QueryBuilderQuery[T]) []*telemetrytypes.FieldKeySelector {
+	var keySelectors []*telemetrytypes.FieldKeySelector
+
+	if query.Filter != nil && query.Filter.Expression != "" {
+		whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
+		keySelectors = append(keySelectors, whereClauseSelectors...)
+	}
+
+	for idx := range keySelectors {
+		keySelectors[idx].Signal = b.signal
+	}
+
+	return keySelectors
+}
+
 // Build builds a SQL query based on the given parameters
 func (b *resourceFilterStatementBuilder[T]) Build(
 	ctx context.Context,
@@ -77,15 +101,21 @@ func (b *resourceFilterStatementBuilder[T]) Build(
 		return nil, fmt.Errorf("%w: %s", ErrUnsupportedSignal, b.signal)
 	}
 
-	q := sqlbuilder.ClickHouse.NewSelectBuilder()
+	q := sqlbuilder.NewSelectBuilder()
 	q.Select("fingerprint")
 	q.From(fmt.Sprintf("%s.%s", config.dbName, config.tableName))
 
-	if err := b.addConditions(ctx, q, start, end, query); err != nil {
+	keySelectors := b.getKeySelectors(query)
+	keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
+	if err != nil {
 		return nil, err
 	}
 
-	stmt, args := q.Build()
+	if err := b.addConditions(ctx, q, start, end, query, keys); err != nil {
+		return nil, err
+	}
+
+	stmt, args := q.BuildWithFlavor(sqlbuilder.ClickHouse)
 	return &qbtypes.Statement{
 		Query: stmt,
 		Args:  args,
@@ -94,14 +124,22 @@ func (b *resourceFilterStatementBuilder[T]) Build(
 
 // addConditions adds both filter and time conditions to the query
 func (b *resourceFilterStatementBuilder[T]) addConditions(
-	ctx context.Context,
+	_ context.Context,
 	sb *sqlbuilder.SelectBuilder,
 	start, end uint64,
 	query qbtypes.QueryBuilderQuery[T],
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
 ) error {
 	// Add filter condition if present
 	if query.Filter != nil && query.Filter.Expression != "" {
-		filterWhereClause, _, err := b.opts.Compiler.Compile(ctx, query.Filter.Expression)
+
+		// warnings would be encountered as part of the main condition already
+		filterWhereClause, _, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
+			FieldMapper:      b.fieldMapper,
+			ConditionBuilder: b.conditionBuilder,
+			FieldKeys:        keys,
+		})
+
 		if err != nil {
 			return err
 		}
@@ -118,13 +156,9 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
 // addTimeFilter adds time-based filtering conditions
 func (b *resourceFilterStatementBuilder[T]) addTimeFilter(sb *sqlbuilder.SelectBuilder, start, end uint64) {
 	// Convert nanoseconds to seconds and adjust start bucket
-	const (
-		nsToSeconds      = 1000000000
-		bucketAdjustment = 1800 // 30 minutes
-	)
-
-	startBucket := start/nsToSeconds - bucketAdjustment
-	endBucket := end / nsToSeconds
+	startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
+	endBucket := end / querybuilder.NsToSeconds
 
 	sb.Where(
 		sb.GE("seen_at_ts_bucket_start", startBucket),
diff --git a/pkg/querybuilder/time.go b/pkg/querybuilder/time.go
index 4c9fe46f04..02293da81e 100644
--- a/pkg/querybuilder/time.go
+++ b/pkg/querybuilder/time.go
@@ -2,6 +2,11 @@ package querybuilder
 
 import "math"
 
+const (
+	NsToSeconds      = 1000000000
+	BucketAdjustment = 1800 // 30 minutes
+)
+
 // ToNanoSecs takes epoch and returns it in ns
 func ToNanoSecs(epoch uint64) uint64 {
 	temp := epoch
diff --git a/pkg/telemetrylogs/condition_builder.go b/pkg/telemetrylogs/condition_builder.go
index da171ca51f..d3329f81df 100644
--- a/pkg/telemetrylogs/condition_builder.go
+++ b/pkg/telemetrylogs/condition_builder.go
@@ -147,6 +147,11 @@ func (c *conditionBuilder) conditionFor(
 		}
 	}
 
+	// if the field is intrinsic, it always exists
+	if slices.Contains(IntrinsicFields, key.Name) {
+		return "true", nil
+	}
+
 	var value any
 	switch column.Type {
 	case schema.ColumnTypeString, schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}:
diff --git a/pkg/telemetrylogs/condition_builder_test.go b/pkg/telemetrylogs/condition_builder_test.go
index 45f61b2d03..f3519969ba 100644
--- a/pkg/telemetrylogs/condition_builder_test.go
+++ b/pkg/telemetrylogs/condition_builder_test.go
@@ -249,8 +249,7 @@ func TestConditionFor(t *testing.T) {
 			},
 			operator:      qbtypes.FilterOperatorExists,
 			value:         nil,
-			expectedSQL:   "body <> ?",
-			expectedArgs:  []any{""},
+			expectedSQL:   "true",
 			expectedError: nil,
 		},
 		{
@@ -261,8 +260,7 @@ func TestConditionFor(t *testing.T) {
 			},
 			operator:      qbtypes.FilterOperatorNotExists,
 			value:         nil,
-			expectedSQL:   "body = ?",
-			expectedArgs:  []any{""},
+			expectedSQL:   "true",
 			expectedError: nil,
 		},
 		{
@@ -273,8 +271,7 @@ func TestConditionFor(t *testing.T) {
 			},
 			operator:      qbtypes.FilterOperatorExists,
 			value:         nil,
-			expectedSQL:   "timestamp <> ?",
-			expectedArgs:  []any{0},
+			expectedSQL:   "true",
 			expectedError: nil,
 		},
 		{
diff --git a/pkg/telemetrylogs/statement_builder.go b/pkg/telemetrylogs/statement_builder.go
index 1806624a63..a0bdea7978 100644
--- a/pkg/telemetrylogs/statement_builder.go
+++ b/pkg/telemetrylogs/statement_builder.go
@@ -3,6 +3,7 @@ package telemetrylogs
 import (
 	"context"
 	"fmt"
+	"log/slog"
 	"strings"
 
 	"github.com/SigNoz/signoz/pkg/errors"
@@ -16,32 +17,32 @@ var (
 	ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation")
 )
 
-type LogQueryStatementBuilderOpts struct {
-	MetadataStore             telemetrytypes.MetadataStore
-	FieldMapper               qbtypes.FieldMapper
-	ConditionBuilder          qbtypes.ConditionBuilder
-	ResourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
-	Compiler                  qbtypes.FilterCompiler
-	AggExprRewriter           qbtypes.AggExprRewriter
-}
-
 type logQueryStatementBuilder struct {
-	opts            LogQueryStatementBuilderOpts
-	fm              qbtypes.FieldMapper
-	cb              qbtypes.ConditionBuilder
-	compiler        qbtypes.FilterCompiler
-	aggExprRewriter qbtypes.AggExprRewriter
+	logger                    *slog.Logger
+	metadataStore             telemetrytypes.MetadataStore
+	fm                        qbtypes.FieldMapper
+	cb                        qbtypes.ConditionBuilder
+	resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
+	aggExprRewriter           qbtypes.AggExprRewriter
 }
 
 var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
 
-func NewLogQueryStatementBuilder(opts LogQueryStatementBuilderOpts) *logQueryStatementBuilder {
+func NewLogQueryStatementBuilder(
+	logger *slog.Logger,
+	metadataStore telemetrytypes.MetadataStore,
+	fieldMapper qbtypes.FieldMapper,
+	conditionBuilder qbtypes.ConditionBuilder,
+	resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
+	aggExprRewriter qbtypes.AggExprRewriter,
+) *logQueryStatementBuilder {
 	return &logQueryStatementBuilder{
-		opts:            opts,
-		fm:              opts.FieldMapper,
-		cb:              opts.ConditionBuilder,
-		compiler:        opts.Compiler,
-		aggExprRewriter: opts.AggExprRewriter,
+		logger:                    logger,
+		metadataStore:             metadataStore,
+		fm:                        fieldMapper,
+		cb:                        conditionBuilder,
+		resourceFilterStmtBuilder: resourceFilterStmtBuilder,
+		aggExprRewriter:           aggExprRewriter,
 	}
 }
 
@@ -58,13 +59,13 @@ func (b *logQueryStatementBuilder) Build(
 	end = querybuilder.ToNanoSecs(end)
 
 	keySelectors := getKeySelectors(query)
-	keys, err := b.opts.MetadataStore.GetKeysMulti(ctx, keySelectors)
+	keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
 	if err != nil {
 		return nil, err
 	}
 
 	// Create SQL builder
-	q := sqlbuilder.ClickHouse.NewSelectBuilder()
+	q := sqlbuilder.NewSelectBuilder()
 
 	switch requestType {
 	case qbtypes.RequestTypeRaw:
@@ -87,8 +88,29 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
 		keySelectors = append(keySelectors, selectors...)
 	}
 
-	whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
-	keySelectors = append(keySelectors, whereClauseSelectors...)
+	if query.Filter != nil && query.Filter.Expression != "" {
+		whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
+		keySelectors = append(keySelectors, whereClauseSelectors...)
+	}
+
+	for idx := range query.GroupBy {
+		groupBy := query.GroupBy[idx]
+		selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name)
+		keySelectors = append(keySelectors, selectors...)
+	}
+
+	for idx := range query.Order {
+		keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
+			Name:          query.Order[idx].Key.Name,
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  query.Order[idx].Key.FieldContext,
+			FieldDataType: query.Order[idx].Key.FieldDataType,
+		})
+	}
+
+	for idx := range keySelectors {
+		keySelectors[idx].Signal = telemetrytypes.SignalLogs
+	}
 
 	return keySelectors
 }
@@ -99,7 +121,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
 	sb *sqlbuilder.SelectBuilder,
 	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
 	start, end uint64,
-	_ map[string][]*telemetrytypes.TelemetryFieldKey,
+	keys map[string][]*telemetrytypes.TelemetryFieldKey,
 ) (*qbtypes.Statement, error) {
 
 	var (
@@ -123,7 +145,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
 	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
 
 	// Add filter conditions
-	warnings, err := b.addFilterCondition(ctx, sb, start, end, query)
+	warnings, err := b.addFilterCondition(ctx, sb, start, end, query, keys)
 	if err != nil {
 		return nil, err
 	}
@@ -144,8 +166,8 @@ func (b *logQueryStatementBuilder) buildListQuery(
 
 	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
 
-	finalSQL := combineCTEs(cteFragments) + mainSQL
-	finalArgs := prependArgs(cteArgs, mainArgs)
+	finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
+	finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
 
 	return &qbtypes.Statement{
 		Query: finalSQL,
@@ -179,21 +201,29 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
 		int64(query.StepInterval.Seconds()),
 	))
 
+	var allGroupByArgs []any
+
 	// Keep original column expressions so we can build the tuple
 	fieldNames := make([]string, 0, len(query.GroupBy))
 	for _, gb := range query.GroupBy {
-		colExpr, err := b.fm.ColumnExpressionFor(ctx, &gb.TelemetryFieldKey, keys)
+		expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
 		if err != nil {
 			return nil, err
 		}
-		sb.SelectMore(colExpr)
+		colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
+		allGroupByArgs = append(allGroupByArgs, args...)
+		sb.SelectMore(sqlbuilder.Escape(colExpr))
 		fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
 	}
 
 	// Aggregations
 	allAggChArgs := make([]any, 0)
 	for i, agg := range query.Aggregations {
-		rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, agg.Expression)
+		rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
+			ctx, agg.Expression,
+			uint64(query.StepInterval.Seconds()),
+			keys,
+		)
 		if err != nil {
 			return nil, err
 		}
@@ -202,7 +232,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
 	}
 
 	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
-	warnings, err := b.addFilterCondition(ctx, sb, start, end, query)
+	warnings, err := b.addFilterCondition(ctx, sb, start, end, query, keys)
 	if err != nil {
 		return nil, err
 	}
@@ -212,7 +242,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
 
 	if query.Limit > 0 {
 		// build the scalar “top/bottom-N” query in its own builder.
-		cteSB := sqlbuilder.ClickHouse.NewSelectBuilder()
+		cteSB := sqlbuilder.NewSelectBuilder()
 		cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true)
 		if err != nil {
 			return nil, err
@@ -231,11 +261,13 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
 			sb.Having(query.Having.Expression)
 		}
 
-		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)
+		combinedArgs := append(allGroupByArgs, allAggChArgs...)
+
+		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
 
 		// Stitch it all together:  WITH … SELECT …
-		finalSQL = combineCTEs(cteFragments) + mainSQL
-		finalArgs = prependArgs(cteArgs, mainArgs)
+		finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
+		finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
 	} else {
 		sb.GroupBy("ALL")
 
@@ -243,11 +275,13 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
 			sb.Having(query.Having.Expression)
 		}
 
-		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...)
+		combinedArgs := append(allGroupByArgs, allAggChArgs...)
+
+		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
 
 		// Stitch it all together:  WITH … SELECT …
-		finalSQL = combineCTEs(cteFragments) + mainSQL
-		finalArgs = prependArgs(cteArgs, mainArgs)
+		finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
+		finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
 	}
 
 	return &qbtypes.Statement{
@@ -281,20 +315,30 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
 
 	allAggChArgs := []any{}
 
-	// Add group by columns
-	for _, groupBy := range query.GroupBy {
-		colExpr, err := b.fm.ColumnExpressionFor(ctx, &groupBy.TelemetryFieldKey, keys)
+	var allGroupByArgs []any
+
+	for _, gb := range query.GroupBy {
+		expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
 		if err != nil {
 			return nil, err
 		}
-		sb.SelectMore(colExpr)
+		colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
+		allGroupByArgs = append(allGroupByArgs, args...)
+ sb.SelectMore(sqlbuilder.Escape(colExpr)) } + // for scalar queries, the rate would be end-start + rateInterval := (end - start) / querybuilder.NsToSeconds + // Add aggregation if len(query.Aggregations) > 0 { for idx := range query.Aggregations { aggExpr := query.Aggregations[idx] - rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, aggExpr.Expression) + rewritten, chArgs, err := b.aggExprRewriter.Rewrite( + ctx, aggExpr.Expression, + rateInterval, + keys, + ) if err != nil { return nil, err } @@ -307,7 +351,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery( sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName)) // Add filter conditions - warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + warnings, err := b.addFilterCondition(ctx, sb, start, end, query, keys) if err != nil { return nil, err } @@ -340,10 +384,12 @@ func (b *logQueryStatementBuilder) buildScalarQuery( sb.Limit(query.Limit) } - mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + combinedArgs := append(allGroupByArgs, allAggChArgs...) - finalSQL := combineCTEs(cteFragments) + mainSQL - finalArgs := prependArgs(cteArgs, mainArgs) + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) + + finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL + finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs) return &qbtypes.Statement{ Query: finalSQL, @@ -353,11 +399,21 @@ func (b *logQueryStatementBuilder) buildScalarQuery( } // buildFilterCondition builds SQL condition from filter expression -func (b *logQueryStatementBuilder) addFilterCondition(ctx context.Context, sb *sqlbuilder.SelectBuilder, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) ([]string, error) { +func (b *logQueryStatementBuilder) addFilterCondition( + _ context.Context, + sb *sqlbuilder.SelectBuilder, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, +) ([]string, error) { // add filter expression - - filterWhereClause, warnings, err := b.compiler.Compile(ctx, query.Filter.Expression) + filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + SkipResourceFilter: true, + }) if err != nil { return nil, err @@ -368,36 +424,14 @@ func (b *logQueryStatementBuilder) addFilterCondition(ctx context.Context, sb *s } // add time filter - startBucket := start/1000000000 - 1800 - endBucket := end / 1000000000 + startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment + endBucket := end / querybuilder.NsToSeconds - sb.Where(sb.GE("timestamp", start), sb.LE("timestamp", end), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket)) + sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.LE("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket)) return warnings, nil } -// combineCTEs takes any number of individual CTE fragments like -// -// "__resource_filter AS (...)", "__limit_cte AS (...)" -// -// and renders the final `WITH …` clause. -func combineCTEs(ctes []string) string { - if len(ctes) == 0 { - return "" - } - return "WITH " + strings.Join(ctes, ", ") + " " -} - -// prependArgs ensures CTE arguments appear before main-query arguments -// in the final slice so their ordinal positions match the SQL string. 
-func prependArgs(cteArgs [][]any, mainArgs []any) []any { - out := make([]any, 0, len(mainArgs)+len(cteArgs)) - for _, a := range cteArgs { // CTEs first, in declaration order - out = append(out, a...) - } - return append(out, mainArgs...) -} - func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) { for i, agg := range q.Aggregations { if k.Key.Name == agg.Alias || @@ -432,7 +466,7 @@ func (b *logQueryStatementBuilder) buildResourceFilterCTE( start, end uint64, ) (*qbtypes.Statement, error) { - return b.opts.ResourceFilterStmtBuilder.Build( + return b.resourceFilterStmtBuilder.Build( ctx, start, end, diff --git a/pkg/telemetrylogs/stmt_builder_test.go b/pkg/telemetrylogs/stmt_builder_test.go index 418b2ce529..11ec03a397 100644 --- a/pkg/telemetrylogs/stmt_builder_test.go +++ b/pkg/telemetrylogs/stmt_builder_test.go @@ -2,6 +2,7 @@ package telemetrylogs import ( "context" + "log/slog" "testing" "time" @@ -17,18 +18,19 @@ func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.LogAggregatio fm := resourcefilter.NewFieldMapper() cb := resourcefilter.NewConditionBuilder(fm) mockMetadataStore := telemetrytypestest.NewMockMetadataStore() - mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - compiler := resourcefilter.NewFilterCompiler(resourcefilter.FilterCompilerOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - MetadataStore: mockMetadataStore, - }) + keysMap := buildCompleteFieldKeyMap() + for _, keys := range keysMap { + for _, key := range keys { + key.Signal = telemetrytypes.SignalLogs + } + } + mockMetadataStore.KeysMap = keysMap - return resourcefilter.NewLogResourceFilterStatementBuilder(resourcefilter.ResourceFilterStatementBuilderOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - Compiler: compiler, - }), nil + return resourcefilter.NewLogResourceFilterStatementBuilder( + fm, + cb, + mockMetadataStore, + ), nil } func TestStatementBuilder(t *testing.T) { @@ -63,8 +65,8 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? 
AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL", - Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448), 10, uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448)}, + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL", + Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)}, }, expectedErr: nil, }, @@ -74,29 +76,20 @@ func TestStatementBuilder(t *testing.T) { cb := NewConditionBuilder(fm) mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - compiler := NewFilterCompiler(FilterCompilerOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - MetadataStore: mockMetadataStore, - SkipResourceFilter: true, - }) - aggExprRewriter := querybuilder.NewAggExprRewriter(querybuilder.AggExprRewriterOptions{ - FieldMapper: fm, - ConditionBuilder: cb, - MetadataStore: mockMetadataStore, - }) + + aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil) resourceFilterStmtBuilder, err := resourceFilterStmtBuilder() require.NoError(t, err) - statementBuilder := NewLogQueryStatementBuilder(LogQueryStatementBuilderOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - Compiler: compiler, - MetadataStore: mockMetadataStore, - AggExprRewriter: aggExprRewriter, - ResourceFilterStmtBuilder: resourceFilterStmtBuilder, - }) + statementBuilder := NewLogQueryStatementBuilder( + slog.Default(), + mockMetadataStore, + fm, + cb, + resourceFilterStmtBuilder, + aggExprRewriter, + ) for _, c := range cases { t.Run(c.name, func(t *testing.T) { diff --git a/pkg/telemetrylogs/test_data.go b/pkg/telemetrylogs/test_data.go index 3e728f291e..f8c9dd0d87 100644 --- a/pkg/telemetrylogs/test_data.go +++ b/pkg/telemetrylogs/test_data.go @@ -19,7 +19,7 @@ func limitString(s string, maxLen int) string { // Function to build a complete field key map for testing all scenarios func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey { - return 
map[string][]*telemetrytypes.TelemetryFieldKey{ + keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{ "service.name": { { Name: "service.name", @@ -856,4 +856,11 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey { }, }, } + + for _, keys := range keysMap { + for _, key := range keys { + key.Signal = telemetrytypes.SignalLogs + } + } + return keysMap } diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go index 7459ebdb56..80e195ba9b 100644 --- a/pkg/telemetrymetadata/metadata.go +++ b/pkg/telemetrymetadata/metadata.go @@ -6,6 +6,7 @@ import ( "log/slog" "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/querybuilder" "github.com/SigNoz/signoz/pkg/telemetrystore" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" @@ -36,7 +37,6 @@ type telemetryMetaStore struct { fm qbtypes.FieldMapper conditionBuilder qbtypes.ConditionBuilder - compiler qbtypes.FilterCompiler } func NewTelemetryMetaStore( @@ -563,7 +563,20 @@ func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSel sb := sqlbuilder.Select("DISTINCT " + selectColumn).From(t.relatedMetadataDBName + "." + t.relatedMetadataTblName) if len(fieldValueSelector.ExistingQuery) != 0 { - whereClause, _, err := t.compiler.Compile(ctx, fieldValueSelector.ExistingQuery) + keySelectors := querybuilder.QueryStringToKeysSelectors(fieldValueSelector.ExistingQuery) + for _, keySelector := range keySelectors { + keySelector.Signal = fieldValueSelector.Signal + } + keys, err := t.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + whereClause, _, err := querybuilder.PrepareWhereClause(fieldValueSelector.ExistingQuery, querybuilder.FilterExprVisitorOpts{ + FieldMapper: t.fm, + ConditionBuilder: t.conditionBuilder, + FieldKeys: keys, + }) if err == nil { sb.AddWhereClause(whereClause) } else { diff --git a/pkg/telemetrytraces/condition_builder.go b/pkg/telemetrytraces/condition_builder.go index eacb3d24ae..ebe1e6d1f1 100644 --- a/pkg/telemetrytraces/condition_builder.go +++ b/pkg/telemetrytraces/condition_builder.go @@ -126,6 +126,11 @@ func (c *conditionBuilder) conditionFor( // in the query builder, `exists` and `not exists` are used for // key membership checks, so depending on the column type, the condition changes case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists: + // if the field is intrinsic, it always exists + if slices.Contains(IntrinsicFields, tblFieldName) || slices.Contains(CalculatedFields, tblFieldName) { + return "true", nil + } + var value any switch column.Type { case schema.ColumnTypeString, diff --git a/pkg/telemetrytraces/field_mapper.go b/pkg/telemetrytraces/field_mapper.go index e7630ce9c2..7229f9b02a 100644 --- a/pkg/telemetrytraces/field_mapper.go +++ b/pkg/telemetrytraces/field_mapper.go @@ -149,7 +149,7 @@ func (m *defaultFieldMapper) getColumn( case telemetrytypes.FieldDataTypeBool: return indexV3Columns["attributes_bool"], nil } - case telemetrytypes.FieldContextSpan: + case telemetrytypes.FieldContextSpan, telemetrytypes.FieldContextUnspecified: if col, ok := indexV3Columns[key.Name]; ok { return col, nil } diff --git a/pkg/telemetrytraces/filter_compiler.go b/pkg/telemetrytraces/filter_compiler.go deleted file mode 100644 index a91e231264..0000000000 --- a/pkg/telemetrytraces/filter_compiler.go +++ /dev/null @@ -1,55 +0,0 @@ -package telemetrytraces - -import ( - "context" - - 
"github.com/SigNoz/signoz/pkg/querybuilder" - qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes" - "github.com/huandu/go-sqlbuilder" -) - -type FilterCompilerOpts struct { - FieldMapper qbtypes.FieldMapper - ConditionBuilder qbtypes.ConditionBuilder - MetadataStore telemetrytypes.MetadataStore - FullTextColumn *telemetrytypes.TelemetryFieldKey - JsonBodyPrefix string - JsonKeyToKey qbtypes.JsonKeyToFieldFunc - SkipResourceFilter bool -} - -type filterCompiler struct { - opts FilterCompilerOpts -} - -func NewFilterCompiler(opts FilterCompilerOpts) *filterCompiler { - return &filterCompiler{ - opts: opts, - } -} - -func (c *filterCompiler) Compile(ctx context.Context, expr string) (*sqlbuilder.WhereClause, []string, error) { - selectors := querybuilder.QueryStringToKeysSelectors(expr) - - keys, err := c.opts.MetadataStore.GetKeysMulti(ctx, selectors) - if err != nil { - return nil, nil, err - } - - filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{ - FieldMapper: c.opts.FieldMapper, - ConditionBuilder: c.opts.ConditionBuilder, - FieldKeys: keys, - FullTextColumn: c.opts.FullTextColumn, - JsonBodyPrefix: c.opts.JsonBodyPrefix, - JsonKeyToKey: c.opts.JsonKeyToKey, - SkipResourceFilter: c.opts.SkipResourceFilter, - }) - - if err != nil { - return nil, nil, err - } - - return filterWhereClause, warnings, nil -} diff --git a/pkg/telemetrytraces/statement_builder.go b/pkg/telemetrytraces/statement_builder.go index c6313f820c..0d7d05cbe2 100644 --- a/pkg/telemetrytraces/statement_builder.go +++ b/pkg/telemetrytraces/statement_builder.go @@ -3,6 +3,7 @@ package telemetrytraces import ( "context" "fmt" + "log/slog" "strings" "github.com/SigNoz/signoz/pkg/errors" @@ -16,32 +17,32 @@ var ( ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation") ) -type TraceQueryStatementBuilderOpts struct { - MetadataStore telemetrytypes.MetadataStore - FieldMapper qbtypes.FieldMapper - ConditionBuilder qbtypes.ConditionBuilder - ResourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] - Compiler qbtypes.FilterCompiler - AggExprRewriter qbtypes.AggExprRewriter -} - type traceQueryStatementBuilder struct { - opts TraceQueryStatementBuilderOpts - fm qbtypes.FieldMapper - cb qbtypes.ConditionBuilder - compiler qbtypes.FilterCompiler - aggExprRewriter qbtypes.AggExprRewriter + logger *slog.Logger + metadataStore telemetrytypes.MetadataStore + fm qbtypes.FieldMapper + cb qbtypes.ConditionBuilder + resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] + aggExprRewriter qbtypes.AggExprRewriter } var _ qbtypes.StatementBuilder[qbtypes.TraceAggregation] = (*traceQueryStatementBuilder)(nil) -func NewTraceQueryStatementBuilder(opts TraceQueryStatementBuilderOpts) *traceQueryStatementBuilder { +func NewTraceQueryStatementBuilder( + logger *slog.Logger, + metadataStore telemetrytypes.MetadataStore, + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, + resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation], + aggExprRewriter qbtypes.AggExprRewriter, +) *traceQueryStatementBuilder { return &traceQueryStatementBuilder{ - opts: opts, - fm: opts.FieldMapper, - cb: opts.ConditionBuilder, - compiler: opts.Compiler, - aggExprRewriter: opts.AggExprRewriter, + logger: logger, + metadataStore: metadataStore, + fm: fieldMapper, + cb: conditionBuilder, + 
resourceFilterStmtBuilder: resourceFilterStmtBuilder, + aggExprRewriter: aggExprRewriter, } } @@ -58,13 +59,14 @@ func (b *traceQueryStatementBuilder) Build( end = querybuilder.ToNanoSecs(end) keySelectors := getKeySelectors(query) - keys, err := b.opts.MetadataStore.GetKeysMulti(ctx, keySelectors) + + keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors) if err != nil { return nil, err } // Create SQL builder - q := sqlbuilder.ClickHouse.NewSelectBuilder() + q := sqlbuilder.NewSelectBuilder() switch requestType { case qbtypes.RequestTypeRaw: @@ -87,8 +89,38 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) keySelectors = append(keySelectors, selectors...) } - whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression) - keySelectors = append(keySelectors, whereClauseSelectors...) + if query.Filter != nil && query.Filter.Expression != "" { + whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression) + keySelectors = append(keySelectors, whereClauseSelectors...) + } + + for idx := range query.GroupBy { + groupBy := query.GroupBy[idx] + selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name) + keySelectors = append(keySelectors, selectors...) + } + + for idx := range query.SelectFields { + keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{ + Name: query.SelectFields[idx].Name, + Signal: telemetrytypes.SignalTraces, + FieldContext: query.SelectFields[idx].FieldContext, + FieldDataType: query.SelectFields[idx].FieldDataType, + }) + } + + for idx := range query.Order { + keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{ + Name: query.Order[idx].Key.Name, + Signal: telemetrytypes.SignalTraces, + FieldContext: query.Order[idx].Key.FieldContext, + FieldDataType: query.Order[idx].Key.FieldDataType, + }) + } + + for idx := range keySelectors { + keySelectors[idx].Signal = telemetrytypes.SignalTraces + } return keySelectors } @@ -120,31 +152,32 @@ func (b *traceQueryStatementBuilder) buildListQuery( "trace_id", "span_id", "name", - "resource_string_service$$name", + sqlbuilder.Escape("resource_string_service$$name"), "duration_nano", "response_status_code", ) + // TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs? 
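// Aside: the new sqlbuilder.Escape calls matter because go-sqlbuilder's
// interpolator treats `$` as special ($0, ${name}, and $$ compiling down to a
// single $). As I read the library, Escape doubles each `$`, so the literal
// ClickHouse column `resource_string_service$$name` selected a few lines up
// survives compilation instead of collapsing to `resource_string_service$name`.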
for _, field := range query.SelectFields { colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys) if err != nil { return nil, err } - sb.SelectMore(colExpr) + sb.SelectMore(sqlbuilder.Escape(colExpr)) } // From table sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) // Add filter conditions - warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + warnings, err := b.addFilterCondition(ctx, sb, start, end, query, keys) if err != nil { return nil, err } // Add order by for _, orderBy := range query.Order { - sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction)) + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue())) } // Add limit and offset @@ -158,8 +191,8 @@ func (b *traceQueryStatementBuilder) buildListQuery( mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse) - finalSQL := combineCTEs(cteFragments) + mainSQL - finalArgs := prependArgs(cteArgs, mainArgs) + finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL + finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs) return &qbtypes.Statement{ Query: finalSQL, @@ -193,21 +226,29 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery( int64(query.StepInterval.Seconds()), )) + var allGroupByArgs []any + // Keep original column expressions so we can build the tuple fieldNames := make([]string, 0, len(query.GroupBy)) for _, gb := range query.GroupBy { - colExpr, err := b.fm.ColumnExpressionFor(ctx, &gb.TelemetryFieldKey, keys) + expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString) if err != nil { return nil, err } - sb.SelectMore(colExpr) + colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name) + allGroupByArgs = append(allGroupByArgs, args...) + sb.SelectMore(sqlbuilder.Escape(colExpr)) fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name)) } // Aggregations allAggChArgs := make([]any, 0) for i, agg := range query.Aggregations { - rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, agg.Expression) + rewritten, chArgs, err := b.aggExprRewriter.Rewrite( + ctx, agg.Expression, + uint64(query.StepInterval.Seconds()), + keys, + ) if err != nil { return nil, err } @@ -216,7 +257,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery( } sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) - warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + warnings, err := b.addFilterCondition(ctx, sb, start, end, query, keys) if err != nil { return nil, err } @@ -226,7 +267,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery( if query.Limit > 0 { // build the scalar “top/bottom-N” query in its own builder. - cteSB := sqlbuilder.ClickHouse.NewSelectBuilder() + cteSB := sqlbuilder.NewSelectBuilder() cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true) if err != nil { return nil, err @@ -245,11 +286,12 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery( sb.Having(query.Having.Expression) } - mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + combinedArgs := append(allGroupByArgs, allAggChArgs...) + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) 
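// Aside: querybuilder.CombineCTEs renders the shared
// "WITH __resource_filter AS (...), __limit_cte AS (...) " prefix, and
// querybuilder.PrependArgs flattens the CTE arg slices, in declaration order,
// ahead of mainArgs — mirroring how the CTE fragments precede the main SELECT
// in the final SQL. These are the same helpers whose per-package copies are
// deleted from this file further down.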
// Stitch it all together: WITH … SELECT … - finalSQL = combineCTEs(cteFragments) + mainSQL - finalArgs = prependArgs(cteArgs, mainArgs) + finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL + finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs) } else { sb.GroupBy("ALL") @@ -257,11 +299,12 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery( sb.Having(query.Having.Expression) } - mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + combinedArgs := append(allGroupByArgs, allAggChArgs...) + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) // Stitch it all together: WITH … SELECT … - finalSQL = combineCTEs(cteFragments) + mainSQL - finalArgs = prependArgs(cteArgs, mainArgs) + finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL + finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs) } return &qbtypes.Statement{ @@ -295,20 +338,29 @@ func (b *traceQueryStatementBuilder) buildScalarQuery( allAggChArgs := []any{} - // Add group by columns - for _, groupBy := range query.GroupBy { - colExpr, err := b.fm.ColumnExpressionFor(ctx, &groupBy.TelemetryFieldKey, keys) + var allGroupByArgs []any + for _, gb := range query.GroupBy { + expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString) if err != nil { return nil, err } - sb.SelectMore(colExpr) + colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name) + allGroupByArgs = append(allGroupByArgs, args...) + sb.SelectMore(sqlbuilder.Escape(colExpr)) } + // for scalar queries, the rate would be end-start + rateInterval := (end - start) / querybuilder.NsToSeconds + // Add aggregation if len(query.Aggregations) > 0 { for idx := range query.Aggregations { aggExpr := query.Aggregations[idx] - rewritten, chArgs, err := b.aggExprRewriter.Rewrite(ctx, aggExpr.Expression) + rewritten, chArgs, err := b.aggExprRewriter.Rewrite( + ctx, aggExpr.Expression, + rateInterval, + keys, + ) if err != nil { return nil, err } @@ -321,7 +373,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery( sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) // Add filter conditions - warnings, err := b.addFilterCondition(ctx, sb, start, end, query) + warnings, err := b.addFilterCondition(ctx, sb, start, end, query, keys) if err != nil { return nil, err } @@ -338,9 +390,9 @@ func (b *traceQueryStatementBuilder) buildScalarQuery( for _, orderBy := range query.Order { idx, ok := aggOrderBy(orderBy, query) if ok { - sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction)) + sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue())) } else { - sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction)) + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue())) } } @@ -354,10 +406,12 @@ func (b *traceQueryStatementBuilder) buildScalarQuery( sb.Limit(query.Limit) } - mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, allAggChArgs...) + combinedArgs := append(allGroupByArgs, allAggChArgs...) - finalSQL := combineCTEs(cteFragments) + mainSQL - finalArgs := prependArgs(cteArgs, mainArgs) + mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) 
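// Aside: note the two rewriter denominators — the time-series path passes
// uint64(query.StepInterval.Seconds()), while the scalar path above uses the
// whole window, (end - start) / querybuilder.NsToSeconds. Presumably a
// rate-style aggregation over a 10-minute scalar query then divides by 600
// rather than by the 30-second chart step.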
+ + finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL + finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs) return &qbtypes.Statement{ Query: finalSQL, @@ -367,14 +421,30 @@ func (b *traceQueryStatementBuilder) buildScalarQuery( } // buildFilterCondition builds SQL condition from filter expression -func (b *traceQueryStatementBuilder) addFilterCondition(ctx context.Context, sb *sqlbuilder.SelectBuilder, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) ([]string, error) { +func (b *traceQueryStatementBuilder) addFilterCondition( + _ context.Context, + sb *sqlbuilder.SelectBuilder, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, +) ([]string, error) { - // add filter expression + var filterWhereClause *sqlbuilder.WhereClause + var warnings []string + var err error - filterWhereClause, warnings, err := b.compiler.Compile(ctx, query.Filter.Expression) + if query.Filter != nil && query.Filter.Expression != "" { + // add filter expression + filterWhereClause, warnings, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + SkipResourceFilter: true, + }) - if err != nil { - return nil, err + if err != nil { + return nil, err + } } if filterWhereClause != nil { @@ -382,36 +452,14 @@ func (b *traceQueryStatementBuilder) addFilterCondition(ctx context.Context, sb } // add time filter - startBucket := start/1000000000 - 1800 - endBucket := end / 1000000000 + startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment + endBucket := end / querybuilder.NsToSeconds - sb.Where(sb.GE("timestamp", start), sb.LE("timestamp", end), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket)) + sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.LE("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket)) return warnings, nil } -// combineCTEs takes any number of individual CTE fragments like -// -// "__resource_filter AS (...)", "__limit_cte AS (...)" -// -// and renders the final `WITH …` clause. -func combineCTEs(ctes []string) string { - if len(ctes) == 0 { - return "" - } - return "WITH " + strings.Join(ctes, ", ") + " " -} - -// prependArgs ensures CTE arguments appear before main-query arguments -// in the final slice so their ordinal positions match the SQL string. -func prependArgs(cteArgs [][]any, mainArgs []any) []any { - out := make([]any, 0, len(mainArgs)+len(cteArgs)) - for _, a := range cteArgs { // CTEs first, in declaration order - out = append(out, a...) - } - return append(out, mainArgs...) 
-} - func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) (int, bool) { for i, agg := range q.Aggregations { if k.Key.Name == agg.Alias || @@ -446,7 +494,7 @@ func (b *traceQueryStatementBuilder) buildResourceFilterCTE( start, end uint64, ) (*qbtypes.Statement, error) { - return b.opts.ResourceFilterStmtBuilder.Build( + return b.resourceFilterStmtBuilder.Build( ctx, start, end, diff --git a/pkg/telemetrytraces/stmt_builder_test.go b/pkg/telemetrytraces/stmt_builder_test.go index 864838cf30..8448472382 100644 --- a/pkg/telemetrytraces/stmt_builder_test.go +++ b/pkg/telemetrytraces/stmt_builder_test.go @@ -2,6 +2,7 @@ package telemetrytraces import ( "context" + "log/slog" "testing" "time" @@ -18,17 +19,12 @@ func resourceFilterStmtBuilder() (qbtypes.StatementBuilder[qbtypes.TraceAggregat cb := resourcefilter.NewConditionBuilder(fm) mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - compiler := resourcefilter.NewFilterCompiler(resourcefilter.FilterCompilerOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - MetadataStore: mockMetadataStore, - }) - return resourcefilter.NewTraceResourceFilterStatementBuilder(resourcefilter.ResourceFilterStatementBuilderOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - Compiler: compiler, - }), nil + return resourcefilter.NewTraceResourceFilterStatementBuilder( + fm, + cb, + mockMetadataStore, + ), nil } func TestStatementBuilder(t *testing.T) { @@ -63,8 +59,8 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, resources_string['service.name'] AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL", - Args: []any{"redis-manual", "%service.name%", "%service.name%redis-manual%", uint64(1747945619), uint64(1747983448), uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448), 10, uint64(1747947419000000000), uint64(1747983448000000000), uint64(1747945619), uint64(1747983448)}, + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? 
AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL", + Args: []any{"redis-manual", "%service.name%", "%service.name%redis-manual%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)}, }, expectedErr: nil, }, @@ -74,29 +70,19 @@ func TestStatementBuilder(t *testing.T) { cb := NewConditionBuilder(fm) mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - compiler := NewFilterCompiler(FilterCompilerOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - MetadataStore: mockMetadataStore, - SkipResourceFilter: true, - }) - aggExprRewriter := querybuilder.NewAggExprRewriter(querybuilder.AggExprRewriterOptions{ - FieldMapper: fm, - ConditionBuilder: cb, - MetadataStore: mockMetadataStore, - }) + aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil) resourceFilterStmtBuilder, err := resourceFilterStmtBuilder() require.NoError(t, err) - statementBuilder := NewTraceQueryStatementBuilder(TraceQueryStatementBuilderOpts{ - FieldMapper: fm, - ConditionBuilder: cb, - Compiler: compiler, - MetadataStore: mockMetadataStore, - AggExprRewriter: aggExprRewriter, - ResourceFilterStmtBuilder: resourceFilterStmtBuilder, - }) + statementBuilder := NewTraceQueryStatementBuilder( + slog.Default(), + mockMetadataStore, + fm, + cb, + resourceFilterStmtBuilder, + aggExprRewriter, + ) for _, c := range cases { t.Run(c.name, func(t *testing.T) { diff --git a/pkg/telemetrytraces/test_data.go b/pkg/telemetrytraces/test_data.go index 051af18cc7..926fc61aae 100644 --- a/pkg/telemetrytraces/test_data.go +++ b/pkg/telemetrytraces/test_data.go @@ -5,7 +5,7 @@ import ( ) func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey { - return map[string][]*telemetrytypes.TelemetryFieldKey{ + keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{ "service.name": { { Name: "service.name", @@ -35,4 +35,10 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey { }, }, } + for _, keys := range keysMap { + for _, key := range keys { + key.Signal = telemetrytypes.SignalTraces + } + } + return keysMap } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/qb.go b/pkg/types/querybuildertypes/querybuildertypesv5/qb.go index a4706b0ede..bc0f7ce071 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/qb.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/qb.go @@ -34,24 +34,10 @@ type ConditionBuilder interface { ConditionFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, operator FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) } -type FilterCompiler interface { - // Compile compiles the filter into a sqlbuilder.WhereClause. 
- Compile(ctx context.Context, filter string) (*sqlbuilder.WhereClause, []string, error) -} - -type RewriteCtx struct { - RateInterval uint64 -} - -type RewriteOption func(*RewriteCtx) - -func WithRateInterval(interval uint64) RewriteOption { - return func(c *RewriteCtx) { c.RateInterval = interval } -} - type AggExprRewriter interface { // Rewrite rewrites the aggregation expression to be used in the query. - Rewrite(ctx context.Context, expr string, opts ...RewriteOption) (string, []any, error) + Rewrite(ctx context.Context, expr string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) (string, []any, error) + RewriteMulti(ctx context.Context, exprs []string, rateInterval uint64, keys map[string][]*telemetrytypes.TelemetryFieldKey) ([]string, [][]any, error) } type Statement struct { diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/query.go b/pkg/types/querybuildertypes/querybuildertypesv5/query.go index 070fd5dfa2..d56e7b0fa6 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/query.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/query.go @@ -11,19 +11,20 @@ type Query interface { // Window returns [from, to) in epoch‑ms so cache can slice/merge. Window() (startMS, endMS uint64) // Execute runs the query; implementors must be side‑effect‑free. - Execute(ctx context.Context) (Result, error) + Execute(ctx context.Context) (*Result, error) } type Result struct { - Type RequestType - Value any // concrete Go value (to be type asserted based on the RequestType) - Stats ExecStats + Type RequestType + Value any // concrete Go value (to be type asserted based on the RequestType) + Stats ExecStats + Warnings []string } type ExecStats struct { - RowsScanned int64 `json:"rowsScanned"` - BytesScanned int64 `json:"bytesScanned"` - DurationMS int64 `json:"durationMs"` + RowsScanned uint64 `json:"rowsScanned"` + BytesScanned uint64 `json:"bytesScanned"` + DurationMS uint64 `json:"durationMs"` } type TimeRange struct{ From, To uint64 } // ms since epoch diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/resp.go b/pkg/types/querybuildertypes/querybuildertypesv5/resp.go index 1e9bc69aac..bde7722939 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/resp.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/resp.go @@ -14,19 +14,19 @@ type QueryRangeResponse struct { } type TimeSeriesData struct { - QueryName string `json:"queryName"` - Aggregations []AggregationBucket `json:"aggregations"` + QueryName string `json:"queryName"` + Aggregations []*AggregationBucket `json:"aggregations"` } type AggregationBucket struct { - Index int `json:"index"` // or string Alias - Alias string `json:"alias"` - Series []TimeSeries `json:"series"` // no extra nesting + Index int `json:"index"` // or string Alias + Alias string `json:"alias"` + Series []*TimeSeries `json:"series"` // no extra nesting } type TimeSeries struct { - Labels []Label `json:"labels,omitempty"` - Values []TimeSeriesValue `json:"values"` + Labels []*Label `json:"labels,omitempty"` + Values []*TimeSeriesValue `json:"values"` } type Label struct { @@ -36,10 +36,10 @@ type Label struct { type TimeSeriesValue struct { Timestamp int64 `json:"timestamp"` - Value float64 `json:"value,omitempty"` + Value float64 `json:"value"` // for the heatmap type chart Values []float64 `json:"values,omitempty"` - Bucket Bucket `json:"bucket,omitempty"` + Bucket *Bucket `json:"bucket,omitempty"` } type Bucket struct { @@ -65,16 +65,17 @@ type ColumnDescriptor struct { } type ScalarData struct { 
- Columns []ColumnDescriptor `json:"columns"` - Data [][]any `json:"data"` + Columns []*ColumnDescriptor `json:"columns"` + Data [][]any `json:"data"` } type RawData struct { - QueryName string `json:"queryName"` - Rows []RawRow `json:"rows"` + QueryName string `json:"queryName"` + NextCursor string `json:"nextCursor"` + Rows []*RawRow `json:"rows"` } type RawRow struct { - Timestamp time.Time `json:"timestamp"` - Data map[string]any `json:"data"` + Timestamp time.Time `json:"timestamp"` + Data map[string]*any `json:"data"` }
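Stepping back from the individual hunks: the through-line of this diff is the removal of the FilterCompiler indirection (its interface is deleted just above) — statement builders now resolve field keys themselves and call querybuilder.PrepareWhereClause inline. Below is a condensed sketch of that flow, using only calls that appear in the diff; the function name is mine, and pinning SignalLogs is illustrative, since each builder pins its own signal.

package example

import (
	"context"

	"github.com/SigNoz/signoz/pkg/querybuilder"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/huandu/go-sqlbuilder"
)

// addFilter resolves the field keys referenced by expr, compiles the filter
// expression against them, and attaches the resulting WHERE clause to sb,
// returning any compiler warnings.
func addFilter(
	ctx context.Context,
	fm qbtypes.FieldMapper,
	cb qbtypes.ConditionBuilder,
	store telemetrytypes.MetadataStore,
	expr string,
	sb *sqlbuilder.SelectBuilder,
) ([]string, error) {
	// 1. extract the key selectors from the expression and pin the signal,
	//    as getKeySelectors does in each statement builder
	selectors := querybuilder.QueryStringToKeysSelectors(expr)
	for _, sel := range selectors {
		sel.Signal = telemetrytypes.SignalLogs
	}
	keys, err := store.GetKeysMulti(ctx, selectors)
	if err != nil {
		return nil, err
	}

	// 2. compile the expression against the resolved keys
	where, warnings, err := querybuilder.PrepareWhereClause(expr, querybuilder.FilterExprVisitorOpts{
		FieldMapper:        fm,
		ConditionBuilder:   cb,
		FieldKeys:          keys,
		SkipResourceFilter: true, // resource filtering is handled by the __resource_filter CTE
	})
	if err != nil {
		return nil, err
	}
	if where != nil {
		sb.AddWhereClause(where)
	}
	return warnings, nil
}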