diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go index eec4248ec7..c88c05c5b9 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go @@ -2,6 +2,8 @@ package querybuildertypesv5 import ( "encoding/json" + "math" + "slices" "time" "github.com/SigNoz/signoz/pkg/errors" @@ -135,6 +137,168 @@ var ( ReduceToMedian = ReduceTo{valuer.NewString("median")} ) +// FunctionReduceTo applies the reduceTo operator to a time series and returns a new series with the reduced value +// reduceTo can be one of: last, sum, avg, min, max, count, median +// if reduceTo is not recognized, the function returns the original series +func FunctionReduceTo(result *TimeSeries, reduceTo ReduceTo) *TimeSeries { + if len(result.Values) == 0 { + return result + } + + var reducedValue float64 + var reducedTimestamp int64 + + switch reduceTo { + case ReduceToLast: + // Take the last point's value and timestamp + lastPoint := result.Values[len(result.Values)-1] + reducedValue = lastPoint.Value + reducedTimestamp = lastPoint.Timestamp + + case ReduceToSum: + // Sum all values, use last timestamp + var sum float64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + sum += point.Value + } + } + reducedValue = sum + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + + case ReduceToAvg: + // Calculate average of all values, use last timestamp + var sum float64 + var count int + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + sum += point.Value + count++ + } + } + if count > 0 { + reducedValue = sum / float64(count) + } else { + reducedValue = math.NaN() + } + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + + case ReduceToMin: + // Find minimum value, use its timestamp + var min float64 = math.Inf(1) + var minTimestamp int64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) && point.Value < min { + min = point.Value + minTimestamp = point.Timestamp + } + } + if math.IsInf(min, 1) { + reducedValue = math.NaN() + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + } else { + reducedValue = min + reducedTimestamp = minTimestamp + } + + case ReduceToMax: + // Find maximum value, use its timestamp + var max float64 = math.Inf(-1) + var maxTimestamp int64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) && point.Value > max { + max = point.Value + maxTimestamp = point.Timestamp + } + } + if math.IsInf(max, -1) { + reducedValue = math.NaN() + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + } else { + reducedValue = max + reducedTimestamp = maxTimestamp + } + + case ReduceToCount: + // Count non-NaN values, use last timestamp + var count float64 + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + count++ + } + } + reducedValue = count + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + + case ReduceToMedian: + // Calculate median of all non-NaN values + // maintain pair of value and timestamp and sort by value + var values []struct { + Value float64 + Timestamp int64 + } + for _, point := range result.Values { + if !math.IsNaN(point.Value) { + values = append(values, struct { + Value float64 + Timestamp int64 + }{ + Value: point.Value, + Timestamp: point.Timestamp, + }) + } + } + + if len(values) == 0 { + reducedValue = math.NaN() + reducedTimestamp = result.Values[len(result.Values)-1].Timestamp + } else { + slices.SortFunc(values, func(i, j struct { + Value float64 + Timestamp int64 + }) int { + if i.Value < j.Value { + return -1 + } + if i.Value > j.Value { + return 1 + } + return 0 + }) + + if len(values)%2 == 0 { + // Even number of values - average of middle two + mid := len(values) / 2 + reducedValue = (values[mid-1].Value + values[mid].Value) / 2 + reducedTimestamp = (values[mid-1].Timestamp + values[mid].Timestamp) / 2 + } else { + // Odd number of values - middle value + reducedValue = values[len(values)/2].Value + reducedTimestamp = values[len(values)/2].Timestamp + } + } + + case ReduceToUnknown: + fallthrough + default: + // No reduction, return original series + return result + } + + // Create new TimeSeries with single reduced point + reducedSeries := &TimeSeries{ + Labels: result.Labels, // Preserve original labels + Values: []*TimeSeriesValue{ + { + Timestamp: reducedTimestamp, + Value: reducedValue, + }, + }, + } + + return reducedSeries +} + type TraceAggregation struct { // aggregation expression - example: count(), sum(item_price), countIf(day > 10) Expression string `json:"expression"` @@ -205,17 +369,19 @@ type SecondaryAggregation struct { LimitBy LimitBy `json:"limitBy,omitempty"` } +type FunctionArg struct { + // name of the argument + Name string `json:"name,omitempty"` + // value of the argument + Value string `json:"value"` +} + type Function struct { // name of the function - Name string `json:"name"` + Name FunctionName `json:"name"` // args is the arguments to the function - Args []struct { - // name of the argument - Name string `json:"name,omitempty"` - // value of the argument - Value string `json:"value"` - } `json:"args,omitempty"` + Args []FunctionArg `json:"args,omitempty"` } type LimitBy struct { diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/functions.go b/pkg/types/querybuildertypes/querybuildertypesv5/functions.go index a3c58349c6..f0f70e64ef 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/functions.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/functions.go @@ -1,27 +1,348 @@ package querybuildertypesv5 -import "github.com/SigNoz/signoz/pkg/valuer" +import ( + "math" + "slices" + "strconv" + + "github.com/SigNoz/signoz/pkg/valuer" +) type FunctionName struct { valuer.String } var ( - FunctionNameCutOffMin = FunctionName{valuer.NewString("cutOffMin")} - FunctionNameCutOffMax = FunctionName{valuer.NewString("cutOffMax")} - FunctionNameClampMin = FunctionName{valuer.NewString("clampMin")} - FunctionNameClampMax = FunctionName{valuer.NewString("clampMax")} - FunctionNameAbsolute = FunctionName{valuer.NewString("absolute")} - FunctionNameRunningDiff = FunctionName{valuer.NewString("runningDiff")} - FunctionNameLog2 = FunctionName{valuer.NewString("log2")} - FunctionNameLog10 = FunctionName{valuer.NewString("log10")} - FunctionNameCumSum = FunctionName{valuer.NewString("cumSum")} - FunctionNameEWMA3 = FunctionName{valuer.NewString("ewma3")} - FunctionNameEWMA5 = FunctionName{valuer.NewString("ewma5")} - FunctionNameEWMA7 = FunctionName{valuer.NewString("ewma7")} - FunctionNameMedian3 = FunctionName{valuer.NewString("median3")} - FunctionNameMedian5 = FunctionName{valuer.NewString("median5")} - FunctionNameMedian7 = FunctionName{valuer.NewString("median7")} - FunctionNameTimeShift = FunctionName{valuer.NewString("timeShift")} - FunctionNameAnomaly = FunctionName{valuer.NewString("anomaly")} + FunctionNameCutOffMin = FunctionName{valuer.NewString("cutOffMin")} + FunctionNameCutOffMax = FunctionName{valuer.NewString("cutOffMax")} + FunctionNameClampMin = FunctionName{valuer.NewString("clampMin")} + FunctionNameClampMax = FunctionName{valuer.NewString("clampMax")} + FunctionNameAbsolute = FunctionName{valuer.NewString("absolute")} + FunctionNameRunningDiff = FunctionName{valuer.NewString("runningDiff")} + FunctionNameLog2 = FunctionName{valuer.NewString("log2")} + FunctionNameLog10 = FunctionName{valuer.NewString("log10")} + FunctionNameCumulativeSum = FunctionName{valuer.NewString("cumulativeSum")} + FunctionNameEWMA3 = FunctionName{valuer.NewString("ewma3")} + FunctionNameEWMA5 = FunctionName{valuer.NewString("ewma5")} + FunctionNameEWMA7 = FunctionName{valuer.NewString("ewma7")} + FunctionNameMedian3 = FunctionName{valuer.NewString("median3")} + FunctionNameMedian5 = FunctionName{valuer.NewString("median5")} + FunctionNameMedian7 = FunctionName{valuer.NewString("median7")} + FunctionNameTimeShift = FunctionName{valuer.NewString("timeShift")} + FunctionNameAnomaly = FunctionName{valuer.NewString("anomaly")} ) + +// ApplyFunction applies the given function to the result data +func ApplyFunction(fn Function, result *TimeSeries) *TimeSeries { + // Extract the function name and arguments + name := fn.Name + args := fn.Args + + switch name { + case FunctionNameCutOffMin, FunctionNameCutOffMax, FunctionNameClampMin, FunctionNameClampMax: + if len(args) == 0 { + return result + } + threshold, err := parseFloat64Arg(args[0].Value) + if err != nil { + return result + } + switch name { + case FunctionNameCutOffMin: + return funcCutOffMin(result, threshold) + case FunctionNameCutOffMax: + return funcCutOffMax(result, threshold) + case FunctionNameClampMin: + return funcClampMin(result, threshold) + case FunctionNameClampMax: + return funcClampMax(result, threshold) + } + case FunctionNameAbsolute: + return funcAbsolute(result) + case FunctionNameRunningDiff: + return funcRunningDiff(result) + case FunctionNameLog2: + return funcLog2(result) + case FunctionNameLog10: + return funcLog10(result) + case FunctionNameCumulativeSum: + return funcCumulativeSum(result) + case FunctionNameEWMA3, FunctionNameEWMA5, FunctionNameEWMA7: + alpha := getEWMAAlpha(name, args) + return funcEWMA(result, alpha) + case FunctionNameMedian3: + return funcMedian3(result) + case FunctionNameMedian5: + return funcMedian5(result) + case FunctionNameMedian7: + return funcMedian7(result) + case FunctionNameTimeShift: + if len(args) == 0 { + return result + } + shift, err := parseFloat64Arg(args[0].Value) + if err != nil { + return result + } + return funcTimeShift(result, shift) + case FunctionNameAnomaly: + // Placeholder for anomaly detection as function that can be used in dashboards other than + // the anomaly alert + return result + } + return result +} + +// parseFloat64Arg parses a string argument to float64 +func parseFloat64Arg(value string) (float64, error) { + return strconv.ParseFloat(value, 64) +} + +// getEWMAAlpha calculates the alpha value for EWMA functions +func getEWMAAlpha(name FunctionName, args []FunctionArg) float64 { + // Try to get alpha from arguments first + if len(args) > 0 { + if alpha, err := parseFloat64Arg(args[0].Value); err == nil { + return alpha + } + } + + // Default alpha values: alpha = 2 / (n + 1) where n is the window size + switch name { + case FunctionNameEWMA3: + return 0.5 // 2 / (3 + 1) + case FunctionNameEWMA5: + return 1.0 / 3.0 // 2 / (5 + 1) + case FunctionNameEWMA7: + return 0.25 // 2 / (7 + 1) + } + return 0.5 // default +} + +// funcCutOffMin cuts off values below the threshold and replaces them with NaN +func funcCutOffMin(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value < threshold { + point.Value = math.NaN() + } + result.Values[idx] = point + } + return result +} + +// funcCutOffMax cuts off values above the threshold and replaces them with NaN +func funcCutOffMax(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value > threshold { + point.Value = math.NaN() + } + result.Values[idx] = point + } + return result +} + +// funcClampMin cuts off values below the threshold and replaces them with the threshold +func funcClampMin(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value < threshold { + point.Value = threshold + } + result.Values[idx] = point + } + return result +} + +// funcClampMax cuts off values above the threshold and replaces them with the threshold +func funcClampMax(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value > threshold { + point.Value = threshold + } + result.Values[idx] = point + } + return result +} + +// funcAbsolute returns the absolute value of each point +func funcAbsolute(result *TimeSeries) *TimeSeries { + for idx, point := range result.Values { + point.Value = math.Abs(point.Value) + result.Values[idx] = point + } + return result +} + +// funcRunningDiff returns the running difference of each point +func funcRunningDiff(result *TimeSeries) *TimeSeries { + // iterate over the points in reverse order + for idx := len(result.Values) - 1; idx >= 0; idx-- { + if idx > 0 { + result.Values[idx].Value = result.Values[idx].Value - result.Values[idx-1].Value + } + } + // remove the first point + result.Values = result.Values[1:] + return result +} + +// funcLog2 returns the log2 of each point +func funcLog2(result *TimeSeries) *TimeSeries { + for idx, point := range result.Values { + point.Value = math.Log2(point.Value) + result.Values[idx] = point + } + return result +} + +// funcLog10 returns the log10 of each point +func funcLog10(result *TimeSeries) *TimeSeries { + for idx, point := range result.Values { + point.Value = math.Log10(point.Value) + result.Values[idx] = point + } + return result +} + +// funcCumulativeSum returns the cumulative sum for each point in a series +func funcCumulativeSum(result *TimeSeries) *TimeSeries { + var sum float64 + for idx, point := range result.Values { + if !math.IsNaN(point.Value) { + sum += point.Value + } + point.Value = sum + result.Values[idx] = point + } + + return result +} + +// funcEWMA calculates the Exponentially Weighted Moving Average +func funcEWMA(result *TimeSeries, alpha float64) *TimeSeries { + var ewma float64 + var initialized bool + + for i, point := range result.Values { + if !initialized { + if !math.IsNaN(point.Value) { + // Initialize EWMA with the first non-NaN value + ewma = point.Value + initialized = true + } + // Continue until the EWMA is initialized + continue + } + + if !math.IsNaN(point.Value) { + // Update EWMA with the current value + ewma = alpha*point.Value + (1-alpha)*ewma + } + // Set the EWMA value for the current point + result.Values[i].Value = ewma + } + return result +} + +// funcMedian3 returns the median of 3 points for each point in a series +func funcMedian3(result *TimeSeries) *TimeSeries { + return funcMedianN(result, 3) +} + +// funcMedian5 returns the median of 5 points for each point in a series +func funcMedian5(result *TimeSeries) *TimeSeries { + return funcMedianN(result, 5) +} + +// funcMedian7 returns the median of 7 points for each point in a series +func funcMedian7(result *TimeSeries) *TimeSeries { + return funcMedianN(result, 7) +} + +// funcMedianN returns the median of N points for each point in a series +func funcMedianN(result *TimeSeries, n int) *TimeSeries { + if len(result.Values) == 0 { + return result + } + + // For series shorter than window size, return original values + if len(result.Values) < n { + return result + } + + halfWindow := n / 2 + newValues := make([]*TimeSeriesValue, len(result.Values)) + + // Copy edge values that can't have a full window + for i := 0; i < halfWindow; i++ { + newValues[i] = &TimeSeriesValue{ + Timestamp: result.Values[i].Timestamp, + Value: result.Values[i].Value, + } + } + for i := len(result.Values) - halfWindow; i < len(result.Values); i++ { + newValues[i] = &TimeSeriesValue{ + Timestamp: result.Values[i].Timestamp, + Value: result.Values[i].Value, + } + } + + // Calculate median for points that have a full window + for i := halfWindow; i < len(result.Values)-halfWindow; i++ { + values := make([]float64, 0, n) + + // Add non-NaN values to the slice + for j := -halfWindow; j <= halfWindow; j++ { + if !math.IsNaN(result.Values[i+j].Value) { + values = append(values, result.Values[i+j].Value) + } + } + + newValues[i] = &TimeSeriesValue{ + Timestamp: result.Values[i].Timestamp, + } + + // Handle the case where there are not enough values to calculate a median + if len(values) == 0 { + newValues[i].Value = math.NaN() + } else { + newValues[i].Value = median(values) + } + } + + result.Values = newValues + return result +} + +// median calculates the median of a slice of float64 values +func median(values []float64) float64 { + if len(values) == 0 { + return math.NaN() + } + + slices.Sort(values) + medianIndex := len(values) / 2 + if len(values)%2 == 0 { + return (values[medianIndex-1] + values[medianIndex]) / 2 + } + return values[medianIndex] +} + +// funcTimeShift shifts all timestamps by the given amount (in seconds) +func funcTimeShift(result *TimeSeries, shift float64) *TimeSeries { + shiftMs := int64(shift * 1000) // Convert seconds to milliseconds + + for idx, point := range result.Values { + point.Timestamp = point.Timestamp + shiftMs + result.Values[idx] = point + } + + return result +} + +// ApplyFunctions applies a list of functions sequentially to the result +func ApplyFunctions(functions []Function, result *TimeSeries) *TimeSeries { + for _, fn := range functions { + result = ApplyFunction(fn, result) + } + return result +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go new file mode 100644 index 0000000000..277587d0e8 --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go @@ -0,0 +1,679 @@ +package querybuildertypesv5 + +import ( + "math" + "testing" +) + +// Helper function to create test time series data +func createTestTimeSeriesData(values []float64) *TimeSeries { + timeSeriesValues := make([]*TimeSeriesValue, len(values)) + for i, val := range values { + timeSeriesValues[i] = &TimeSeriesValue{ + Timestamp: int64(i + 1), + Value: val, + } + } + + series := &TimeSeries{ + Values: timeSeriesValues, + } + + return series +} + +// Helper function to extract values from result for comparison +func extractValues(result *TimeSeries) []float64 { + values := make([]float64, len(result.Values)) + for i, point := range result.Values { + values[i] = point.Value + } + return values +} + +func TestFuncCutOffMin(t *testing.T) { + tests := []struct { + name string + values []float64 + threshold float64 + want []float64 + }{ + { + name: "test funcCutOffMin", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + threshold: 0.3, + want: []float64{0.5, 0.4, 0.3, math.NaN(), math.NaN()}, + }, + { + name: "test funcCutOffMin with threshold 0", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + threshold: 0, + want: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcCutOffMin(result, tt.threshold) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcCutOffMin() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if math.IsNaN(tt.want[i]) { + if !math.IsNaN(got[i]) { + t.Errorf("funcCutOffMin() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } else { + if got[i] != tt.want[i] { + t.Errorf("funcCutOffMin() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + } + }) + } +} + +func TestFuncCutOffMax(t *testing.T) { + tests := []struct { + name string + values []float64 + threshold float64 + want []float64 + }{ + { + name: "test funcCutOffMax", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + threshold: 0.3, + want: []float64{math.NaN(), math.NaN(), 0.3, 0.2, 0.1}, + }, + { + name: "test funcCutOffMax with threshold 0", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + threshold: 0, + want: []float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcCutOffMax(result, tt.threshold) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcCutOffMax() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if math.IsNaN(tt.want[i]) { + if !math.IsNaN(got[i]) { + t.Errorf("funcCutOffMax() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } else { + if got[i] != tt.want[i] { + t.Errorf("funcCutOffMax() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + } + }) + } +} + +func TestCutOffMinCumSum(t *testing.T) { + tests := []struct { + name string + values []float64 + threshold float64 + want []float64 + }{ + { + name: "test funcCutOffMin followed by funcCumulativeSum", + values: []float64{0.5, 0.2, 0.1, 0.4, 0.3}, + threshold: 0.3, + want: []float64{0.5, 0.5, 0.5, 0.9, 1.2}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcCutOffMin(result, tt.threshold) + newResult = funcCumulativeSum(newResult) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("CutOffMin+CumSum got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if math.IsNaN(tt.want[i]) { + if !math.IsNaN(got[i]) { + t.Errorf("CutOffMin+CumSum at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } else { + if got[i] != tt.want[i] { + t.Errorf("CutOffMin+CumSum at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + } + }) + } +} + +func TestFuncMedian3(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "Values", + values: []float64{5, 3, 8, 2, 7}, + want: []float64{5, 5, 3, 7, 7}, // edge values unchanged, middle values are median of 3 + }, + { + name: "NaNHandling", + values: []float64{math.NaN(), 3, math.NaN(), 7, 9}, + want: []float64{math.NaN(), 3, 5, 8, 9}, // median of available values + }, + { + name: "UniformValues", + values: []float64{7, 7, 7, 7, 7}, + want: []float64{7, 7, 7, 7, 7}, + }, + { + name: "SingleValueSeries", + values: []float64{9}, + want: []float64{9}, + }, + { + name: "EmptySeries", + values: []float64{}, + want: []float64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + got := funcMedian3(result) + gotValues := extractValues(got) + + if len(gotValues) != len(tt.want) { + t.Errorf("funcMedian3() got length %d, want length %d", len(gotValues), len(tt.want)) + return + } + + for i := range gotValues { + if math.IsNaN(tt.want[i]) { + if !math.IsNaN(gotValues[i]) { + t.Errorf("funcMedian3() at index %d = %v, want %v", i, gotValues[i], tt.want[i]) + } + } else { + if gotValues[i] != tt.want[i] { + t.Errorf("funcMedian3() at index %d = %v, want %v", i, gotValues[i], tt.want[i]) + } + } + } + }) + } +} + +func TestFuncMedian5(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "Values", + values: []float64{5, 3, 8, 2, 7, 9, 1, 4, 6, 10}, + want: []float64{5, 3, 5, 7, 7, 4, 6, 6, 6, 10}, // edge values unchanged + }, + { + name: "NaNHandling", + values: []float64{math.NaN(), 3, math.NaN(), 7, 9, 1, 4, 6, 10, 2}, + want: []float64{math.NaN(), 3, 7, 5, 5.5, 6, 6, 4, 10, 2}, // median of available values + }, + { + name: "UniformValues", + values: []float64{7, 7, 7, 7, 7}, + want: []float64{7, 7, 7, 7, 7}, + }, + { + name: "SingleValueSeries", + values: []float64{9}, + want: []float64{9}, + }, + { + name: "EmptySeries", + values: []float64{}, + want: []float64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + got := funcMedian5(result) + gotValues := extractValues(got) + + if len(gotValues) != len(tt.want) { + t.Errorf("funcMedian5() got length %d, want length %d", len(gotValues), len(tt.want)) + return + } + + for i := range gotValues { + if math.IsNaN(tt.want[i]) { + if !math.IsNaN(gotValues[i]) { + t.Errorf("funcMedian5() at index %d = %v, want %v", i, gotValues[i], tt.want[i]) + } + } else { + if gotValues[i] != tt.want[i] { + t.Errorf("funcMedian5() at index %d = %v, want %v", i, gotValues[i], tt.want[i]) + } + } + } + }) + } +} + +func TestFuncRunningDiff(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "test funcRunningDiff", + values: []float64{1, 2, 3}, + want: []float64{1, 1}, // diff removes first element + }, + { + name: "test funcRunningDiff with start number as 8", + values: []float64{8, 8, 8}, + want: []float64{0, 0}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + got := funcRunningDiff(result) + gotValues := extractValues(got) + + if len(gotValues) != len(tt.want) { + t.Errorf("funcRunningDiff() got length %d, want length %d", len(gotValues), len(tt.want)) + return + } + + for i := range gotValues { + if gotValues[i] != tt.want[i] { + t.Errorf("funcRunningDiff() at index %d = %v, want %v", i, gotValues[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncClampMin(t *testing.T) { + tests := []struct { + name string + values []float64 + threshold float64 + want []float64 + }{ + { + name: "test funcClampMin", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + threshold: 0.3, + want: []float64{0.5, 0.4, 0.3, 0.3, 0.3}, + }, + { + name: "test funcClampMin with threshold 0", + values: []float64{-0.5, -0.4, 0.3, 0.2, 0.1}, + threshold: 0, + want: []float64{0, 0, 0.3, 0.2, 0.1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcClampMin(result, tt.threshold) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcClampMin() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("funcClampMin() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncClampMax(t *testing.T) { + tests := []struct { + name string + values []float64 + threshold float64 + want []float64 + }{ + { + name: "test funcClampMax", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + threshold: 0.3, + want: []float64{0.3, 0.3, 0.3, 0.2, 0.1}, + }, + { + name: "test funcClampMax with threshold 1.0", + values: []float64{2.5, 0.4, 1.3, 0.2, 0.1}, + threshold: 1.0, + want: []float64{1.0, 0.4, 1.0, 0.2, 0.1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcClampMax(result, tt.threshold) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcClampMax() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("funcClampMax() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncAbsolute(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "test funcAbsolute", + values: []float64{-0.5, 0.4, -0.3, 0.2, -0.1}, + want: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + }, + { + name: "test funcAbsolute with all positive", + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + want: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcAbsolute(result) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcAbsolute() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("funcAbsolute() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncLog2(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "test funcLog2", + values: []float64{1, 2, 4, 8, 16}, + want: []float64{0, 1, 2, 3, 4}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcLog2(result) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcLog2() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if math.Abs(got[i]-tt.want[i]) > 1e-10 { + t.Errorf("funcLog2() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncLog10(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "test funcLog10", + values: []float64{1, 10, 100, 1000}, + want: []float64{0, 1, 2, 3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcLog10(result) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcLog10() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if math.Abs(got[i]-tt.want[i]) > 1e-10 { + t.Errorf("funcLog10() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncCumSum(t *testing.T) { + tests := []struct { + name string + values []float64 + want []float64 + }{ + { + name: "test funcCumSum", + values: []float64{1, 2, 3, 4, 5}, + want: []float64{1, 3, 6, 10, 15}, + }, + { + name: "test funcCumSum with NaN", + values: []float64{1, math.NaN(), 3, 4, 5}, + want: []float64{1, 1, 4, 8, 13}, // NaN is ignored + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcCumulativeSum(result) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("funcCumSum() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("funcCumSum() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestFuncTimeShift(t *testing.T) { + tests := []struct { + name string + values []float64 + shift float64 + want []int64 // expected timestamps + }{ + { + name: "test funcTimeShift positive", + values: []float64{1, 2, 3}, + shift: 5.0, // 5 seconds + want: []int64{5001, 5002, 5003}, // original timestamps (1,2,3) + 5000ms + }, + { + name: "test funcTimeShift negative", + values: []float64{1, 2, 3}, + shift: -2.0, // -2 seconds + want: []int64{-1999, -1998, -1997}, // original timestamps (1,2,3) - 2000ms + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := funcTimeShift(result, tt.shift) + + got := make([]int64, len(newResult.Values)) + for i, point := range newResult.Values { + got[i] = point.Timestamp + } + + if len(got) != len(tt.want) { + t.Errorf("funcTimeShift() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("funcTimeShift() at index %d timestamp = %v, want %v", i, got[i], tt.want[i]) + } + } + }) + } +} + +func TestApplyFunction(t *testing.T) { + tests := []struct { + name string + function Function + values []float64 + want []float64 + }{ + { + name: "cutOffMin function", + function: Function{ + Name: FunctionNameCutOffMin, + Args: []FunctionArg{ + {Value: "0.3"}, + }, + }, + values: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + want: []float64{0.5, 0.4, 0.3, math.NaN(), math.NaN()}, + }, + { + name: "absolute function", + function: Function{ + Name: FunctionNameAbsolute, + }, + values: []float64{-0.5, 0.4, -0.3, 0.2, -0.1}, + want: []float64{0.5, 0.4, 0.3, 0.2, 0.1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createTestTimeSeriesData(tt.values) + newResult := ApplyFunction(tt.function, result) + got := extractValues(newResult) + + if len(got) != len(tt.want) { + t.Errorf("ApplyFunction() got length %d, want length %d", len(got), len(tt.want)) + return + } + + for i := range got { + if math.IsNaN(tt.want[i]) { + if !math.IsNaN(got[i]) { + t.Errorf("ApplyFunction() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } else { + if got[i] != tt.want[i] { + t.Errorf("ApplyFunction() at index %d = %v, want %v", i, got[i], tt.want[i]) + } + } + } + }) + } +} + +func TestApplyFunctions(t *testing.T) { + functions := []Function{ + { + Name: FunctionNameCutOffMin, + Args: []FunctionArg{ + {Value: "0.3"}, + }, + }, + { + Name: FunctionNameCumulativeSum, + }, + } + + values := []float64{0.5, 0.2, 0.1, 0.4, 0.3} + want := []float64{0.5, 0.5, 0.5, 0.9, 1.2} + + result := createTestTimeSeriesData(values) + newResult := ApplyFunctions(functions, result) + got := extractValues(newResult) + + if len(got) != len(want) { + t.Errorf("ApplyFunctions() got length %d, want length %d", len(got), len(want)) + return + } + + for i := range got { + if got[i] != want[i] { + t.Errorf("ApplyFunctions() at index %d = %v, want %v", i, got[i], want[i]) + } + } +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go new file mode 100644 index 0000000000..12e1898dcc --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/reduce_test.go @@ -0,0 +1,517 @@ +package querybuildertypesv5 + +import ( + "math" + "testing" + + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/stretchr/testify/assert" +) + +func TestFunctionReduceTo(t *testing.T) { + createLabels := func() []*Label { + return []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{Name: "service", FieldDataType: telemetrytypes.FieldDataTypeString}, + Value: "test-service", + }, + } + } + + createValues := func(pairs ...struct { + ts int64 + val float64 + }) []*TimeSeriesValue { + values := make([]*TimeSeriesValue, len(pairs)) + for i, pair := range pairs { + values[i] = &TimeSeriesValue{ + Timestamp: pair.ts, + Value: pair.val, + } + } + return values + } + + t.Run("Empty series", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: []*TimeSeriesValue{}, + } + + result := FunctionReduceTo(ts, ReduceToSum) + assert.Equal(t, ts, result, "Empty series should return unchanged") + }) + + t.Run("ReduceToLast", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToLast) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) + assert.Equal(t, 30.0, result.Values[0].Value) + assert.Equal(t, ts.Labels, result.Labels) + }) + + t.Run("ReduceToSum", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToSum) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) // Last timestamp + assert.Equal(t, 60.0, result.Values[0].Value) // 10 + 20 + 30 + }) + + t.Run("ReduceToSum with NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToSum) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) + assert.Equal(t, 40.0, result.Values[0].Value) // 10 + 30 (NaN skipped) + }) + + t.Run("ReduceToAvg", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToAvg) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(3000), result.Values[0].Timestamp) + assert.Equal(t, 20.0, result.Values[0].Value) // (10 + 20 + 30) / 3 + }) + + t.Run("ReduceToAvg with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToAvg) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToMin", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 30.0}, + struct { + ts int64 + val float64 + }{2000, 10.0}, // minimum + struct { + ts int64 + val float64 + }{3000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMin) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Timestamp of minimum value + assert.Equal(t, 10.0, result.Values[0].Value) + }) + + t.Run("ReduceToMin with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMin) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Last timestamp + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToMax", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 30.0}, // maximum + struct { + ts int64 + val float64 + }{3000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMax) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Timestamp of maximum value + assert.Equal(t, 30.0, result.Values[0].Value) + }) + + t.Run("ReduceToMax with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMax) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Last timestamp + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToCount", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + struct { + ts int64 + val float64 + }{4000, 40.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToCount) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(4000), result.Values[0].Timestamp) // Last timestamp + assert.Equal(t, 3.0, result.Values[0].Value) // 3 non-NaN values + }) + + t.Run("ReduceToMedian odd number of values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 30.0}, + struct { + ts int64 + val float64 + }{2000, 10.0}, + struct { + ts int64 + val float64 + }{3000, 20.0}, // median when sorted: 10, 20, 30 + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, 20.0, result.Values[0].Value) // Middle value + assert.Equal(t, int64(3000), result.Values[0].Timestamp) // Timestamp of median value + }) + + t.Run("ReduceToMedian even number of values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 40.0}, + struct { + ts int64 + val float64 + }{2000, 10.0}, + struct { + ts int64 + val float64 + }{3000, 30.0}, + struct { + ts int64 + val float64 + }{4000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, 25.0, result.Values[0].Value) // (20 + 30) / 2 when sorted: 10, 20, 30, 40 + expectedTimestamp := (int64(4000) + int64(3000)) / 2 // Average of middle timestamps + assert.Equal(t, expectedTimestamp, result.Values[0].Timestamp) + }) + + t.Run("ReduceToMedian with NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 30.0}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + struct { + ts int64 + val float64 + }{3000, 10.0}, + struct { + ts int64 + val float64 + }{4000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, 20.0, result.Values[0].Value) // Median of [10, 20, 30] + assert.Equal(t, int64(4000), result.Values[0].Timestamp) + }) + + t.Run("ReduceToMedian with all NaN values", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, math.NaN()}, + struct { + ts int64 + val float64 + }{2000, math.NaN()}, + ), + } + + result := FunctionReduceTo(ts, ReduceToMedian) + + assert.Len(t, result.Values, 1) + assert.Equal(t, int64(2000), result.Values[0].Timestamp) // Last timestamp + assert.True(t, math.IsNaN(result.Values[0].Value)) + }) + + t.Run("ReduceToUnknown", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToUnknown) + + assert.Equal(t, ts, result, "Unknown reduce operation should return original series") + }) + + t.Run("Invalid ReduceTo", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + ), + } + + // Create an invalid ReduceTo value + invalidReduceTo := ReduceTo{valuer.NewString("invalid")} + result := FunctionReduceTo(ts, invalidReduceTo) + + assert.Equal(t, ts, result, "Invalid reduce operation should return original series") + }) + + t.Run("Single value series", func(t *testing.T) { + ts := &TimeSeries{ + Labels: createLabels(), + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 42.0}, + ), + } + + testCases := []struct { + reduceTo ReduceTo + expected float64 + }{ + {ReduceToLast, 42.0}, + {ReduceToSum, 42.0}, + {ReduceToAvg, 42.0}, + {ReduceToMin, 42.0}, + {ReduceToMax, 42.0}, + {ReduceToCount, 1.0}, + {ReduceToMedian, 42.0}, + } + + for _, tc := range testCases { + t.Run(tc.reduceTo.StringValue(), func(t *testing.T) { + result := FunctionReduceTo(ts, tc.reduceTo) + assert.Len(t, result.Values, 1) + assert.Equal(t, tc.expected, result.Values[0].Value) + assert.Equal(t, int64(1000), result.Values[0].Timestamp) + }) + } + }) + + t.Run("Labels preservation", func(t *testing.T) { + originalLabels := []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{Name: "service", FieldDataType: telemetrytypes.FieldDataTypeString}, + Value: "test-service", + }, + { + Key: telemetrytypes.TelemetryFieldKey{Name: "instance", FieldDataType: telemetrytypes.FieldDataTypeString}, + Value: "test-instance", + }, + } + + ts := &TimeSeries{ + Labels: originalLabels, + Values: createValues( + struct { + ts int64 + val float64 + }{1000, 10.0}, + struct { + ts int64 + val float64 + }{2000, 20.0}, + ), + } + + result := FunctionReduceTo(ts, ReduceToSum) + + assert.Equal(t, originalLabels, result.Labels, "Labels should be preserved") + assert.Len(t, result.Labels, 2) + assert.Equal(t, "test-service", result.Labels[0].Value) + assert.Equal(t, "test-instance", result.Labels[1].Value) + }) +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go index 60dc24b825..6c48a90ae5 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go @@ -249,11 +249,8 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { Name: "error_rate", Expression: "A / B * 100", Functions: []Function{{ - Name: "absolute", - Args: []struct { - Name string `json:"name,omitempty"` - Value string `json:"value"` - }{}, + Name: FunctionNameAbsolute, + Args: []FunctionArg{}, }}, }, }}, @@ -261,6 +258,53 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { }, wantErr: false, }, + { + name: "function cut off min", + jsonData: `{ + "schemaVersion": "v1", + "start": 1640995200000, + "end": 1640998800000, + "requestType": "time_series", + "compositeQuery": { + "queries": [{ + "name": "F1", + "type": "builder_formula", + "spec": { + "name": "error_rate", + "expression": "A / B * 100", + "functions": [{ + "name": "cut_off_min", + "args": [{ + "value": "0.3" + }] + }] + } + }] + } + }`, + expected: QueryRangeRequest{ + SchemaVersion: "v1", + Start: 1640995200000, + End: 1640998800000, + RequestType: RequestTypeTimeSeries, + CompositeQuery: CompositeQuery{ + Queries: []QueryEnvelope{{ + Name: "F1", + Type: QueryTypeFormula, + Spec: QueryBuilderFormula{ + Name: "error_rate", + Expression: "A / B * 100", + Functions: []Function{{ + Name: FunctionNameCutOffMin, + Args: []FunctionArg{{ + Value: "0.3", + }}, + }}, + }, + }}, + }, + }, + }, { name: "valid join query", jsonData: `{ diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/series_limit.go b/pkg/types/querybuildertypes/querybuildertypesv5/series_limit.go new file mode 100644 index 0000000000..2650859d21 --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/series_limit.go @@ -0,0 +1,176 @@ +package querybuildertypesv5 + +import ( + "math" + "slices" + "strconv" + "strings" + + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" +) + +const ( + DefaultOrderByKey = "__result" +) + +// ApplyLimit applies limit and ordering to a list of time series +// This function sorts the series based on the provided order criteria and applies the limit +func ApplySeriesLimit(series []*TimeSeries, orderBy []OrderBy, limit int) []*TimeSeries { + if len(series) == 0 { + return series + } + + // If no orderBy is specified, sort by value in descending order + effectiveOrderBy := orderBy + if len(effectiveOrderBy) == 0 { + effectiveOrderBy = []OrderBy{ + { + Key: OrderByKey{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: DefaultOrderByKey, + FieldDataType: telemetrytypes.FieldDataTypeFloat64, + }, + }, + Direction: OrderDirectionDesc, + }, + } + } + + // Cache series values and labels + seriesValues := make(map[*TimeSeries]float64, len(series)) + seriesLabels := make(map[*TimeSeries]map[string]string, len(series)) + + for _, s := range series { + seriesValues[s] = calculateSeriesValue(s) + // Cache all labels for this series + labelMap := make(map[string]string) + for _, label := range s.Labels { + if strVal, ok := label.Value.(string); ok { + labelMap[label.Key.Name] = strVal + } else { + labelMap[label.Key.Name] = convertValueToString(label.Value) + } + } + seriesLabels[s] = labelMap + } + + // Sort the series based on the order criteria + slices.SortStableFunc(series, func(i, j *TimeSeries) int { + if compareSeries(i, j, effectiveOrderBy, seriesValues, seriesLabels) { + return -1 + } + if compareSeries(j, i, effectiveOrderBy, seriesValues, seriesLabels) { + return 1 + } + return 0 + }) + + // Apply limit if specified + if limit > 0 && len(series) > limit { + return series[:limit] + } + + return series +} + +// compareSeries compares two time series based on the order criteria +// Returns true if series i should come before series j +func compareSeries(seriesI, seriesJ *TimeSeries, orderBy []OrderBy, seriesValues map[*TimeSeries]float64, seriesLabels map[*TimeSeries]map[string]string) bool { + for _, order := range orderBy { + columnName := order.Key.Name + direction := order.Direction + + if columnName == DefaultOrderByKey { + valueI := seriesValues[seriesI] + valueJ := seriesValues[seriesJ] + + if valueI != valueJ { + if direction == OrderDirectionAsc { + return valueI < valueJ + } else { // desc + return valueI > valueJ + } + } + } else { + labelI, existsI := seriesLabels[seriesI][columnName] + labelJ, existsJ := seriesLabels[seriesJ][columnName] + + if existsI != existsJ { + // Handle missing labels - non-existent labels come first + return !existsI + } + + if existsI && existsJ { + comparison := strings.Compare(labelI, labelJ) + if comparison != 0 { + if direction == OrderDirectionAsc { + return comparison < 0 + } else { // desc + return comparison > 0 + } + } + } + } + } + + // If all order criteria are equal, preserve original order + return false +} + +// calculateSeriesValue calculates the representative value for a time series +// For single-point series (like table queries), returns that value +// For multi-point series, returns the average of non-NaN/non-Inf values +func calculateSeriesValue(series *TimeSeries) float64 { + if len(series.Values) == 0 { + return 0.0 + } + + // For single-point series, return that value directly + if len(series.Values) == 1 { + value := series.Values[0].Value + if math.IsNaN(value) || math.IsInf(value, 0) { + return 0.0 + } + return value + } + + // For multi-point series, calculate average of valid values + var sum float64 + var count float64 + + for _, point := range series.Values { + if math.IsNaN(point.Value) || math.IsInf(point.Value, 0) { + continue + } + sum += point.Value + count++ + } + + // Avoid division by zero + if count == 0 { + return 0.0 + } + + return sum / count +} + +// convertValueToString converts various types to string for comparison +func convertValueToString(value any) string { + switch v := value.(type) { + case string: + return v + case int: + return strconv.FormatInt(int64(v), 10) + case int64: + return strconv.FormatInt(v, 10) + case float64: + return strconv.FormatFloat(v, 'f', -1, 64) + case bool: + if v { + return "true" + } + return "false" + default: + return "" + } +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/series_limit_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/series_limit_test.go new file mode 100644 index 0000000000..918bcfdb08 --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/series_limit_test.go @@ -0,0 +1,234 @@ +package querybuildertypesv5 + +import ( + "testing" + + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/stretchr/testify/assert" +) + +func TestApplySeriesLimit(t *testing.T) { + t.Run("Sort by value with limit", func(t *testing.T) { + // Create test series with different values + series := []*TimeSeries{ + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "service-a", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 10.0}, + }, + }, + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "service-b", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 50.0}, + }, + }, + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "service-c", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 30.0}, + }, + }, + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "service-d", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 20.0}, + }, + }, + } + + // Sort by value descending with limit of 2 + orderBy := []OrderBy{ + { + Key: OrderByKey{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: DefaultOrderByKey, + FieldDataType: telemetrytypes.FieldDataTypeFloat64, + }, + }, + Direction: OrderDirectionDesc, + }, + } + + result := ApplySeriesLimit(series, orderBy, 2) + + // Should return top 2 series by value: service-b (50.0), service-c (30.0) + assert.Len(t, result, 2) + + // First series should be service-b with value 50.0 + assert.Equal(t, "service-b", result[0].Labels[0].Value) + assert.Equal(t, 50.0, result[0].Values[0].Value) + + // Second series should be service-c with value 30.0 + assert.Equal(t, "service-c", result[1].Labels[0].Value) + assert.Equal(t, 30.0, result[1].Values[0].Value) + }) + + t.Run("Sort by labels with two keys", func(t *testing.T) { + // Create test series with different label combinations + series := []*TimeSeries{ + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "backend", + }, + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "environment", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "prod", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 10.0}, + }, + }, + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "frontend", + }, + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "environment", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "dev", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 20.0}, + }, + }, + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "backend", + }, + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "environment", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "dev", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 30.0}, + }, + }, + { + Labels: []*Label{ + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "frontend", + }, + { + Key: telemetrytypes.TelemetryFieldKey{ + Name: "environment", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: "prod", + }, + }, + Values: []*TimeSeriesValue{ + {Timestamp: 1000, Value: 40.0}, + }, + }, + } + + // Sort by service (asc) then by environment (desc) with limit of 3 + orderBy := []OrderBy{ + { + Key: OrderByKey{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + Direction: OrderDirectionAsc, + }, + { + Key: OrderByKey{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "environment", + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + Direction: OrderDirectionDesc, + }, + } + + result := ApplySeriesLimit(series, orderBy, 3) + + // Should return 3 series sorted by service (asc), then environment (desc) + // Expected order: + // 1. backend + prod + // 2. backend + dev + // 3. frontend + prod + assert.Len(t, result, 3) + + // First: backend + prod + assert.Equal(t, "backend", result[0].Labels[0].Value) + assert.Equal(t, "prod", result[0].Labels[1].Value) + assert.Equal(t, 10.0, result[0].Values[0].Value) + + // Second: backend + dev + assert.Equal(t, "backend", result[1].Labels[0].Value) + assert.Equal(t, "dev", result[1].Labels[1].Value) + assert.Equal(t, 30.0, result[1].Values[0].Value) + + // Third: frontend + prod + assert.Equal(t, "frontend", result[2].Labels[0].Value) + assert.Equal(t, "prod", result[2].Labels[1].Value) + assert.Equal(t, 40.0, result[2].Values[0].Value) + }) +}