mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-29 16:32:00 +08:00
205 lines
4.7 KiB
Go
205 lines
4.7 KiB
Go
package querier
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
)
|
|
|
|
type builderQuery[T any] struct {
|
|
telemetryStore telemetrystore.TelemetryStore
|
|
stmtBuilder qbtypes.StatementBuilder[T]
|
|
spec qbtypes.QueryBuilderQuery[T]
|
|
|
|
fromMS uint64
|
|
toMS uint64
|
|
kind qbtypes.RequestType
|
|
}
|
|
|
|
var _ qbtypes.Query = (*builderQuery[any])(nil)
|
|
|
|
func newBuilderQuery[T any](
|
|
telemetryStore telemetrystore.TelemetryStore,
|
|
stmtBuilder qbtypes.StatementBuilder[T],
|
|
spec qbtypes.QueryBuilderQuery[T],
|
|
tr qbtypes.TimeRange,
|
|
kind qbtypes.RequestType,
|
|
) *builderQuery[T] {
|
|
return &builderQuery[T]{
|
|
telemetryStore: telemetryStore,
|
|
stmtBuilder: stmtBuilder,
|
|
spec: spec,
|
|
fromMS: tr.From,
|
|
toMS: tr.To,
|
|
kind: kind,
|
|
}
|
|
}
|
|
|
|
func (q *builderQuery[T]) Fingerprint() string {
|
|
// TODO: implement this
|
|
return ""
|
|
}
|
|
|
|
func (q *builderQuery[T]) Window() (uint64, uint64) {
|
|
return q.fromMS, q.toMS
|
|
}
|
|
|
|
// must be a single query, ordered by timestamp (logs need an id tie-break).
|
|
func (q *builderQuery[T]) isWindowList() bool {
|
|
if len(q.spec.Order) == 0 {
|
|
return false
|
|
}
|
|
|
|
// first ORDER BY must be `timestamp`
|
|
if q.spec.Order[0].Key.Name != "timestamp" {
|
|
return false
|
|
}
|
|
|
|
if q.spec.Signal == telemetrytypes.SignalLogs {
|
|
// logs require timestamp,id with identical direction
|
|
if len(q.spec.Order) != 2 || q.spec.Order[1].Key.Name != "id" ||
|
|
q.spec.Order[1].Direction != q.spec.Order[0].Direction {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error) {
|
|
|
|
// can we do window based pagination?
|
|
if q.kind == qbtypes.RequestTypeRaw && q.isWindowList() {
|
|
return q.executeWindowList(ctx)
|
|
}
|
|
|
|
stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chQuery := qbtypes.ClickHouseQuery{
|
|
Name: q.spec.Name,
|
|
Query: stmt.Query,
|
|
}
|
|
|
|
chExec := newchSQLQuery(q.telemetryStore, chQuery, stmt.Args, qbtypes.TimeRange{From: q.fromMS, To: q.toMS}, q.kind)
|
|
result, err := chExec.Execute(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result.Warnings = stmt.Warnings
|
|
return result, nil
|
|
}
|
|
|
|
func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Result, error) {
|
|
isAsc := len(q.spec.Order) > 0 &&
|
|
strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"
|
|
|
|
// Adjust [fromMS,toMS] window if a cursor was supplied
|
|
if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
|
|
if ts, err := decodeCursor(cur); err == nil {
|
|
if isAsc {
|
|
if uint64(ts) >= q.fromMS {
|
|
q.fromMS = uint64(ts + 1)
|
|
}
|
|
} else { // DESC
|
|
if uint64(ts) <= q.toMS {
|
|
q.toMS = uint64(ts - 1)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
reqLimit := q.spec.Limit
|
|
if reqLimit == 0 {
|
|
reqLimit = 10_000 // sane upper-bound default
|
|
}
|
|
offsetLeft := q.spec.Offset
|
|
need := reqLimit + offsetLeft // rows to fetch from ClickHouse
|
|
|
|
var rows []*qbtypes.RawRow
|
|
|
|
totalRows := uint64(0)
|
|
totalBytes := uint64(0)
|
|
start := time.Now()
|
|
|
|
for _, r := range makeBuckets(q.fromMS, q.toMS) {
|
|
q.spec.Offset = 0
|
|
q.spec.Limit = need
|
|
|
|
stmt, err := q.stmtBuilder.Build(ctx, r.fromNS/1e6, r.toNS/1e6, q.kind, q.spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chExec := newchSQLQuery(
|
|
q.telemetryStore,
|
|
qbtypes.ClickHouseQuery{Name: q.spec.Name, Query: stmt.Query},
|
|
stmt.Args,
|
|
qbtypes.TimeRange{From: q.fromMS, To: q.toMS},
|
|
q.kind,
|
|
)
|
|
res, err := chExec.Execute(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
totalRows += res.Stats.RowsScanned
|
|
totalBytes += res.Stats.BytesScanned
|
|
|
|
rawRows := res.Value.(*qbtypes.RawData).Rows
|
|
need -= len(rawRows)
|
|
|
|
for _, rr := range rawRows {
|
|
if offsetLeft > 0 { // client-requested initial offset
|
|
offsetLeft--
|
|
continue
|
|
}
|
|
rows = append(rows, rr)
|
|
if len(rows) >= reqLimit { // page filled
|
|
break
|
|
}
|
|
}
|
|
if len(rows) >= reqLimit {
|
|
break
|
|
}
|
|
}
|
|
|
|
nextCursor := ""
|
|
if len(rows) == reqLimit {
|
|
lastTS := rows[len(rows)-1].Timestamp.UnixMilli()
|
|
nextCursor = encodeCursor(lastTS)
|
|
}
|
|
|
|
return &qbtypes.Result{
|
|
Type: qbtypes.RequestTypeRaw,
|
|
Value: &qbtypes.RawData{
|
|
QueryName: q.spec.Name,
|
|
Rows: rows,
|
|
NextCursor: nextCursor,
|
|
},
|
|
Stats: qbtypes.ExecStats{
|
|
RowsScanned: totalRows,
|
|
BytesScanned: totalBytes,
|
|
DurationMS: uint64(time.Since(start).Milliseconds()),
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func encodeCursor(tsMilli int64) string {
|
|
return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(tsMilli, 10)))
|
|
}
|
|
|
|
func decodeCursor(cur string) (int64, error) {
|
|
b, err := base64.StdEncoding.DecodeString(cur)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return strconv.ParseInt(string(b), 10, 64)
|
|
}
|