From c397f57956a050e5707d3153c649467197f15c0d Mon Sep 17 00:00:00 2001
From: srikanthccv
Date: Sat, 31 May 2025 09:40:39 +0530
Subject: [PATCH] chore: series in and series out

---
 .../querybuildertypesv5/builder_elements.go |  14 +-
 .../querybuildertypesv5/functions.go        | 393 ++++++------
 .../querybuildertypesv5/functions_test.go   |  57 +--
 .../querybuildertypesv5/req_test.go         |  52 ++-
 4 files changed, 194 insertions(+), 322 deletions(-)

diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go
index 68f78ffc6c..3e503b49f7 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go
@@ -205,17 +205,19 @@ type SecondaryAggregation struct {
     LimitBy LimitBy `json:"limitBy,omitempty"`
 }
 
+type FunctionArg struct {
+    // name of the argument
+    Name string `json:"name,omitempty"`
+    // value of the argument
+    Value string `json:"value"`
+}
+
 type Function struct {
     // name of the function
     Name FunctionName `json:"name"`
 
     // args is the arguments to the function
-    Args []struct {
-        // name of the argument
-        Name string `json:"name,omitempty"`
-        // value of the argument
-        Value string `json:"value"`
-    } `json:"args,omitempty"`
+    Args []FunctionArg `json:"args,omitempty"`
 }
 
 type LimitBy struct {
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/functions.go b/pkg/types/querybuildertypes/querybuildertypesv5/functions.go
index 679e7c9095..81e3cd0c55 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/functions.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/functions.go
@@ -13,27 +13,27 @@ type FunctionName struct {
 }
 
 var (
-    FunctionNameCutOffMin     = FunctionName{valuer.NewString("cutoff_min")}
-    FunctionNameCutOffMax     = FunctionName{valuer.NewString("cutoff_max")}
-    FunctionNameClampMin      = FunctionName{valuer.NewString("clamp_min")}
-    FunctionNameClampMax      = FunctionName{valuer.NewString("clamp_max")}
+    FunctionNameCutOffMin     = FunctionName{valuer.NewString("cutOffMin")}
+    FunctionNameCutOffMax     = FunctionName{valuer.NewString("cutOffMax")}
+    FunctionNameClampMin      = FunctionName{valuer.NewString("clampMin")}
+    FunctionNameClampMax      = FunctionName{valuer.NewString("clampMax")}
     FunctionNameAbsolute      = FunctionName{valuer.NewString("absolute")}
-    FunctionNameRunningDiff   = FunctionName{valuer.NewString("running_diff")}
+    FunctionNameRunningDiff   = FunctionName{valuer.NewString("runningDiff")}
     FunctionNameLog2          = FunctionName{valuer.NewString("log2")}
     FunctionNameLog10         = FunctionName{valuer.NewString("log10")}
-    FunctionNameCumulativeSum = FunctionName{valuer.NewString("cumulative_sum")}
+    FunctionNameCumulativeSum = FunctionName{valuer.NewString("cumulativeSum")}
     FunctionNameEWMA3         = FunctionName{valuer.NewString("ewma3")}
     FunctionNameEWMA5         = FunctionName{valuer.NewString("ewma5")}
     FunctionNameEWMA7         = FunctionName{valuer.NewString("ewma7")}
     FunctionNameMedian3       = FunctionName{valuer.NewString("median3")}
     FunctionNameMedian5       = FunctionName{valuer.NewString("median5")}
     FunctionNameMedian7       = FunctionName{valuer.NewString("median7")}
-    FunctionNameTimeShift     = FunctionName{valuer.NewString("time_shift")}
+    FunctionNameTimeShift     = FunctionName{valuer.NewString("timeShift")}
     FunctionNameAnomaly       = FunctionName{valuer.NewString("anomaly")}
 )
 
 // ApplyFunction applies the given function to the result data
-func ApplyFunction(fn Function, result *Result) *Result {
+func ApplyFunction(fn Function, result *TimeSeries) 
*TimeSeries { // Extract the function name and arguments name := fn.Name args := fn.Args @@ -86,7 +86,8 @@ func ApplyFunction(fn Function, result *Result) *Result { } return funcTimeShift(result, shift) case FunctionNameAnomaly: - // Placeholder for anomaly detection - would need more sophisticated implementation + // Placeholder for anomaly detection as function that can be used in dashboards other than + // the anomaly alert return result } return result @@ -98,10 +99,7 @@ func parseFloat64Arg(value string) (float64, error) { } // getEWMAAlpha calculates the alpha value for EWMA functions -func getEWMAAlpha(name FunctionName, args []struct { - Name string `json:"name,omitempty"` - Value string `json:"value"` -}) float64 { +func getEWMAAlpha(name FunctionName, args []FunctionArg) float64 { // Try to get alpha from arguments first if len(args) > 0 { if alpha, err := parseFloat64Arg(args[0].Value); err == nil { @@ -122,324 +120,196 @@ func getEWMAAlpha(name FunctionName, args []struct { } // funcCutOffMin cuts off values below the threshold and replaces them with NaN -func funcCutOffMin(result *Result, threshold float64) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - if point.Value < threshold { - point.Value = math.NaN() - } - series.Values[idx] = point - } +func funcCutOffMin(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value < threshold { + point.Value = math.NaN() } + result.Values[idx] = point } return result } // funcCutOffMax cuts off values above the threshold and replaces them with NaN -func funcCutOffMax(result *Result, threshold float64) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - if point.Value > threshold { - point.Value = math.NaN() - } - series.Values[idx] = point - } +func funcCutOffMax(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value > threshold { + point.Value = math.NaN() } + result.Values[idx] = point } return result } // funcClampMin cuts off values below the threshold and replaces them with the threshold -func funcClampMin(result *Result, threshold float64) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - if point.Value < threshold { - point.Value = threshold - } - series.Values[idx] = point - } +func funcClampMin(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value < threshold { + point.Value = threshold } + result.Values[idx] = point } return result } // funcClampMax cuts off values above the threshold and replaces them with the threshold -func funcClampMax(result *Result, threshold float64) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := 
result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - if point.Value > threshold { - point.Value = threshold - } - series.Values[idx] = point - } +func funcClampMax(result *TimeSeries, threshold float64) *TimeSeries { + for idx, point := range result.Values { + if point.Value > threshold { + point.Value = threshold } + result.Values[idx] = point } return result } // funcAbsolute returns the absolute value of each point -func funcAbsolute(result *Result) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - point.Value = math.Abs(point.Value) - series.Values[idx] = point - } - } +func funcAbsolute(result *TimeSeries) *TimeSeries { + for idx, point := range result.Values { + point.Value = math.Abs(point.Value) + result.Values[idx] = point } return result } // funcRunningDiff returns the running difference of each point -func funcRunningDiff(result *Result) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - // iterate over the points in reverse order - for idx := len(series.Values) - 1; idx >= 0; idx-- { - if idx > 0 { - series.Values[idx].Value = series.Values[idx].Value - series.Values[idx-1].Value - } - } - // remove the first point - if len(series.Values) > 0 { - series.Values = series.Values[1:] - } +func funcRunningDiff(result *TimeSeries) *TimeSeries { + // iterate over the points in reverse order + for idx := len(result.Values) - 1; idx >= 0; idx-- { + if idx > 0 { + result.Values[idx].Value = result.Values[idx].Value - result.Values[idx-1].Value } } + // remove the first point + result.Values = result.Values[1:] return result } // funcLog2 returns the log2 of each point -func funcLog2(result *Result) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - point.Value = math.Log2(point.Value) - series.Values[idx] = point - } - } +func funcLog2(result *TimeSeries) *TimeSeries { + for idx, point := range result.Values { + point.Value = math.Log2(point.Value) + result.Values[idx] = point } return result } // funcLog10 returns the log10 of each point -func funcLog10(result *Result) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - point.Value = math.Log10(point.Value) - series.Values[idx] = point - } - } +func funcLog10(result *TimeSeries) *TimeSeries { + for idx, point := range result.Values { + point.Value = math.Log10(point.Value) + result.Values[idx] = point } return result } // funcCumulativeSum returns the cumulative sum for each point in a 
series -func funcCumulativeSum(result *Result) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - var sum float64 - for idx, point := range series.Values { - if !math.IsNaN(point.Value) { - sum += point.Value - } - point.Value = sum - series.Values[idx] = point - } +func funcCumulativeSum(result *TimeSeries) *TimeSeries { + var sum float64 + for idx, point := range result.Values { + if !math.IsNaN(point.Value) { + sum += point.Value } + point.Value = sum + result.Values[idx] = point } + return result } // funcEWMA calculates the Exponentially Weighted Moving Average -func funcEWMA(result *Result, alpha float64) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } +func funcEWMA(result *TimeSeries, alpha float64) *TimeSeries { + var ewma float64 + var initialized bool - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - var ewma float64 - var initialized bool - - for i, point := range series.Values { - if !initialized { - if !math.IsNaN(point.Value) { - // Initialize EWMA with the first non-NaN value - ewma = point.Value - initialized = true - } - // Continue until the EWMA is initialized - continue - } - - if !math.IsNaN(point.Value) { - // Update EWMA with the current value - ewma = alpha*point.Value + (1-alpha)*ewma - } - // Set the EWMA value for the current point - series.Values[i].Value = ewma + for i, point := range result.Values { + if !initialized { + if !math.IsNaN(point.Value) { + // Initialize EWMA with the first non-NaN value + ewma = point.Value + initialized = true } + // Continue until the EWMA is initialized + continue } + + if !math.IsNaN(point.Value) { + // Update EWMA with the current value + ewma = alpha*point.Value + (1-alpha)*ewma + } + // Set the EWMA value for the current point + result.Values[i].Value = ewma } return result } // funcMedian3 returns the median of 3 points for each point in a series -func funcMedian3(result *Result) *Result { +func funcMedian3(result *TimeSeries) *TimeSeries { return funcMedianN(result, 3) } // funcMedian5 returns the median of 5 points for each point in a series -func funcMedian5(result *Result) *Result { +func funcMedian5(result *TimeSeries) *TimeSeries { return funcMedianN(result, 5) } // funcMedian7 returns the median of 7 points for each point in a series -func funcMedian7(result *Result) *Result { +func funcMedian7(result *TimeSeries) *TimeSeries { return funcMedianN(result, 7) } // funcMedianN returns the median of N points for each point in a series -func funcMedianN(result *Result, n int) *Result { - if result.Type != RequestTypeTimeSeries { +func funcMedianN(result *TimeSeries, n int) *TimeSeries { + if len(result.Values) == 0 { return result } - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { + // For series shorter than window size, return original values + if len(result.Values) < n { return result } halfWindow := n / 2 + newValues := make([]*TimeSeriesValue, len(result.Values)) - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - medianValues := make([]*TimeSeriesValue, 0) - - for i := halfWindow; i < len(series.Values)-halfWindow; i++ { - values := make([]float64, 0, n) - 
- // Add non-NaN values to the slice - for j := -halfWindow; j <= halfWindow; j++ { - if !math.IsNaN(series.Values[i+j].Value) { - values = append(values, series.Values[i+j].Value) - } - } - - // Create a new point with median value - newPoint := &TimeSeriesValue{ - Timestamp: series.Values[i].Timestamp, - } - - // Handle the case where there are not enough values to calculate a median - if len(values) == 0 { - newPoint.Value = math.NaN() - } else { - newPoint.Value = median(values) - } - - medianValues = append(medianValues, newPoint) - } - - // Replace the series values with median values - // Keep the original edge points unchanged - for i := halfWindow; i < len(series.Values)-halfWindow; i++ { - series.Values[i] = medianValues[i-halfWindow] - } + // Copy edge values that can't have a full window + for i := 0; i < halfWindow; i++ { + newValues[i] = &TimeSeriesValue{ + Timestamp: result.Values[i].Timestamp, + Value: result.Values[i].Value, } } + for i := len(result.Values) - halfWindow; i < len(result.Values); i++ { + newValues[i] = &TimeSeriesValue{ + Timestamp: result.Values[i].Timestamp, + Value: result.Values[i].Value, + } + } + + // Calculate median for points that have a full window + for i := halfWindow; i < len(result.Values)-halfWindow; i++ { + values := make([]float64, 0, n) + + // Add non-NaN values to the slice + for j := -halfWindow; j <= halfWindow; j++ { + if !math.IsNaN(result.Values[i+j].Value) { + values = append(values, result.Values[i+j].Value) + } + } + + newValues[i] = &TimeSeriesValue{ + Timestamp: result.Values[i].Timestamp, + } + + // Handle the case where there are not enough values to calculate a median + if len(values) == 0 { + newValues[i].Value = math.NaN() + } else { + newValues[i].Value = median(values) + } + } + + result.Values = newValues return result } @@ -458,30 +328,19 @@ func median(values []float64) float64 { } // funcTimeShift shifts all timestamps by the given amount (in seconds) -func funcTimeShift(result *Result, shift float64) *Result { - if result.Type != RequestTypeTimeSeries { - return result - } - - timeSeriesData, ok := result.Value.(*TimeSeriesData) - if !ok { - return result - } - +func funcTimeShift(result *TimeSeries, shift float64) *TimeSeries { shiftMs := int64(shift * 1000) // Convert seconds to milliseconds - for _, aggregation := range timeSeriesData.Aggregations { - for _, series := range aggregation.Series { - for idx, point := range series.Values { - series.Values[idx].Timestamp = point.Timestamp + shiftMs - } - } + for idx, point := range result.Values { + point.Timestamp = point.Timestamp + shiftMs + result.Values[idx] = point } + return result } // ApplyFunctions applies a list of functions sequentially to the result -func ApplyFunctions(functions []Function, result *Result) *Result { +func ApplyFunctions(functions []Function, result *TimeSeries) *TimeSeries { for _, fn := range functions { result = ApplyFunction(fn, result) } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go index c1faea09b2..277587d0e8 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/functions_test.go @@ -6,7 +6,7 @@ import ( ) // Helper function to create test time series data -func createTestTimeSeriesData(values []float64) *Result { +func createTestTimeSeriesData(values []float64) *TimeSeries { timeSeriesValues := make([]*TimeSeriesValue, len(values)) for i, val := range 
values {
         timeSeriesValues[i] = &TimeSeriesValue{
@@ -19,33 +19,13 @@
         Values: timeSeriesValues,
     }
 
-    aggregation := &AggregationBucket{
-        Index:  0,
-        Alias:  "test",
-        Series: []*TimeSeries{series},
-    }
-
-    timeSeriesData := &TimeSeriesData{
-        QueryName:    "test",
-        Aggregations: []*AggregationBucket{aggregation},
-    }
-
-    return &Result{
-        Type:  RequestTypeTimeSeries,
-        Value: timeSeriesData,
-    }
+    return series
 }
 
 // Helper function to extract values from result for comparison
-func extractValues(result *Result) []float64 {
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok || len(timeSeriesData.Aggregations) == 0 || len(timeSeriesData.Aggregations[0].Series) == 0 {
-        return nil
-    }
-
-    series := timeSeriesData.Aggregations[0].Series[0]
-    values := make([]float64, len(series.Values))
-    for i, point := range series.Values {
+func extractValues(result *TimeSeries) []float64 {
+    values := make([]float64, len(result.Values))
+    for i, point := range result.Values {
         values[i] = point.Value
     }
     return values
@@ -578,13 +558,13 @@ func TestFuncTimeShift(t *testing.T) {
             name:   "test funcTimeShift positive",
             values: []float64{1, 2, 3},
             shift:  5.0, // 5 seconds
-            want:   []int64{6000, 7000, 8000}, // original timestamps (1,2,3) + 5000ms
+            want:   []int64{5001, 5002, 5003}, // original timestamps (1,2,3) + 5000ms
         },
         {
             name:   "test funcTimeShift negative",
             values: []float64{1, 2, 3},
-            shift:  -2.0, // -2 seconds
-            want:   []int64{-1000, 0, 1000}, // original timestamps (1,2,3) - 2000ms
+            shift:  -2.0,                         // -2 seconds
+            want:   []int64{-1999, -1998, -1997}, // original timestamps (1,2,3) - 2000ms
         },
     }
 
@@ -593,15 +573,8 @@ func TestFuncTimeShift(t *testing.T) {
             result := createTestTimeSeriesData(tt.values)
             newResult := funcTimeShift(result, tt.shift)
 
-            timeSeriesData, ok := newResult.Value.(*TimeSeriesData)
-            if !ok {
-                t.Errorf("funcTimeShift() failed to get time series data")
-                return
-            }
-
-            series := timeSeriesData.Aggregations[0].Series[0]
-            got := make([]int64, len(series.Values))
-            for i, point := range series.Values {
+            got := make([]int64, len(newResult.Values))
+            for i, point := range newResult.Values {
                 got[i] = point.Timestamp
             }
 
@@ -630,10 +603,7 @@ func TestApplyFunction(t *testing.T) {
             name: "cutOffMin function",
             function: Function{
                 Name: FunctionNameCutOffMin,
-                Args: []struct {
-                    Name  string `json:"name,omitempty"`
-                    Value string `json:"value"`
-                }{
+                Args: []FunctionArg{
                     {Value: "0.3"},
                 },
             },
@@ -680,10 +650,7 @@ func TestApplyFunctions(t *testing.T) {
     functions := []Function{
         {
             Name: FunctionNameCutOffMin,
-            Args: []struct {
-                Name  string `json:"name,omitempty"`
-                Value string `json:"value"`
-            }{
+            Args: []FunctionArg{
                 {Value: "0.3"},
             },
         },
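// Illustrative sketch (not part of the patch): the tests above exercise the
// new "series in and series out" shape, where ApplyFunction/ApplyFunctions
// operate on a *TimeSeries directly instead of unwrapping a *Result. A
// minimal usage sketch, assuming only the types shown in this diff
// (TimeSeries, TimeSeriesValue, Function, FunctionArg):
//
//	series := &TimeSeries{
//		Values: []*TimeSeriesValue{
//			{Timestamp: 1, Value: -0.1},
//			{Timestamp: 2, Value: 0.5},
//			{Timestamp: 3, Value: 0.9},
//		},
//	}
//	fns := []Function{
//		{Name: FunctionNameCutOffMin, Args: []FunctionArg{{Value: "0.3"}}},
//		{Name: FunctionNameAbsolute},
//	}
//	series = ApplyFunctions(fns, series)
//	// cutOffMin replaces points below 0.3 with NaN, then absolute is applied
//	// point-wise; callers that previously passed a *Result now hand each
//	// series through the pipeline on its own.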
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go
index 176b34b8f4..6c48a90ae5 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go
@@ -250,10 +250,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
                     Expression: "A / B * 100",
                     Functions: []Function{{
                         Name: FunctionNameAbsolute,
-                        Args: []struct {
-                            Name  string `json:"name,omitempty"`
-                            Value string `json:"value"`
-                        }{},
+                        Args: []FunctionArg{},
                     }},
                 },
             }},
@@ -261,6 +258,53 @@
             },
             wantErr: false,
         },
+        {
+            name: "function cut off min",
+            jsonData: `{
+                "schemaVersion": "v1",
+                "start": 1640995200000,
+                "end": 1640998800000,
+                "requestType": "time_series",
+                "compositeQuery": {
+                    "queries": [{
+                        "name": "F1",
+                        "type": "builder_formula",
+                        "spec": {
+                            "name": "error_rate",
+                            "expression": "A / B * 100",
+                            "functions": [{
+                                "name": "cut_off_min",
+                                "args": [{
+                                    "value": "0.3"
+                                }]
+                            }]
+                        }
+                    }]
+                }
+            }`,
+            expected: QueryRangeRequest{
+                SchemaVersion: "v1",
+                Start:         1640995200000,
+                End:           1640998800000,
+                RequestType:   RequestTypeTimeSeries,
+                CompositeQuery: CompositeQuery{
+                    Queries: []QueryEnvelope{{
+                        Name: "F1",
+                        Type: QueryTypeFormula,
+                        Spec: QueryBuilderFormula{
+                            Name:       "error_rate",
+                            Expression: "A / B * 100",
+                            Functions: []Function{{
+                                Name: FunctionNameCutOffMin,
+                                Args: []FunctionArg{{
+                                    Value: "0.3",
+                                }},
+                            }},
+                        },
+                    }},
+                },
+            },
+        },
         {
             name: "valid join query",
             jsonData: `{