From 1af688e61b102563bc05a2471c89966fabe137a9 Mon Sep 17 00:00:00 2001 From: srikanthccv Date: Sat, 31 May 2025 12:55:34 +0530 Subject: [PATCH] chore: add reduce to --- .../querybuildertypesv5/builder_elements.go | 155 ++++++ .../querybuildertypesv5/reduce_test.go | 517 ++++++++++++++++++ 2 files changed, 672 insertions(+) create mode 100644 pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go index 3e503b49f7..3d1f5b0c0c 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go @@ -2,6 +2,8 @@ package querybuildertypesv5 import ( "encoding/json" + "math" + "sort" "time" "github.com/SigNoz/signoz/pkg/errors" @@ -135,6 +137,159 @@ var ( ReduceToMedian = ReduceTo{valuer.NewString("median")} ) +// FunctionReduceTo applies the reduceTo operator to a time series and returns a new series with the reduced value +// reduceTo can be one of: last, sum, avg, min, max, count, median +// if reduceTo is not recognized, the function returns the original series +func FunctionReduceTo(result *TimeSeries, reduceTo ReduceTo) *TimeSeries { + if len(result.Values) == 0 { + return result + } + + var reducedValue float64 + var reducedTimestamp int64 + + switch reduceTo { + case ReduceToLast: + // Take the last point's value and timestamp + lastPoint := result.Values[len(result.Values)-1] + reducedValue = lastPoint.Value + reducedTimestamp = lastPoint.Timestamp + + case ReduceToSum: + // Sum all values, use last timestamp + var sum float64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + sum += point.Value + } + } + reducedValue = sum + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + + case ReduceToAvg: + // Calculate average of all values, use last timestamp + var sum float64 + var count int + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + sum += point.Value + count++ + } + } + if count > 0 { + reducedValue = sum / float64(count) + } else { + reducedValue = math.NaN() + } + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + + case ReduceToMin: + // Find minimum value, use its timestamp + var min float64 = math.Inf(1) + var minTimestamp int64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) && point.Value < min { + min = point.Value + minTimestamp = point.Timestamp + } + } + if math.IsInf(min, 1) { + reducedValue = math.NaN() + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + } else { + reducedValue = min + reducedTimestamp = minTimestamp + } + + case ReduceToMax: + // Find maximum value, use its timestamp + var max float64 = math.Inf(-1) + var maxTimestamp int64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) && point.Value > max { + max = point.Value + maxTimestamp = point.Timestamp + } + } + if math.IsInf(max, -1) { + reducedValue = math.NaN() + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + } else { + reducedValue = max + reducedTimestamp = maxTimestamp + } + + case ReduceToCount: + // Count non-NaN values, use last timestamp + var count float64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + count++ + } + } + reducedValue = count + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + + case ReduceToMedian: + // Calculate median of all non-NaN values + // maintain pair of value and timestamp and sort by value + var values []struct { + Value float64 + Timestamp int64 + } + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + values = append(values, struct { + Value float64 + Timestamp int64 + }{ + Value: point.Value, + Timestamp: point.Timestamp, + }) + } + } + + if len(values) == 0 { + reducedValue = math.NaN() + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + } else { + sort.Slice(values, func(i, j int) bool { + return values[i].Value < values[j].Value + }) + + if len(values)%2 == 0 { + // Even number of values - average of middle two + mid := len(values) / 2 + reducedValue = (values[mid-1].Value + values[mid].Value) / 2 + reducedTimestamp = (values[mid-1].Timestamp + values[mid].Timestamp) / 2 + } else { + // Odd number of values - middle value + reducedValue = values[len(values)/2].Value + reducedTimestamp = values[len(values)/2].Timestamp + } + } + + case ReduceToUnknown: + fallthrough + default: + // No reduction, return original series + return result + } + + // Create new TimeSeries with single reduced point + reducedSeries := &TimeSeries{ + Labels: result.Labels, // Preserve original labels + Values: []*TimeSeriesValue{ + { + Timestamp: reducedTimestamp, + Value: reducedValue, + }, + }, + } + + return reducedSeries +} + type TraceAggregation struct { // aggregation expression - example: count(), sum(item_price), countIf(day > 10) Expression string `json:"expression"` diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go new file mode 100644 index 0000000000..12e1898dcc --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go @@ -0,0 +1,517 @@ +package querybuildertypesv5 + +import ( + "math" + "testing" + + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/stretchr/testify/assert" +) + +func TestFunctionReduceTo(t *testing.T) { + createLabels := func() []*Label { + return []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{Name: "service", FieldDataType: telemetrytypes.FieldDataTypeString}, + Value: "test-service", + }, + } + } + + createValues := func(pairs ...struct { + ts int64 + val float64 + }) []*TimeSeriesValue { + values := make([]*TimeSeriesValue, len(pairs)) + for i, pair := range pairs { + values[i] = &TimeSeriesValue{ + Timestamp: pair.ts, + Value: pair.val, + } + } + return values + } + + t.Run("Empty series", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: []*TimeSeriesValue{}, + } + + result := FunctionReduceTo(ts, ReduceToSum) + assert.Equal(t, ts, result, "Empty series should return unchanged") + }) + + t.Run("ReduceToLast", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToLast) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) + assert.Equal(t, 30.0, result.Values[0].Value) + assert.Equal(t, ts.Labels, result.Labels) + }) + + t.Run("ReduceToSum", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToSum) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) // Last timestamp + assert.Equal(t, 60.0, result.Values[0].Value) // 10 + 20 + 30 + }) + + t.Run("ReduceToSum with NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToSum) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) + assert.Equal(t, 40.0, result.Values[0].Value) // 10 + 30 (NaN skipped) + }) + + t.Run("ReduceToAvg", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToAvg) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) + assert.Equal(t, 20.0, result.Values[0].Value) // (10 + 20 + 30) / 3 + }) + + t.Run("ReduceToAvg with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToAvg) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToMin", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 30.0}, + struct { + ts int64 + val float64 + }{2000, 10.0}, // minimum + struct { + ts int64 + val float64 + }{3000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMin) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Timestamp of minimum value + assert.Equal(t, 10.0, result.Values[0].Value) + }) + + t.Run("ReduceToMin with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMin) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Last timestamp + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToMax", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 30.0}, // maximum + struct { + ts int64 + val float64 + }{3000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMax) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Timestamp of maximum value + assert.Equal(t, 30.0, result.Values[0].Value) + }) + + t.Run("ReduceToMax with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMax) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Last timestamp + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToCount", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + struct { + ts int64 + val float64 + }{4000, 40.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToCount) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(4000), result.Values[0].Timestamp) // Last timestamp + assert.Equal(t, 3.0, result.Values[0].Value) // 3 non-NaN values + }) + + t.Run("ReduceToMedian odd number of values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 30.0}, + struct { + ts int64 + val float64 + }{2000, 10.0}, + struct { + ts int64 + val float64 + }{3000, 20.0}, // median when sorted: 10, 20, 30 + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, 20.0, result.Values[0].Value) // Middle value + assert.Equal(t, int64(3000), result.Values[0].Timestamp) // Timestamp of median value + }) + + t.Run("ReduceToMedian even number of values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 40.0}, + struct { + ts int64 + val float64 + }{2000, 10.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + struct { + ts int64 + val float64 + }{4000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, 25.0, result.Values[0].Value) // (20 + 30) / 2 when sorted: 10, 20, 30, 40 + expectedTimestamp := (int64(4000) + int64(3000)) / 2 // Average of middle timestamps + assert.Equal(t, expectedTimestamp, result.Values[0].Timestamp) + }) + + t.Run("ReduceToMedian with NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 30.0}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + struct { + ts int64 + val float64 + }{3000, 10.0}, + struct { + ts int64 + val float64 + }{4000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, 20.0, result.Values[0].Value) // Median of [10, 20, 30] + assert.Equal(t, int64(4000), result.Values[0].Timestamp) + }) + + t.Run("ReduceToMedian with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Last timestamp + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToUnknown", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToUnknown) + + assert.Equal(t, ts, result, "Unknown reduce operation should return original series") + }) + + t.Run("Invalid ReduceTo", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + ), + } + + // Create an invalid ReduceTo value + invalidReduceTo := ReduceTo{valuer.NewString("invalid")} + result := FunctionReduceTo(ts, invalidReduceTo) + + assert.Equal(t, ts, result, "Invalid reduce operation should return original series") + }) + + t.Run("Single value series", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 42.0}, + ), + } + + testCases := []struct { + reduceTo ReduceTo + expected float64 + }{ + {ReduceToLast, 42.0}, + {ReduceToSum, 42.0}, + {ReduceToAvg, 42.0}, + {ReduceToMin, 42.0}, + {ReduceToMax, 42.0}, + {ReduceToCount, 1.0}, + {ReduceToMedian, 42.0}, + } + + for _, tc := range testCases { + t.Run(tc.reduceTo.StringValue(), func(t *testing.T) { + result := FunctionReduceTo(ts, tc.reduceTo) + assert.Len(t, result.Values, 1) + assert.Equal(t, tc.expected, result.Values[0].Value) + assert.Equal(t, int64(1000), result.Values[0].Timestamp) + }) + } + }) + + t.Run("Labels preservation", func(t *testing.T) { + originalLabels := []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{Name: "service", FieldDataType: telemetrytypes.FieldDataTypeString}, + Value: "test-service", + }, + { + Key: telemetrytypes.TelemetryFieldKey{Name: "instance", FieldDataType: telemetrytypes.FieldDataTypeString}, + Value: "test-instance", + }, + } + + ts := &TimeSeries{ + Labels: originalLabels, + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToSum) + + assert.Equal(t, originalLabels, result.Labels, "Labels should be preserved") + assert.Len(t, result.Labels, 2) + assert.Equal(t, "test-service", result.Labels[0].Value) + assert.Equal(t, "test-instance", result.Labels[1].Value) + }) +}