signoz/pkg/querier/clickhouse_query.go
2025-05-27 20:54:48 +05:30

78 lines
1.8 KiB
Go

package querier
import (
"context"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type chSQLQuery struct {
telemetryStore telemetrystore.TelemetryStore
query qbtypes.ClickHouseQuery
args []any
fromMS uint64
toMS uint64
kind qbtypes.RequestType
}
var _ qbtypes.Query = (*chSQLQuery)(nil)
func newchSQLQuery(
telemetryStore telemetrystore.TelemetryStore,
query qbtypes.ClickHouseQuery,
args []any,
tr qbtypes.TimeRange,
kind qbtypes.RequestType,
) *chSQLQuery {
return &chSQLQuery{
telemetryStore: telemetryStore,
query: query,
args: args,
fromMS: tr.From,
toMS: tr.To,
kind: kind,
}
}
// TODO: use the same query hash scheme as ClickHouse
func (q *chSQLQuery) Fingerprint() string { return q.query.Query }
func (q *chSQLQuery) Window() (uint64, uint64) { return q.fromMS, q.toMS }
func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
totalRows := uint64(0)
totalBytes := uint64(0)
elapsed := time.Duration(0)
ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) {
totalRows += p.Rows
totalBytes += p.Bytes
elapsed += p.Elapsed
}))
rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, q.query.Query, q.args...)
if err != nil {
return nil, err
}
defer rows.Close()
// TODO: map the errors from ClickHouse to our error types
payload, err := consume(rows, q.kind)
if err != nil {
return nil, err
}
return &qbtypes.Result{
Type: q.kind,
Value: payload,
Stats: qbtypes.ExecStats{
RowsScanned: totalRows,
BytesScanned: totalBytes,
DurationMS: uint64(elapsed.Milliseconds()),
},
}, nil
}