chore: add reduce to

This commit is contained in:
srikanthccv 2025-05-31 12:55:34 +05:30
parent c397f57956
commit 1af688e61b
2 changed files with 672 additions and 0 deletions

View File

@ -2,6 +2,8 @@ package querybuildertypesv5
import (
"encoding/json"
"math"
"sort"
"time"
"github.com/SigNoz/signoz/pkg/errors"
@ -135,6 +137,159 @@ var (
ReduceToMedian = ReduceTo{valuer.NewString("median")}
)
// FunctionReduceTo collapses a time series into a single point according to
// reduceTo and returns a new series that carries the original labels.
//
// Supported operators and the timestamp attached to the reduced point:
//   - last:   value and timestamp of the final point (NaN is kept as-is)
//   - sum:    sum of the non-NaN values, last timestamp
//   - avg:    mean of the non-NaN values, last timestamp
//   - min:    smallest non-NaN value, timestamp of its first occurrence
//   - max:    largest non-NaN value, timestamp of its first occurrence
//   - count:  number of non-NaN values, last timestamp
//   - median: median of the non-NaN values; for an odd count the timestamp of
//     the middle value, for an even count the mean of the two middle timestamps
//
// When avg, min, max or median find no non-NaN value the reduced value is NaN
// and the last timestamp is used.
//
// The input series is returned unchanged (same pointer) when it is nil, has no
// values, or reduceTo is not one of the operators above (this includes
// ReduceToUnknown).
func FunctionReduceTo(result *TimeSeries, reduceTo ReduceTo) *TimeSeries {
	if result == nil || len(result.Values) == 0 {
		return result
	}

	lastPoint := result.Values[len(result.Values)-1]

	// Most operators report the last timestamp; the ones that do not
	// overwrite reducedTimestamp below.
	var reducedValue float64
	reducedTimestamp := lastPoint.Timestamp

	switch reduceTo {
	case ReduceToLast:
		reducedValue = lastPoint.Value

	case ReduceToSum:
		for _, point := range result.Values {
			if !math.IsNaN(point.Value) {
				reducedValue += point.Value
			}
		}

	case ReduceToAvg:
		var sum float64
		var count int
		for _, point := range result.Values {
			if !math.IsNaN(point.Value) {
				sum += point.Value
				count++
			}
		}
		reducedValue = math.NaN()
		if count > 0 {
			reducedValue = sum / float64(count)
		}

	case ReduceToMin:
		// Track "found" explicitly instead of seeding with +Inf: a sentinel
		// cannot tell "no usable value" apart from a series whose minimum
		// really is +Inf.
		found := false
		for _, point := range result.Values {
			if math.IsNaN(point.Value) {
				continue
			}
			if !found || point.Value < reducedValue {
				reducedValue = point.Value
				reducedTimestamp = point.Timestamp
				found = true
			}
		}
		if !found {
			reducedValue = math.NaN()
		}

	case ReduceToMax:
		// Same reasoning as min: -Inf is a legitimate maximum, not a marker.
		found := false
		for _, point := range result.Values {
			if math.IsNaN(point.Value) {
				continue
			}
			if !found || point.Value > reducedValue {
				reducedValue = point.Value
				reducedTimestamp = point.Timestamp
				found = true
			}
		}
		if !found {
			reducedValue = math.NaN()
		}

	case ReduceToCount:
		for _, point := range result.Values {
			if !math.IsNaN(point.Value) {
				reducedValue++
			}
		}

	case ReduceToMedian:
		// Keep each value paired with its timestamp so the timestamp of the
		// median element(s) can be reported after sorting by value.
		type sample struct {
			value     float64
			timestamp int64
		}
		samples := make([]sample, 0, len(result.Values))
		for _, point := range result.Values {
			if !math.IsNaN(point.Value) {
				samples = append(samples, sample{value: point.Value, timestamp: point.Timestamp})
			}
		}

		if len(samples) == 0 {
			reducedValue = math.NaN()
		} else {
			// Stable sort: equal values keep their original order, so the
			// timestamp picked among ties is predictable.
			sort.SliceStable(samples, func(i, j int) bool {
				return samples[i].value < samples[j].value
			})

			mid := len(samples) / 2
			if len(samples)%2 == 1 {
				reducedValue = samples[mid].value
				reducedTimestamp = samples[mid].timestamp
			} else {
				// Even count: average the two middle values and timestamps.
				lower, upper := samples[mid-1], samples[mid]
				reducedValue = (lower.value + upper.value) / 2
				reducedTimestamp = (lower.timestamp + upper.timestamp) / 2
			}
		}

	default:
		// ReduceToUnknown or any unrecognized operator: no reduction.
		return result
	}

	// Only the labels are carried over to the single-point series.
	return &TimeSeries{
		Labels: result.Labels,
		Values: []*TimeSeriesValue{
			{
				Timestamp: reducedTimestamp,
				Value:     reducedValue,
			},
		},
	}
}
type TraceAggregation struct {
// aggregation expression - example: count(), sum(item_price), countIf(day > 10)
Expression string `json:"expression"`

View File

@ -0,0 +1,517 @@
package querybuildertypesv5
import (
"math"
"testing"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
)
// TestFunctionReduceTo checks FunctionReduceTo for each reduce operator:
// plain inputs, inputs containing NaN, all-NaN inputs, unknown and invalid
// operators, single-point series, and label preservation.
func TestFunctionReduceTo(t *testing.T) {
	// sample is a compact (timestamp, value) description of one input point.
	type sample struct {
		ts  int64
		val float64
	}

	nan := math.NaN()

	// serviceLabels builds the single-label set shared by most subtests.
	serviceLabels := func() []*Label {
		key := telemetrytypes.TelemetryFieldKey{Name: "service", FieldDataType: telemetrytypes.FieldDataTypeString}
		return []*Label{{Key: key, Value: "test-service"}}
	}

	// points converts samples into the pointer slice used by TimeSeries.
	points := func(samples ...sample) []*TimeSeriesValue {
		out := make([]*TimeSeriesValue, 0, len(samples))
		for _, s := range samples {
			out = append(out, &TimeSeriesValue{Timestamp: s.ts, Value: s.val})
		}
		return out
	}

	// newSeries builds a labelled series from the given samples.
	newSeries := func(samples ...sample) *TimeSeries {
		return &TimeSeries{Labels: serviceLabels(), Values: points(samples...)}
	}

	t.Run("Empty series", func(t *testing.T) {
		series := &TimeSeries{
			Labels: serviceLabels(),
			Values: []*TimeSeriesValue{},
		}

		got := FunctionReduceTo(series, ReduceToSum)

		assert.Equal(t, series, got, "Empty series should return unchanged")
	})

	t.Run("ReduceToLast", func(t *testing.T) {
		series := newSeries(sample{1000, 10.0}, sample{2000, 20.0}, sample{3000, 30.0})

		got := FunctionReduceTo(series, ReduceToLast)

		assert.Len(t, got.Values, 1)
		assert.Equal(t, int64(3000), got.Values[0].Timestamp)
		assert.Equal(t, 30.0, got.Values[0].Value)
		assert.Equal(t, series.Labels, got.Labels)
	})

	// Reductions that are verified only through the single resulting point.
	// A NaN wantValue means the reduced value itself must be NaN.
	reductions := []struct {
		name      string
		samples   []sample
		reduceTo  ReduceTo
		wantTS    int64
		wantValue float64
	}{
		{
			name:      "ReduceToSum",
			samples:   []sample{{1000, 10.0}, {2000, 20.0}, {3000, 30.0}},
			reduceTo:  ReduceToSum,
			wantTS:    3000, // last timestamp
			wantValue: 60.0, // 10 + 20 + 30
		},
		{
			name:      "ReduceToSum with NaN values",
			samples:   []sample{{1000, 10.0}, {2000, nan}, {3000, 30.0}},
			reduceTo:  ReduceToSum,
			wantTS:    3000,
			wantValue: 40.0, // 10 + 30, NaN skipped
		},
		{
			name:      "ReduceToAvg",
			samples:   []sample{{1000, 10.0}, {2000, 20.0}, {3000, 30.0}},
			reduceTo:  ReduceToAvg,
			wantTS:    3000,
			wantValue: 20.0, // (10 + 20 + 30) / 3
		},
		{
			name:      "ReduceToAvg with all NaN values",
			samples:   []sample{{1000, nan}, {2000, nan}},
			reduceTo:  ReduceToAvg,
			wantTS:    2000,
			wantValue: nan,
		},
		{
			name:      "ReduceToMin",
			samples:   []sample{{1000, 30.0}, {2000, 10.0}, {3000, 20.0}},
			reduceTo:  ReduceToMin,
			wantTS:    2000, // timestamp of the minimum
			wantValue: 10.0,
		},
		{
			name:      "ReduceToMin with all NaN values",
			samples:   []sample{{1000, nan}, {2000, nan}},
			reduceTo:  ReduceToMin,
			wantTS:    2000, // last timestamp
			wantValue: nan,
		},
		{
			name:      "ReduceToMax",
			samples:   []sample{{1000, 10.0}, {2000, 30.0}, {3000, 20.0}},
			reduceTo:  ReduceToMax,
			wantTS:    2000, // timestamp of the maximum
			wantValue: 30.0,
		},
		{
			name:      "ReduceToMax with all NaN values",
			samples:   []sample{{1000, nan}, {2000, nan}},
			reduceTo:  ReduceToMax,
			wantTS:    2000, // last timestamp
			wantValue: nan,
		},
		{
			name:      "ReduceToCount",
			samples:   []sample{{1000, 10.0}, {2000, nan}, {3000, 30.0}, {4000, 40.0}},
			reduceTo:  ReduceToCount,
			wantTS:    4000, // last timestamp
			wantValue: 3.0,  // three non-NaN values
		},
		{
			name:      "ReduceToMedian odd number of values",
			samples:   []sample{{1000, 30.0}, {2000, 10.0}, {3000, 20.0}},
			reduceTo:  ReduceToMedian,
			wantTS:    3000, // timestamp of the middle value once sorted: 10, 20, 30
			wantValue: 20.0,
		},
		{
			name:      "ReduceToMedian even number of values",
			samples:   []sample{{1000, 40.0}, {2000, 10.0}, {3000, 30.0}, {4000, 20.0}},
			reduceTo:  ReduceToMedian,
			wantTS:    (int64(4000) + int64(3000)) / 2, // mean of the two middle timestamps
			wantValue: 25.0,                            // (20 + 30) / 2 once sorted: 10, 20, 30, 40
		},
		{
			name:      "ReduceToMedian with NaN values",
			samples:   []sample{{1000, 30.0}, {2000, nan}, {3000, 10.0}, {4000, 20.0}},
			reduceTo:  ReduceToMedian,
			wantTS:    4000,
			wantValue: 20.0, // median of [10, 20, 30]
		},
		{
			name:      "ReduceToMedian with all NaN values",
			samples:   []sample{{1000, nan}, {2000, nan}},
			reduceTo:  ReduceToMedian,
			wantTS:    2000, // last timestamp
			wantValue: nan,
		},
	}

	for _, rc := range reductions {
		t.Run(rc.name, func(t *testing.T) {
			got := FunctionReduceTo(newSeries(rc.samples...), rc.reduceTo)

			assert.Len(t, got.Values, 1)
			assert.Equal(t, rc.wantTS, got.Values[0].Timestamp)
			if math.IsNaN(rc.wantValue) {
				// NaN never compares equal, so it needs a dedicated check.
				assert.True(t, math.IsNaN(got.Values[0].Value))
			} else {
				assert.Equal(t, rc.wantValue, got.Values[0].Value)
			}
		})
	}

	t.Run("ReduceToUnknown", func(t *testing.T) {
		series := newSeries(sample{1000, 10.0}, sample{2000, 20.0})

		got := FunctionReduceTo(series, ReduceToUnknown)

		assert.Equal(t, series, got, "Unknown reduce operation should return original series")
	})

	t.Run("Invalid ReduceTo", func(t *testing.T) {
		series := newSeries(sample{1000, 10.0}, sample{2000, 20.0})

		// An operator name that matches none of the declared ReduceTo values.
		bogus := ReduceTo{valuer.NewString("invalid")}
		got := FunctionReduceTo(series, bogus)

		assert.Equal(t, series, got, "Invalid reduce operation should return original series")
	})

	t.Run("Single value series", func(t *testing.T) {
		series := newSeries(sample{1000, 42.0})

		expectations := []struct {
			reduceTo ReduceTo
			expected float64
		}{
			{ReduceToLast, 42.0},
			{ReduceToSum, 42.0},
			{ReduceToAvg, 42.0},
			{ReduceToMin, 42.0},
			{ReduceToMax, 42.0},
			{ReduceToCount, 1.0},
			{ReduceToMedian, 42.0},
		}

		for _, exp := range expectations {
			t.Run(exp.reduceTo.StringValue(), func(t *testing.T) {
				got := FunctionReduceTo(series, exp.reduceTo)

				assert.Len(t, got.Values, 1)
				assert.Equal(t, exp.expected, got.Values[0].Value)
				assert.Equal(t, int64(1000), got.Values[0].Timestamp)
			})
		}
	})

	t.Run("Labels preservation", func(t *testing.T) {
		originalLabels := []*Label{
			{
				Key:   telemetrytypes.TelemetryFieldKey{Name: "service", FieldDataType: telemetrytypes.FieldDataTypeString},
				Value: "test-service",
			},
			{
				Key:   telemetrytypes.TelemetryFieldKey{Name: "instance", FieldDataType: telemetrytypes.FieldDataTypeString},
				Value: "test-instance",
			},
		}
		series := &TimeSeries{
			Labels: originalLabels,
			Values: points(sample{1000, 10.0}, sample{2000, 20.0}),
		}

		got := FunctionReduceTo(series, ReduceToSum)

		assert.Equal(t, originalLabels, got.Labels, "Labels should be preserved")
		assert.Len(t, got.Labels, 2)
		assert.Equal(t, "test-service", got.Labels[0].Value)
		assert.Equal(t, "test-instance", got.Labels[1].Value)
	})
}