feat: add new params labelsArray to series (#3214)

Vishal Sharma 2023-07-28 10:00:16 +05:30 committed by GitHub
parent 2c5c972801
commit b915f9ef7b
2 changed files with 29 additions and 9 deletions
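The change threads the group-by attributes through the time-series read path in a second form: readRow now also returns them as a slice of single-key maps, readRowsForTimeSeriesResult keeps that slice per series, and the Series model exposes it as a new labelsArray JSON field next to the existing labels map. As a rough illustration of the two shapes, using the example values from the code comments below (a sketch, not code from this commit):

package main

import "fmt"

func main() {
    // The same group-by attributes in both shapes a series now carries:
    // labels collapses everything into one map, while labelsArray keeps one
    // single-key map per group-by column.
    labels := map[string]string{"serviceName": "frontend", "operation": "/fetch"}
    labelsArray := []map[string]string{
        {"serviceName": "frontend"},
        {"operation": "/fetch"},
    }
    fmt.Println(labels, labelsArray)
}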

The first changed file updates the ClickHouse reader:

@@ -4128,7 +4128,7 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
 }
-func readRow(vars []interface{}, columnNames []string) ([]string, map[string]string, v3.Point) {
+func readRow(vars []interface{}, columnNames []string) ([]string, map[string]string, []map[string]string, v3.Point) {
 // Each row will have a value and a timestamp, and an optional list of label values
 // example: {Timestamp: ..., Value: ...}
 // The timestamp may also not be present in some cases where the time series is reduced to a single value
@@ -4138,6 +4138,7 @@ func readRow(vars []interface{}, columnNames []string) ([]string, map[string]str
 // example: ["frontend", "/fetch"]
 var groupBy []string
+var groupAttributesArray []map[string]string
 // groupAttributes is a container to hold the key-value pairs for the current
 // metric point.
 // example: {"serviceName": "frontend", "operation": "/fetch"}
@@ -4156,10 +4157,16 @@ func readRow(vars []interface{}, columnNames []string) ([]string, map[string]str
 }
 for key, val := range metric {
 groupBy = append(groupBy, val)
+if _, ok := groupAttributes[key]; !ok {
+groupAttributesArray = append(groupAttributesArray, map[string]string{key: val})
+}
 groupAttributes[key] = val
 }
 } else {
 groupBy = append(groupBy, *v)
+if _, ok := groupAttributes[colName]; !ok {
+groupAttributesArray = append(groupAttributesArray, map[string]string{colName: *v})
+}
 groupAttributes[colName] = *v
 }
 case *time.Time:
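The append-if-not-seen guard added above for string columns is repeated verbatim in the hunks below for float, unsigned, signed, and bool columns. Purely as an illustration of that pattern (a hypothetical helper, not part of this commit), it could be expressed as:

package main

import "fmt"

// addGroupAttribute is a hypothetical helper mirroring the guard used for each
// column type: the combined map is always overwritten, but the array only gains
// an entry the first time a key is seen for the row.
func addGroupAttribute(attrs map[string]string, arr []map[string]string, key, val string) []map[string]string {
    if _, ok := attrs[key]; !ok {
        arr = append(arr, map[string]string{key: val})
    }
    attrs[key] = val
    return arr
}

func main() {
    attrs := map[string]string{}
    var arr []map[string]string
    arr = addGroupAttribute(attrs, arr, "serviceName", "frontend")
    arr = addGroupAttribute(attrs, arr, "serviceName", "route") // repeated key: map updated, array unchanged
    fmt.Println(attrs, arr)
}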
@@ -4169,6 +4176,9 @@ func readRow(vars []interface{}, columnNames []string) ([]string, map[string]str
 point.Value = float64(reflect.ValueOf(v).Elem().Float())
 } else {
 groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float()))
+if _, ok := groupAttributes[colName]; !ok {
+groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float())})
+}
 groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float())
 }
 case *uint8, *uint64, *uint16, *uint32:
@@ -4176,6 +4186,9 @@ func readRow(vars []interface{}, columnNames []string) ([]string, map[string]str
 point.Value = float64(reflect.ValueOf(v).Elem().Uint())
 } else {
 groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()))
+if _, ok := groupAttributes[colName]; !ok {
+groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())})
+}
 groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())
 }
 case *int8, *int16, *int32, *int64:
@@ -4183,17 +4196,23 @@ func readRow(vars []interface{}, columnNames []string) ([]string, map[string]str
 point.Value = float64(reflect.ValueOf(v).Elem().Int())
 } else {
 groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()))
+if _, ok := groupAttributes[colName]; !ok {
+groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())})
+}
 groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())
 }
 case *bool:
 groupBy = append(groupBy, fmt.Sprintf("%v", *v))
+if _, ok := groupAttributes[colName]; !ok {
+groupAttributesArray = append(groupAttributesArray, map[string]string{colName: fmt.Sprintf("%v", *v)})
+}
 groupAttributes[colName] = fmt.Sprintf("%v", *v)
 default:
 zap.S().Errorf("unsupported var type %v found in query builder query result for column %s", v, colName)
 }
 }
-return groupBy, groupAttributes, point
+return groupBy, groupAttributes, groupAttributesArray, point
 }
 func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string) ([]*v3.Series, error) {
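Note that for the non-string cases above (floats, unsigned and signed integers, bools) the group-by value is rendered with fmt.Sprintf("%v", ...) before being stored, so both groupAttributes and groupAttributesArray always hold strings. A small sketch of that formatting; the column names and values are illustrative:

package main

import "fmt"

func main() {
    // Numeric and boolean group-by columns are stored as their %v string form.
    statusCode := int64(503)
    isError := true
    entries := []map[string]string{
        {"statusCode": fmt.Sprintf("%v", statusCode)},
        {"isError": fmt.Sprintf("%v", isError)},
    }
    fmt.Println(entries) // [map[statusCode:503] map[isError:true]]
}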
@@ -4226,25 +4245,25 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
 // "order,/order": {"serviceName": "order", "operation": "/order"},
 // }
 seriesToAttrs := make(map[string]map[string]string)
+labelsArray := make(map[string][]map[string]string)
 for rows.Next() {
 if err := rows.Scan(vars...); err != nil {
 return nil, err
 }
-groupBy, groupAttributes, metricPoint := readRow(vars, columnNames)
+groupBy, groupAttributes, groupAttributesArray, metricPoint := readRow(vars, columnNames)
 sort.Strings(groupBy)
 key := strings.Join(groupBy, "")
 if _, exists := seriesToAttrs[key]; !exists {
 keys = append(keys, key)
 }
 seriesToAttrs[key] = groupAttributes
+labelsArray[key] = groupAttributesArray
 seriesToPoints[key] = append(seriesToPoints[key], metricPoint)
 }
 var seriesList []*v3.Series
 for _, key := range keys {
 points := seriesToPoints[key]
 // find the grouping sets point for the series
 // this is the point with the zero timestamp
 // if there is no such point, then the series is not grouped
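As the comment above describes, the point the code calls the grouping sets point is the one with a zero timestamp; if no such point exists, the series is not grouped. A minimal sketch of that lookup, using a stand-in Point type since the full loop is only partially shown here:

package main

import "fmt"

// Point stands in for v3.Point with only the fields needed here.
type Point struct {
    Timestamp int64
    Value     float64
}

func main() {
    points := []Point{{Timestamp: 1690000000000, Value: 7}, {Timestamp: 0, Value: 42}}
    var groupingSetsPoint *Point
    for i := range points {
        if points[i].Timestamp == 0 {
            // the zero-timestamp point is the grouping sets aggregate
            groupingSetsPoint = &points[i]
            break
        }
    }
    fmt.Println(groupingSetsPoint)
}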
@@ -4258,7 +4277,7 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
 break
 }
 }
-series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint}
+series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]}
 seriesList = append(seriesList, &series)
 }
 return seriesList, nil

The second changed file updates the Series model:

@@ -593,9 +593,10 @@ type LogsLiveTailClient struct {
 }
 type Series struct {
-Labels map[string]string `json:"labels"`
-Points []Point `json:"values"`
-GroupingSetsPoint *Point `json:"-"`
+Labels map[string]string `json:"labels"`
+LabelsArray []map[string]string `json:"labelsArray"`
+Points []Point `json:"values"`
+GroupingSetsPoint *Point `json:"-"`
 }
 func (s *Series) SortPoints() {
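For reference, a sketch of what the new field looks like on the wire, using a local stand-in type that mirrors the json tags above (the real type is the Series model shown in this file; Points and GroupingSetsPoint are omitted):

package main

import (
    "encoding/json"
    "fmt"
)

// series mirrors the json-tagged label fields of the updated Series model,
// purely for illustration.
type series struct {
    Labels      map[string]string   `json:"labels"`
    LabelsArray []map[string]string `json:"labelsArray"`
}

func main() {
    s := series{
        Labels:      map[string]string{"serviceName": "frontend", "operation": "/fetch"},
        LabelsArray: []map[string]string{{"serviceName": "frontend"}, {"operation": "/fetch"}},
    }
    b, err := json.Marshal(s)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b))
    // {"labels":{"operation":"/fetch","serviceName":"frontend"},"labelsArray":[{"serviceName":"frontend"},{"operation":"/fetch"}]}
}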