signoz/pkg/querier/consume.go

package querier

import (
	"fmt"
	"math"
	"reflect"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

// aggRe matches the generated aggregation column aliases (__result_0, __result_1, ...).
var (
	aggRe = regexp.MustCompile(`^__result_(\d+)$`)
)

// consume reads every row and shapes it into the payload expected for the
// given request type:
//
//	* Time-series  - []*qbtypes.TimeSeriesData
//	* Scalar       - *qbtypes.ScalarData
//	* Raw          - *qbtypes.RawData
//	* Distribution - []*qbtypes.DistributionData (not implemented yet)
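//
// A typical call site looks roughly like the sketch below; the ClickHouse
// connection (conn), context and built statement (stmt) are assumptions here,
// they live elsewhere in the querier and are not part of this file:
//
//	rows, err := conn.Query(ctx, stmt.Query, stmt.Args...)
//	if err != nil {
//		return nil, err
//	}
//	defer rows.Close()
//
//	payload, err := consume(rows, qbtypes.RequestTypeTimeSeries)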
func consume(rows driver.Rows, kind qbtypes.RequestType) (any, error) {
	var (
		payload any
		err     error
	)

	switch kind {
	case qbtypes.RequestTypeTimeSeries:
		payload, err = readAsTimeSeries(rows)
	case qbtypes.RequestTypeScalar:
		payload, err = readAsScalar(rows)
	case qbtypes.RequestTypeRaw:
		payload, err = readAsRaw(rows)
		// TODO: add support for other request types
	}

	return payload, err
}

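// readAsTimeSeries shapes rows into time series. A time.Time column becomes
// the point timestamp, numeric "__result_N" columns become aggregation values,
// and every remaining column becomes a label. A row with no "__result_N"
// column but exactly one numeric column is treated as aggregation 0. Points
// are grouped into one series per (aggregation index, label set) and returned
// as one AggregationBucket per non-empty aggregation index.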
func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
	colTypes := rows.ColumnTypes()
	colNames := rows.Columns()

	slots := make([]any, len(colTypes))
	numericColsCount := 0
	for i, ct := range colTypes {
		slots[i] = reflect.New(ct.ScanType()).Interface()
		if numericKind(ct.ScanType().Kind()) {
			numericColsCount++
		}
	}

	type sKey struct {
		agg int
		key string // deterministic join of label values
	}
	seriesMap := map[sKey]*qbtypes.TimeSeries{}
	for rows.Next() {
		if err := rows.Scan(slots...); err != nil {
			return nil, err
		}

		var (
			ts            int64
			lblVals       []string
			lblObjs       []*qbtypes.Label
			aggValues     = map[int]float64{} // all __result_N in this row
			fallbackValue float64             // value when NO __result_N columns exist
			fallbackSeen  bool
		)

		for idx, ptr := range slots {
			name := colNames[idx]

			switch v := ptr.(type) {
			case *time.Time:
				ts = v.UnixMilli()

			case *float64, *float32, *int64, *int32, *uint64, *uint32:
				val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
				if m := aggRe.FindStringSubmatch(name); m != nil {
					id, _ := strconv.Atoi(m[1])
					aggValues[id] = val
				} else if numericColsCount == 1 { // classic single-value query
					fallbackValue = val
					fallbackSeen = true
				} else {
					// numeric label
					lblVals = append(lblVals, fmt.Sprint(val))
					lblObjs = append(lblObjs, &qbtypes.Label{
						Key:   telemetrytypes.TelemetryFieldKey{Name: name},
						Value: val,
					})
				}
			case **float64, **float32, **int64, **int32, **uint64, **uint32:
				// Nullable numeric columns scan into a pointer-to-pointer;
				// skip NULL values and unwrap the rest.
				tempVal := reflect.ValueOf(ptr)
				if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
					val := numericAsFloat(tempVal.Elem().Elem().Interface())
					if m := aggRe.FindStringSubmatch(name); m != nil {
						id, _ := strconv.Atoi(m[1])
						aggValues[id] = val
					} else if numericColsCount == 1 { // classic single-value query
						fallbackValue = val
						fallbackSeen = true
					} else {
						// numeric label
						lblVals = append(lblVals, fmt.Sprint(val))
						lblObjs = append(lblObjs, &qbtypes.Label{
							Key:   telemetrytypes.TelemetryFieldKey{Name: name},
							Value: val,
						})
					}
				}
			case *string:
				lblVals = append(lblVals, *v)
				lblObjs = append(lblObjs, &qbtypes.Label{
					Key:   telemetrytypes.TelemetryFieldKey{Name: name},
					Value: *v,
				})

			case **string:
				// Nullable string column: treat NULL as the empty string.
				val := *v
				if val == nil {
					var empty string
					val = &empty
				}
				lblVals = append(lblVals, *val)
				lblObjs = append(lblObjs, &qbtypes.Label{
					Key:   telemetrytypes.TelemetryFieldKey{Name: name},
					Value: *val,
				})

			default:
				continue
			}
		}

		// Edge-case: no __result_N columns, but a single numeric column present
		if len(aggValues) == 0 && fallbackSeen {
			aggValues[0] = fallbackValue
		}

		if ts == 0 || len(aggValues) == 0 {
			continue // nothing useful
		}

		sort.Strings(lblVals)
		labelsKey := strings.Join(lblVals, ",")

		// one point per aggregation in this row
		for aggIdx, val := range aggValues {
			if math.IsNaN(val) || math.IsInf(val, 0) {
				continue
			}

			key := sKey{agg: aggIdx, key: labelsKey}
			series, ok := seriesMap[key]
			if !ok {
				series = &qbtypes.TimeSeries{Labels: lblObjs}
				seriesMap[key] = series
			}
			series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
				Timestamp: ts,
				Value:     val,
			})
		}
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}
	maxAgg := -1
	for k := range seriesMap {
		if k.agg > maxAgg {
			maxAgg = k.agg
		}
	}
	if maxAgg < 0 {
		return nil, nil // empty result-set
	}

	buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
	for i := range buckets {
		buckets[i] = &qbtypes.AggregationBucket{
			Index: i,
			Alias: "__result_" + strconv.Itoa(i),
		}
	}
	for k, s := range seriesMap {
		buckets[k.agg].Series = append(buckets[k.agg].Series, s)
	}

	var nonEmpty []*qbtypes.AggregationBucket
	for _, b := range buckets {
		if len(b.Series) > 0 {
			nonEmpty = append(nonEmpty, b)
		}
	}

	return []*qbtypes.TimeSeriesData{{
		Aggregations: nonEmpty,
	}}, nil
}

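// numericKind reports whether k is one of the integer, unsigned-integer or
// floating-point kinds.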
func numericKind(k reflect.Kind) bool {
	switch k {
	case reflect.Float32, reflect.Float64,
		reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return true
	default:
		return false
	}
}

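// readAsScalar describes every column with a ColumnDescriptor (columns named
// "__result_N" are aggregation columns, everything else is a group column)
// and returns the dereferenced cell values row by row; NULL cells surface as
// nil.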
func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) {
	colNames := rows.Columns()
	colTypes := rows.ColumnTypes()

	cd := make([]*qbtypes.ColumnDescriptor, len(colNames))
	for i, name := range colNames {
		colType := qbtypes.ColumnTypeGroup
		if aggRe.MatchString(name) {
			colType = qbtypes.ColumnTypeAggregation
		}
		cd[i] = &qbtypes.ColumnDescriptor{
			TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
			AggregationIndex:  int64(i),
			Type:              colType,
		}
	}

	var data [][]any
	for rows.Next() {
		// 1. allocate a fresh set of typed scan slots for this row
		scan := make([]any, len(colTypes))
		for i := range scan {
			scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
		}
		if err := rows.Scan(scan...); err != nil {
			return nil, err
		}

		// 2. deref each slot into the output row
		row := make([]any, len(scan))
		for i, cell := range scan {
			valPtr := reflect.ValueOf(cell)
			if valPtr.Kind() == reflect.Pointer && !valPtr.IsNil() {
				row[i] = valPtr.Elem().Interface()
			} else {
				row[i] = nil // Nullable columns come back as nil pointers
			}
		}
		data = append(data, row)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return &qbtypes.ScalarData{
		Columns: cd,
		Data:    data,
	}, nil
}

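// readAsRaw copies every column of every row into a RawRow map. Columns named
// "timestamp" or "timestamp_datetime" additionally populate the RawRow
// timestamp, accepting either a time.Time or an integer epoch in nanoseconds.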
func readAsRaw(rows driver.Rows) (*qbtypes.RawData, error) {
	colNames := rows.Columns()
	colTypes := rows.ColumnTypes()
	colCnt := len(colNames)

	var outRows []*qbtypes.RawRow
	for rows.Next() {
		// allocate a fresh slice of correctly-typed pointers for every row
		// (otherwise the driver keeps writing into the same cells)
		scan := make([]any, colCnt)
		for i, ct := range colTypes {
			scan[i] = reflect.New(ct.ScanType()).Interface()
		}
		if err := rows.Scan(scan...); err != nil {
			return nil, err
		}

		rr := qbtypes.RawRow{
			Data: make(map[string]*any, colCnt),
		}

		for i, cellPtr := range scan {
			name := colNames[i]

			// de-reference the typed pointer to any
			val := reflect.ValueOf(cellPtr).Elem().Interface()

			// special-case: timestamp column
			if name == "timestamp" || name == "timestamp_datetime" {
				switch t := val.(type) {
				case time.Time:
					rr.Timestamp = t
				case uint64: // epoch-ns stored as integer
					rr.Timestamp = time.Unix(0, int64(t))
				case int64:
					rr.Timestamp = time.Unix(0, t)
				default:
					// leave zero time if unrecognised
				}
			}

			// store value in map as *any, to match the schema
			v := any(val)
			rr.Data[name] = &v
		}

		outRows = append(outRows, &rr)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return &qbtypes.RawData{
		Rows: outRows,
	}, nil
}

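// numericAsFloat converts any Go numeric value to float64; non-numeric values
// yield NaN.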
func numericAsFloat(v any) float64 {
	switch x := v.(type) {
	case float64:
		return x
	case float32:
		return float64(x)
	case int64:
		return float64(x)
	case int32:
		return float64(x)
	case int16:
		return float64(x)
	case int8:
		return float64(x)
	case int:
		return float64(x)
	case uint64:
		return float64(x)
	case uint32:
		return float64(x)
	case uint16:
		return float64(x)
	case uint8:
		return float64(x)
	case uint:
		return float64(x)
	default:
		return math.NaN()
	}
}