package querier import ( "fmt" "math" "reflect" "regexp" "sort" "strconv" "strings" "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" ) var ( aggRe = regexp.MustCompile(`^__result_(\d+)$`) ) // consume reads every row and shapes it into the payload expected for the // given request type. // // * Time-series - []*qbtypes.TimeSeriesData // * Scalar - []*qbtypes.ScalarData // * Raw - []*qbtypes.RawData // * Distribution- []*qbtypes.DistributionData func consume(rows driver.Rows, kind qbtypes.RequestType) (any, error) { var ( payload any err error ) switch kind { case qbtypes.RequestTypeTimeSeries: payload, err = readAsTimeSeries(rows) case qbtypes.RequestTypeScalar: payload, err = readAsScalar(rows) case qbtypes.RequestTypeRaw: payload, err = readAsRaw(rows) // TODO: add support for other request types } return payload, err } func readAsTimeSeries(rows driver.Rows) ([]*qbtypes.TimeSeriesData, error) { colTypes := rows.ColumnTypes() colNames := rows.Columns() slots := make([]any, len(colTypes)) numericColsCount := 0 for i, ct := range colTypes { slots[i] = reflect.New(ct.ScanType()).Interface() if numericKind(ct.ScanType().Kind()) { numericColsCount++ } } type sKey struct { agg int key string // deterministic join of label values } seriesMap := map[sKey]*qbtypes.TimeSeries{} for rows.Next() { if err := rows.Scan(slots...); err != nil { return nil, err } var ( ts int64 lblVals []string lblObjs []*qbtypes.Label aggValues = map[int]float64{} // all __result_N in this row fallbackValue float64 // value when NO __result_N columns exist fallbackSeen bool ) for idx, ptr := range slots { name := colNames[idx] switch v := ptr.(type) { case *time.Time: ts = v.UnixMilli() case *float64, *float32, *int64, *int32, *uint64, *uint32: val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface()) if m := aggRe.FindStringSubmatch(name); m != nil { id, _ := strconv.Atoi(m[1]) aggValues[id] = val } else if numericColsCount == 1 { // classic single-value query fallbackValue = val fallbackSeen = true } else { // numeric label lblVals = append(lblVals, fmt.Sprint(val)) lblObjs = append(lblObjs, &qbtypes.Label{ Key: telemetrytypes.TelemetryFieldKey{Name: name}, Value: val, }) } case **float64, **float32, **int64, **int32, **uint64, **uint32: tempVal := reflect.ValueOf(ptr) if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() { val := numericAsFloat(tempVal.Elem().Elem().Interface()) if m := aggRe.FindStringSubmatch(name); m != nil { id, _ := strconv.Atoi(m[1]) aggValues[id] = val } else if numericColsCount == 1 { // classic single-value query fallbackValue = val fallbackSeen = true } else { // numeric label lblVals = append(lblVals, fmt.Sprint(val)) lblObjs = append(lblObjs, &qbtypes.Label{ Key: telemetrytypes.TelemetryFieldKey{Name: name}, Value: val, }) } } case *string: lblVals = append(lblVals, *v) lblObjs = append(lblObjs, &qbtypes.Label{ Key: telemetrytypes.TelemetryFieldKey{Name: name}, Value: *v, }) case **string: val := *v if val == nil { var empty string val = &empty } lblVals = append(lblVals, *val) lblObjs = append(lblObjs, &qbtypes.Label{ Key: telemetrytypes.TelemetryFieldKey{Name: name}, Value: val, }) default: continue } } // Edge-case: no __result_N columns, but a single numeric column present if len(aggValues) == 0 && fallbackSeen { aggValues[0] = fallbackValue } if ts == 0 || len(aggValues) == 0 { continue // nothing useful } sort.Strings(lblVals) labelsKey := strings.Join(lblVals, ",") // one point per aggregation in this row for aggIdx, val := range aggValues { if math.IsNaN(val) || math.IsInf(val, 0) { continue } key := sKey{agg: aggIdx, key: labelsKey} series, ok := seriesMap[key] if !ok { series = &qbtypes.TimeSeries{Labels: lblObjs} seriesMap[key] = series } series.Values = append(series.Values, &qbtypes.TimeSeriesValue{ Timestamp: ts, Value: val, }) } } if err := rows.Err(); err != nil { return nil, err } maxAgg := -1 for k := range seriesMap { if k.agg > maxAgg { maxAgg = k.agg } } if maxAgg < 0 { return nil, nil // empty result-set } buckets := make([]*qbtypes.AggregationBucket, maxAgg+1) for i := range buckets { buckets[i] = &qbtypes.AggregationBucket{ Index: i, Alias: "__result_" + strconv.Itoa(i), } } for k, s := range seriesMap { buckets[k.agg].Series = append(buckets[k.agg].Series, s) } var nonEmpty []*qbtypes.AggregationBucket for _, b := range buckets { if len(b.Series) > 0 { nonEmpty = append(nonEmpty, b) } } return []*qbtypes.TimeSeriesData{{ Aggregations: nonEmpty, }}, nil } func numericKind(k reflect.Kind) bool { switch k { case reflect.Float32, reflect.Float64, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: return true default: return false } } func readAsScalar(rows driver.Rows) (*qbtypes.ScalarData, error) { colNames := rows.Columns() colTypes := rows.ColumnTypes() cd := make([]*qbtypes.ColumnDescriptor, len(colNames)) for i, name := range colNames { colType := qbtypes.ColumnTypeGroup if aggRe.MatchString(name) { colType = qbtypes.ColumnTypeAggregation } cd[i] = &qbtypes.ColumnDescriptor{ TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name}, AggregationIndex: int64(i), Type: colType, } } var data [][]any for rows.Next() { scan := make([]any, len(colTypes)) for i := range scan { scan[i] = reflect.New(colTypes[i].ScanType()).Interface() } if err := rows.Scan(scan...); err != nil { return nil, err } // 2. deref each slot into the output row row := make([]any, len(scan)) for i, cell := range scan { valPtr := reflect.ValueOf(cell) if valPtr.Kind() == reflect.Pointer && !valPtr.IsNil() { row[i] = valPtr.Elem().Interface() } else { row[i] = nil // Nullable columns come back as nil pointers } } data = append(data, row) } if err := rows.Err(); err != nil { return nil, err } return &qbtypes.ScalarData{ Columns: cd, Data: data, }, nil } func readAsRaw(rows driver.Rows) (*qbtypes.RawData, error) { colNames := rows.Columns() colTypes := rows.ColumnTypes() colCnt := len(colNames) // Build a template slice of correctly-typed pointers once scanTpl := make([]any, colCnt) for i, ct := range colTypes { scanTpl[i] = reflect.New(ct.ScanType()).Interface() } var outRows []*qbtypes.RawRow for rows.Next() { // fresh copy of the scan slice (otherwise the driver reuses pointers) scan := make([]any, colCnt) for i := range scanTpl { scan[i] = reflect.New(colTypes[i].ScanType()).Interface() } if err := rows.Scan(scan...); err != nil { return nil, err } rr := qbtypes.RawRow{ Data: make(map[string]*any, colCnt), } for i, cellPtr := range scan { name := colNames[i] // de-reference the typed pointer to any val := reflect.ValueOf(cellPtr).Elem().Interface() // special-case: timestamp column if name == "timestamp" || name == "timestamp_datetime" { switch t := val.(type) { case time.Time: rr.Timestamp = t case uint64: // epoch-ns stored as integer rr.Timestamp = time.Unix(0, int64(t)) case int64: rr.Timestamp = time.Unix(0, t) default: // leave zero time if unrecognised } } // store value in map as *any, to match the schema v := any(val) rr.Data[name] = &v } outRows = append(outRows, &rr) } if err := rows.Err(); err != nil { return nil, err } return &qbtypes.RawData{ Rows: outRows, }, nil } func numericAsFloat(v any) float64 { switch x := v.(type) { case float64: return x case float32: return float64(x) case int64: return float64(x) case int32: return float64(x) case int16: return float64(x) case int8: return float64(x) case int: return float64(x) case uint64: return float64(x) case uint32: return float64(x) case uint16: return float64(x) case uint8: return float64(x) case uint: return float64(x) default: return math.NaN() } }