mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-29 11:41:58 +08:00
374 lines
8.5 KiB
Go
374 lines
8.5 KiB
Go
package querier
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"reflect"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
)
|
|
|
|
var (
|
|
aggRe = regexp.MustCompile(`^__result_(\d+)$`)
|
|
)
|
|
|
|
// consume reads every row and shapes it into the payload expected for the
|
|
// given request type.
|
|
//
|
|
// * Time-series - []*qbtypes.TimeSeriesData
|
|
// * Scalar - []*qbtypes.ScalarData
|
|
// * Raw - []*qbtypes.RawData
|
|
// * Distribution- []*qbtypes.DistributionData
|
|
func consume(rows driver.Rows, kind qbtypes.RequestType) (any, error) {
|
|
var (
|
|
payload any
|
|
err error
|
|
)
|
|
|
|
switch kind {
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
payload, err = readAsTimeSeries(rows)
|
|
case qbtypes.RequestTypeScalar:
|
|
payload, err = readAsScalar(rows)
|
|
case qbtypes.RequestTypeRaw:
|
|
payload, err = readAsRaw(rows)
|
|
// TODO: add support for other request types
|
|
}
|
|
|
|
return payload, err
|
|
}
|
|
|
|
func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) {
|
|
|
|
colTypes := rows.ColumnTypes()
|
|
colNames := rows.Columns()
|
|
|
|
slots := make([]any, len(colTypes))
|
|
numericColsCount := 0
|
|
for i, ct := range colTypes {
|
|
slots[i] = reflect.New(ct.ScanType()).Interface()
|
|
if numericKind(ct.ScanType().Kind()) {
|
|
numericColsCount++
|
|
}
|
|
}
|
|
|
|
type sKey struct {
|
|
agg int
|
|
key string // deterministic join of label values
|
|
}
|
|
seriesMap := map[sKey]*qbtypes.TimeSeries{}
|
|
|
|
for rows.Next() {
|
|
if err := rows.Scan(slots...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
ts int64
|
|
lblVals []string
|
|
lblObjs []*qbtypes.Label
|
|
aggValues = map[int]float64{} // all __result_N in this row
|
|
fallbackValue float64 // value when NO __result_N columns exist
|
|
fallbackSeen bool
|
|
)
|
|
|
|
for idx, ptr := range slots {
|
|
name := colNames[idx]
|
|
|
|
switch v := ptr.(type) {
|
|
case *time.Time:
|
|
ts = v.UnixMilli()
|
|
|
|
case *float64, *float32, *int64, *int32, *uint64, *uint32:
|
|
val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
|
|
if m := aggRe.FindStringSubmatch(name); m != nil {
|
|
id, _ := strconv.Atoi(m[1])
|
|
aggValues[id] = val
|
|
} else if numericColsCount == 1 { // classic single-value query
|
|
fallbackValue = val
|
|
fallbackSeen = true
|
|
} else {
|
|
// numeric label
|
|
lblVals = append(lblVals, fmt.Sprint(val))
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: val,
|
|
})
|
|
}
|
|
|
|
case **float64, **float32, **int64, **int32, **uint64, **uint32:
|
|
tempVal := reflect.ValueOf(ptr)
|
|
if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
|
|
val := numericAsFloat(tempVal.Elem().Elem().Interface())
|
|
if m := aggRe.FindStringSubmatch(name); m != nil {
|
|
id, _ := strconv.Atoi(m[1])
|
|
aggValues[id] = val
|
|
} else if numericColsCount == 1 { // classic single-value query
|
|
fallbackValue = val
|
|
fallbackSeen = true
|
|
} else {
|
|
// numeric label
|
|
lblVals = append(lblVals, fmt.Sprint(val))
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: val,
|
|
})
|
|
}
|
|
}
|
|
|
|
case *string:
|
|
lblVals = append(lblVals, *v)
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: *v,
|
|
})
|
|
|
|
case **string:
|
|
val := *v
|
|
if val == nil {
|
|
var empty string
|
|
val = &empty
|
|
}
|
|
lblVals = append(lblVals, *val)
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: val,
|
|
})
|
|
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Edge-case: no __result_N columns, but a single numeric column present
|
|
if len(aggValues) == 0 && fallbackSeen {
|
|
aggValues[0] = fallbackValue
|
|
}
|
|
|
|
if ts == 0 || len(aggValues) == 0 {
|
|
continue // nothing useful
|
|
}
|
|
|
|
sort.Strings(lblVals)
|
|
labelsKey := strings.Join(lblVals, ",")
|
|
|
|
// one point per aggregation in this row
|
|
for aggIdx, val := range aggValues {
|
|
if math.IsNaN(val) || math.IsInf(val, 0) {
|
|
continue
|
|
}
|
|
|
|
key := sKey{agg: aggIdx, key: labelsKey}
|
|
|
|
series, ok := seriesMap[key]
|
|
if !ok {
|
|
series = &qbtypes.TimeSeries{Labels: lblObjs}
|
|
seriesMap[key] = series
|
|
}
|
|
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
|
|
Timestamp: ts,
|
|
Value: val,
|
|
})
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
maxAgg := -1
|
|
for k := range seriesMap {
|
|
if k.agg > maxAgg {
|
|
maxAgg = k.agg
|
|
}
|
|
}
|
|
if maxAgg < 0 {
|
|
return nil, nil // empty result-set
|
|
}
|
|
|
|
buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
|
|
for i := range buckets {
|
|
buckets[i] = &qbtypes.AggregationBucket{
|
|
Index: i,
|
|
Alias: "__result_" + strconv.Itoa(i),
|
|
}
|
|
}
|
|
for k, s := range seriesMap {
|
|
buckets[k.agg].Series = append(buckets[k.agg].Series, s)
|
|
}
|
|
|
|
var nonEmpty []*qbtypes.AggregationBucket
|
|
for _, b := range buckets {
|
|
if len(b.Series) > 0 {
|
|
nonEmpty = append(nonEmpty, b)
|
|
}
|
|
}
|
|
|
|
return []*qbtypes.TimeSeriesData{{
|
|
Aggregations: nonEmpty,
|
|
}}, nil
|
|
}
|
|
|
|
func numericKind(k reflect.Kind) bool {
|
|
switch k {
|
|
case reflect.Float32, reflect.Float64,
|
|
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
|
|
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) {
|
|
colNames := rows.Columns()
|
|
colTypes := rows.ColumnTypes()
|
|
|
|
cd := make([]*qbtypes.ColumnDescriptor, len(colNames))
|
|
|
|
for i, name := range colNames {
|
|
colType := qbtypes.ColumnTypeGroup
|
|
if aggRe.MatchString(name) {
|
|
colType = qbtypes.ColumnTypeAggregation
|
|
}
|
|
cd[i] = &qbtypes.ColumnDescriptor{
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
AggregationIndex: int64(i),
|
|
Type: colType,
|
|
}
|
|
}
|
|
|
|
var data [][]any
|
|
|
|
for rows.Next() {
|
|
scan := make([]any, len(colTypes))
|
|
for i := range scan {
|
|
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
|
}
|
|
if err := rows.Scan(scan...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 2. deref each slot into the output row
|
|
row := make([]any, len(scan))
|
|
for i, cell := range scan {
|
|
valPtr := reflect.ValueOf(cell)
|
|
if valPtr.Kind() == reflect.Pointer && !valPtr.IsNil() {
|
|
row[i] = valPtr.Elem().Interface()
|
|
} else {
|
|
row[i] = nil // Nullable columns come back as nil pointers
|
|
}
|
|
}
|
|
data = append(data, row)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &qbtypes.ScalarData{
|
|
Columns: cd,
|
|
Data: data,
|
|
}, nil
|
|
}
|
|
|
|
func readAsRaw(rows driver.Rows) (*qbtypes.RawData, error) {
|
|
|
|
colNames := rows.Columns()
|
|
colTypes := rows.ColumnTypes()
|
|
colCnt := len(colNames)
|
|
|
|
// Build a template slice of correctly-typed pointers once
|
|
scanTpl := make([]any, colCnt)
|
|
for i, ct := range colTypes {
|
|
scanTpl[i] = reflect.New(ct.ScanType()).Interface()
|
|
}
|
|
|
|
var outRows []*qbtypes.RawRow
|
|
|
|
for rows.Next() {
|
|
// fresh copy of the scan slice (otherwise the driver reuses pointers)
|
|
scan := make([]any, colCnt)
|
|
for i := range scanTpl {
|
|
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
|
}
|
|
|
|
if err := rows.Scan(scan...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rr := qbtypes.RawRow{
|
|
Data: make(map[string]*any, colCnt),
|
|
}
|
|
|
|
for i, cellPtr := range scan {
|
|
name := colNames[i]
|
|
|
|
// de-reference the typed pointer to any
|
|
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
|
|
|
// special-case: timestamp column
|
|
if name == "timestamp" || name == "timestamp_datetime" {
|
|
switch t := val.(type) {
|
|
case time.Time:
|
|
rr.Timestamp = t
|
|
case uint64: // epoch-ns stored as integer
|
|
rr.Timestamp = time.Unix(0, int64(t))
|
|
case int64:
|
|
rr.Timestamp = time.Unix(0, t)
|
|
default:
|
|
// leave zero time if unrecognised
|
|
}
|
|
}
|
|
|
|
// store value in map as *any, to match the schema
|
|
v := any(val)
|
|
rr.Data[name] = &v
|
|
}
|
|
outRows = append(outRows, &rr)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &qbtypes.RawData{
|
|
Rows: outRows,
|
|
}, nil
|
|
}
|
|
|
|
func numericAsFloat(v any) float64 {
|
|
switch x := v.(type) {
|
|
case float64:
|
|
return x
|
|
case float32:
|
|
return float64(x)
|
|
case int64:
|
|
return float64(x)
|
|
case int32:
|
|
return float64(x)
|
|
case int16:
|
|
return float64(x)
|
|
case int8:
|
|
return float64(x)
|
|
case int:
|
|
return float64(x)
|
|
case uint64:
|
|
return float64(x)
|
|
case uint32:
|
|
return float64(x)
|
|
case uint16:
|
|
return float64(x)
|
|
case uint8:
|
|
return float64(x)
|
|
case uint:
|
|
return float64(x)
|
|
default:
|
|
return math.NaN()
|
|
}
|
|
}
|