signoz/pkg/querier/builder_query.go
2025-05-27 20:54:48 +05:30

205 lines
4.7 KiB
Go

package querier
import (
"context"
"encoding/base64"
"strconv"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
type builderQuery[T any] struct {
telemetryStore telemetrystore.TelemetryStore
stmtBuilder qbtypes.StatementBuilder[T]
spec qbtypes.QueryBuilderQuery[T]
fromMS uint64
toMS uint64
kind qbtypes.RequestType
}
var _ qbtypes.Query = (*builderQuery[any])(nil)
func newBuilderQuery[T any](
telemetryStore telemetrystore.TelemetryStore,
stmtBuilder qbtypes.StatementBuilder[T],
spec qbtypes.QueryBuilderQuery[T],
tr qbtypes.TimeRange,
kind qbtypes.RequestType,
) *builderQuery[T] {
return &builderQuery[T]{
telemetryStore: telemetryStore,
stmtBuilder: stmtBuilder,
spec: spec,
fromMS: tr.From,
toMS: tr.To,
kind: kind,
}
}
func (q *builderQuery[T]) Fingerprint() string {
// TODO: implement this
return ""
}
func (q *builderQuery[T]) Window() (uint64, uint64) {
return q.fromMS, q.toMS
}
// must be a single query, ordered by timestamp (logs need an id tie-break).
func (q *builderQuery[T]) isWindowList() bool {
if len(q.spec.Order) == 0 {
return false
}
// first ORDER BY must be `timestamp`
if q.spec.Order[0].Key.Name != "timestamp" {
return false
}
if q.spec.Signal == telemetrytypes.SignalLogs {
// logs require timestamp,id with identical direction
if len(q.spec.Order) != 2 || q.spec.Order[1].Key.Name != "id" ||
q.spec.Order[1].Direction != q.spec.Order[0].Direction {
return false
}
}
return true
}
func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error) {
// can we do window based pagination?
if q.kind == qbtypes.RequestTypeRaw && q.isWindowList() {
return q.executeWindowList(ctx)
}
stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec)
if err != nil {
return nil, err
}
chQuery := qbtypes.ClickHouseQuery{
Name: q.spec.Name,
Query: stmt.Query,
}
chExec := newchSQLQuery(q.telemetryStore, chQuery, stmt.Args, qbtypes.TimeRange{From: q.fromMS, To: q.toMS}, q.kind)
result, err := chExec.Execute(ctx)
if err != nil {
return nil, err
}
result.Warnings = stmt.Warnings
return result, nil
}
func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Result, error) {
isAsc := len(q.spec.Order) > 0 &&
strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"
// Adjust [fromMS,toMS] window if a cursor was supplied
if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
if ts, err := decodeCursor(cur); err == nil {
if isAsc {
if uint64(ts) >= q.fromMS {
q.fromMS = uint64(ts + 1)
}
} else { // DESC
if uint64(ts) <= q.toMS {
q.toMS = uint64(ts - 1)
}
}
}
}
reqLimit := q.spec.Limit
if reqLimit == 0 {
reqLimit = 10_000 // sane upper-bound default
}
offsetLeft := q.spec.Offset
need := reqLimit + offsetLeft // rows to fetch from ClickHouse
var rows []*qbtypes.RawRow
totalRows := uint64(0)
totalBytes := uint64(0)
start := time.Now()
for _, r := range makeBuckets(q.fromMS, q.toMS) {
q.spec.Offset = 0
q.spec.Limit = need
stmt, err := q.stmtBuilder.Build(ctx, r.fromNS/1e6, r.toNS/1e6, q.kind, q.spec)
if err != nil {
return nil, err
}
chExec := newchSQLQuery(
q.telemetryStore,
qbtypes.ClickHouseQuery{Name: q.spec.Name, Query: stmt.Query},
stmt.Args,
qbtypes.TimeRange{From: q.fromMS, To: q.toMS},
q.kind,
)
res, err := chExec.Execute(ctx)
if err != nil {
return nil, err
}
totalRows += res.Stats.RowsScanned
totalBytes += res.Stats.BytesScanned
rawRows := res.Value.(*qbtypes.RawData).Rows
need -= len(rawRows)
for _, rr := range rawRows {
if offsetLeft > 0 { // client-requested initial offset
offsetLeft--
continue
}
rows = append(rows, rr)
if len(rows) >= reqLimit { // page filled
break
}
}
if len(rows) >= reqLimit {
break
}
}
nextCursor := ""
if len(rows) == reqLimit {
lastTS := rows[len(rows)-1].Timestamp.UnixMilli()
nextCursor = encodeCursor(lastTS)
}
return &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
Rows: rows,
NextCursor: nextCursor,
},
Stats: qbtypes.ExecStats{
RowsScanned: totalRows,
BytesScanned: totalBytes,
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}, nil
}
func encodeCursor(tsMilli int64) string {
return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(tsMilli, 10)))
}
func decodeCursor(cur string) (int64, error) {
b, err := base64.StdEncoding.DecodeString(cur)
if err != nil {
return 0, err
}
return strconv.ParseInt(string(b), 10, 64)
}