chore: series in and series out

srikanthccv 2025-05-31 09:40:39 +05:30
parent 2e9c66abdd
commit c397f57956
4 changed files with 194 additions and 322 deletions
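The gist of the change: the function helpers below now take and return a plain *TimeSeries ("series in, series out") instead of the *Result envelope, and the inline anonymous Args struct becomes a named FunctionArg type. A minimal sketch of the new call shape, assuming the same package as the diff and the field names visible in it (Values, Timestamp, Value); the helper name is hypothetical:

// Hypothetical in-package example of the new series-in/series-out shape.
func exampleSeriesInSeriesOut() *TimeSeries {
    series := &TimeSeries{
        Values: []*TimeSeriesValue{
            {Timestamp: 1, Value: 0.1},
            {Timestamp: 2, Value: 0.7},
            {Timestamp: 3, Value: 0.4},
        },
    }
    fns := []Function{
        {Name: FunctionNameCutOffMin, Args: []FunctionArg{{Value: "0.3"}}}, // values below 0.3 become NaN
        {Name: FunctionNameTimeShift, Args: []FunctionArg{{Value: "5"}}},   // timestamps move by +5000ms
    }
    // No Result envelope: the functions operate on the series directly.
    return ApplyFunctions(fns, series)
}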

View File

@@ -205,17 +205,19 @@ type SecondaryAggregation struct {
     LimitBy LimitBy `json:"limitBy,omitempty"`
 }
+type FunctionArg struct {
+    // name of the argument
+    Name string `json:"name,omitempty"`
+    // value of the argument
+    Value string `json:"value"`
+}
 type Function struct {
     // name of the function
     Name FunctionName `json:"name"`
     // args is the arguments to the function
-    Args []struct {
-        // name of the argument
-        Name string `json:"name,omitempty"`
-        // value of the argument
-        Value string `json:"value"`
-    } `json:"args,omitempty"`
+    Args []FunctionArg `json:"args,omitempty"`
 }
 type LimitBy struct {
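The named FunctionArg keeps the same wire shape as the old inline struct: "args": [{"value": "0.3"}], with "name" omitted when empty. A hedged sketch, assuming the diff's package and an encoding/json import; the exact "name" output depends on how FunctionName marshals itself:

// Hypothetical example; json refers to encoding/json.
func exampleFunctionArgJSON() ([]byte, error) {
    fn := Function{
        Name: FunctionNameCutOffMin,
        Args: []FunctionArg{{Value: "0.3"}}, // marshals as "args":[{"value":"0.3"}]
    }
    return json.Marshal(fn)
}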

View File

@@ -13,27 +13,27 @@ type FunctionName struct {
 }
 var (
-    FunctionNameCutOffMin = FunctionName{valuer.NewString("cutoff_min")}
-    FunctionNameCutOffMax = FunctionName{valuer.NewString("cutoff_max")}
-    FunctionNameClampMin = FunctionName{valuer.NewString("clamp_min")}
-    FunctionNameClampMax = FunctionName{valuer.NewString("clamp_max")}
+    FunctionNameCutOffMin = FunctionName{valuer.NewString("cutOffMin")}
+    FunctionNameCutOffMax = FunctionName{valuer.NewString("cutOffMax")}
+    FunctionNameClampMin = FunctionName{valuer.NewString("clampMin")}
+    FunctionNameClampMax = FunctionName{valuer.NewString("clampMax")}
     FunctionNameAbsolute = FunctionName{valuer.NewString("absolute")}
-    FunctionNameRunningDiff = FunctionName{valuer.NewString("running_diff")}
+    FunctionNameRunningDiff = FunctionName{valuer.NewString("runningDiff")}
     FunctionNameLog2 = FunctionName{valuer.NewString("log2")}
     FunctionNameLog10 = FunctionName{valuer.NewString("log10")}
-    FunctionNameCumulativeSum = FunctionName{valuer.NewString("cumulative_sum")}
+    FunctionNameCumulativeSum = FunctionName{valuer.NewString("cumulativeSum")}
     FunctionNameEWMA3 = FunctionName{valuer.NewString("ewma3")}
     FunctionNameEWMA5 = FunctionName{valuer.NewString("ewma5")}
     FunctionNameEWMA7 = FunctionName{valuer.NewString("ewma7")}
     FunctionNameMedian3 = FunctionName{valuer.NewString("median3")}
     FunctionNameMedian5 = FunctionName{valuer.NewString("median5")}
     FunctionNameMedian7 = FunctionName{valuer.NewString("median7")}
-    FunctionNameTimeShift = FunctionName{valuer.NewString("time_shift")}
+    FunctionNameTimeShift = FunctionName{valuer.NewString("timeShift")}
     FunctionNameAnomaly = FunctionName{valuer.NewString("anomaly")}
 )
 // ApplyFunction applies the given function to the result data
-func ApplyFunction(fn Function, result *Result) *Result {
+func ApplyFunction(fn Function, result *TimeSeries) *TimeSeries {
     // Extract the function name and arguments
     name := fn.Name
     args := fn.Args
@@ -86,7 +86,8 @@ func ApplyFunction(fn Function, result *Result) *Result {
         }
         return funcTimeShift(result, shift)
     case FunctionNameAnomaly:
-        // Placeholder for anomaly detection - would need more sophisticated implementation
+        // Placeholder for anomaly detection as function that can be used in dashboards other than
+        // the anomaly alert
         return result
     }
     return result
@@ -98,10 +99,7 @@ func parseFloat64Arg(value string) (float64, error) {
 }
 // getEWMAAlpha calculates the alpha value for EWMA functions
-func getEWMAAlpha(name FunctionName, args []struct {
-    Name string `json:"name,omitempty"`
-    Value string `json:"value"`
-}) float64 {
+func getEWMAAlpha(name FunctionName, args []FunctionArg) float64 {
     // Try to get alpha from arguments first
     if len(args) > 0 {
         if alpha, err := parseFloat64Arg(args[0].Value); err == nil {
@@ -122,324 +120,196 @@ func getEWMAAlpha(name FunctionName, args []struct {
 }
 // funcCutOffMin cuts off values below the threshold and replaces them with NaN
-func funcCutOffMin(result *Result, threshold float64) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                if point.Value < threshold {
-                    point.Value = math.NaN()
-                }
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcCutOffMin(result *TimeSeries, threshold float64) *TimeSeries {
+    for idx, point := range result.Values {
+        if point.Value < threshold {
+            point.Value = math.NaN()
+        }
+        result.Values[idx] = point
+    }
     return result
 }
 // funcCutOffMax cuts off values above the threshold and replaces them with NaN
-func funcCutOffMax(result *Result, threshold float64) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                if point.Value > threshold {
-                    point.Value = math.NaN()
-                }
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcCutOffMax(result *TimeSeries, threshold float64) *TimeSeries {
+    for idx, point := range result.Values {
+        if point.Value > threshold {
+            point.Value = math.NaN()
+        }
+        result.Values[idx] = point
+    }
     return result
 }
 // funcClampMin cuts off values below the threshold and replaces them with the threshold
-func funcClampMin(result *Result, threshold float64) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                if point.Value < threshold {
-                    point.Value = threshold
-                }
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcClampMin(result *TimeSeries, threshold float64) *TimeSeries {
+    for idx, point := range result.Values {
+        if point.Value < threshold {
+            point.Value = threshold
+        }
+        result.Values[idx] = point
+    }
     return result
 }
 // funcClampMax cuts off values above the threshold and replaces them with the threshold
-func funcClampMax(result *Result, threshold float64) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                if point.Value > threshold {
-                    point.Value = threshold
-                }
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcClampMax(result *TimeSeries, threshold float64) *TimeSeries {
+    for idx, point := range result.Values {
+        if point.Value > threshold {
+            point.Value = threshold
+        }
+        result.Values[idx] = point
+    }
     return result
 }
 // funcAbsolute returns the absolute value of each point
-func funcAbsolute(result *Result) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                point.Value = math.Abs(point.Value)
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcAbsolute(result *TimeSeries) *TimeSeries {
+    for idx, point := range result.Values {
+        point.Value = math.Abs(point.Value)
+        result.Values[idx] = point
+    }
     return result
 }
 // funcRunningDiff returns the running difference of each point
-func funcRunningDiff(result *Result) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            // iterate over the points in reverse order
-            for idx := len(series.Values) - 1; idx >= 0; idx-- {
-                if idx > 0 {
-                    series.Values[idx].Value = series.Values[idx].Value - series.Values[idx-1].Value
-                }
-            }
-            // remove the first point
-            if len(series.Values) > 0 {
-                series.Values = series.Values[1:]
-            }
-        }
-    }
+func funcRunningDiff(result *TimeSeries) *TimeSeries {
+    // iterate over the points in reverse order
+    for idx := len(result.Values) - 1; idx >= 0; idx-- {
+        if idx > 0 {
+            result.Values[idx].Value = result.Values[idx].Value - result.Values[idx-1].Value
+        }
+    }
+    // remove the first point
+    result.Values = result.Values[1:]
     return result
 }
 // funcLog2 returns the log2 of each point
-func funcLog2(result *Result) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                point.Value = math.Log2(point.Value)
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcLog2(result *TimeSeries) *TimeSeries {
+    for idx, point := range result.Values {
+        point.Value = math.Log2(point.Value)
+        result.Values[idx] = point
+    }
     return result
 }
 // funcLog10 returns the log10 of each point
-func funcLog10(result *Result) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                point.Value = math.Log10(point.Value)
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcLog10(result *TimeSeries) *TimeSeries {
+    for idx, point := range result.Values {
+        point.Value = math.Log10(point.Value)
+        result.Values[idx] = point
+    }
     return result
 }
 // funcCumulativeSum returns the cumulative sum for each point in a series
-func funcCumulativeSum(result *Result) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            var sum float64
-            for idx, point := range series.Values {
-                if !math.IsNaN(point.Value) {
-                    sum += point.Value
-                }
-                point.Value = sum
-                series.Values[idx] = point
-            }
-        }
-    }
+func funcCumulativeSum(result *TimeSeries) *TimeSeries {
+    var sum float64
+    for idx, point := range result.Values {
+        if !math.IsNaN(point.Value) {
+            sum += point.Value
+        }
+        point.Value = sum
+        result.Values[idx] = point
+    }
     return result
 }
 // funcEWMA calculates the Exponentially Weighted Moving Average
-func funcEWMA(result *Result, alpha float64) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            var ewma float64
-            var initialized bool
-            for i, point := range series.Values {
-                if !initialized {
-                    if !math.IsNaN(point.Value) {
-                        // Initialize EWMA with the first non-NaN value
-                        ewma = point.Value
-                        initialized = true
-                    }
-                    // Continue until the EWMA is initialized
-                    continue
-                }
-                if !math.IsNaN(point.Value) {
-                    // Update EWMA with the current value
-                    ewma = alpha*point.Value + (1-alpha)*ewma
-                }
-                // Set the EWMA value for the current point
-                series.Values[i].Value = ewma
-            }
-        }
-    }
+func funcEWMA(result *TimeSeries, alpha float64) *TimeSeries {
+    var ewma float64
+    var initialized bool
+    for i, point := range result.Values {
+        if !initialized {
+            if !math.IsNaN(point.Value) {
+                // Initialize EWMA with the first non-NaN value
+                ewma = point.Value
+                initialized = true
+            }
+            // Continue until the EWMA is initialized
+            continue
+        }
+        if !math.IsNaN(point.Value) {
+            // Update EWMA with the current value
+            ewma = alpha*point.Value + (1-alpha)*ewma
+        }
+        // Set the EWMA value for the current point
+        result.Values[i].Value = ewma
+    }
     return result
 }
 // funcMedian3 returns the median of 3 points for each point in a series
-func funcMedian3(result *Result) *Result {
+func funcMedian3(result *TimeSeries) *TimeSeries {
     return funcMedianN(result, 3)
 }
 // funcMedian5 returns the median of 5 points for each point in a series
-func funcMedian5(result *Result) *Result {
+func funcMedian5(result *TimeSeries) *TimeSeries {
     return funcMedianN(result, 5)
 }
 // funcMedian7 returns the median of 7 points for each point in a series
-func funcMedian7(result *Result) *Result {
+func funcMedian7(result *TimeSeries) *TimeSeries {
     return funcMedianN(result, 7)
 }
 // funcMedianN returns the median of N points for each point in a series
-func funcMedianN(result *Result, n int) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    halfWindow := n / 2
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            medianValues := make([]*TimeSeriesValue, 0)
-            for i := halfWindow; i < len(series.Values)-halfWindow; i++ {
-                values := make([]float64, 0, n)
-                // Add non-NaN values to the slice
-                for j := -halfWindow; j <= halfWindow; j++ {
-                    if !math.IsNaN(series.Values[i+j].Value) {
-                        values = append(values, series.Values[i+j].Value)
-                    }
-                }
-                // Create a new point with median value
-                newPoint := &TimeSeriesValue{
-                    Timestamp: series.Values[i].Timestamp,
-                }
-                // Handle the case where there are not enough values to calculate a median
-                if len(values) == 0 {
-                    newPoint.Value = math.NaN()
-                } else {
-                    newPoint.Value = median(values)
-                }
-                medianValues = append(medianValues, newPoint)
-            }
-            // Replace the series values with median values
-            // Keep the original edge points unchanged
-            for i := halfWindow; i < len(series.Values)-halfWindow; i++ {
-                series.Values[i] = medianValues[i-halfWindow]
-            }
-        }
-    }
+func funcMedianN(result *TimeSeries, n int) *TimeSeries {
+    if len(result.Values) == 0 {
+        return result
+    }
+    // For series shorter than window size, return original values
+    if len(result.Values) < n {
+        return result
+    }
+    halfWindow := n / 2
+    newValues := make([]*TimeSeriesValue, len(result.Values))
+    // Copy edge values that can't have a full window
+    for i := 0; i < halfWindow; i++ {
+        newValues[i] = &TimeSeriesValue{
+            Timestamp: result.Values[i].Timestamp,
+            Value: result.Values[i].Value,
+        }
+    }
+    for i := len(result.Values) - halfWindow; i < len(result.Values); i++ {
+        newValues[i] = &TimeSeriesValue{
+            Timestamp: result.Values[i].Timestamp,
+            Value: result.Values[i].Value,
+        }
+    }
+    // Calculate median for points that have a full window
+    for i := halfWindow; i < len(result.Values)-halfWindow; i++ {
+        values := make([]float64, 0, n)
+        // Add non-NaN values to the slice
+        for j := -halfWindow; j <= halfWindow; j++ {
+            if !math.IsNaN(result.Values[i+j].Value) {
+                values = append(values, result.Values[i+j].Value)
+            }
+        }
+        newValues[i] = &TimeSeriesValue{
+            Timestamp: result.Values[i].Timestamp,
+        }
+        // Handle the case where there are not enough values to calculate a median
+        if len(values) == 0 {
+            newValues[i].Value = math.NaN()
+        } else {
+            newValues[i].Value = median(values)
+        }
+    }
+    result.Values = newValues
     return result
 }
@@ -458,30 +328,19 @@ func median(values []float64) float64 {
 }
 // funcTimeShift shifts all timestamps by the given amount (in seconds)
-func funcTimeShift(result *Result, shift float64) *Result {
-    if result.Type != RequestTypeTimeSeries {
-        return result
-    }
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok {
-        return result
-    }
-    shiftMs := int64(shift * 1000) // Convert seconds to milliseconds
-    for _, aggregation := range timeSeriesData.Aggregations {
-        for _, series := range aggregation.Series {
-            for idx, point := range series.Values {
-                series.Values[idx].Timestamp = point.Timestamp + shiftMs
-            }
-        }
-    }
+func funcTimeShift(result *TimeSeries, shift float64) *TimeSeries {
+    shiftMs := int64(shift * 1000) // Convert seconds to milliseconds
+    for idx, point := range result.Values {
+        point.Timestamp = point.Timestamp + shiftMs
+        result.Values[idx] = point
+    }
     return result
 }
 // ApplyFunctions applies a list of functions sequentially to the result
-func ApplyFunctions(functions []Function, result *Result) *Result {
+func ApplyFunctions(functions []Function, result *TimeSeries) *TimeSeries {
     for _, fn := range functions {
         result = ApplyFunction(fn, result)
     }
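A quick sanity check on the funcEWMA recurrence above (ewma[i] = alpha*x[i] + (1-alpha)*ewma[i-1], seeded with the first non-NaN value); an in-package sketch with a hypothetical helper name:

// With alpha = 0.5 and inputs 2, 4, 8 the series becomes 2, 3, 5.5:
// the first point keeps its value (it seeds the average), then
// 0.5*4 + 0.5*2 = 3 and 0.5*8 + 0.5*3 = 5.5.
func exampleEWMA() []float64 {
    s := &TimeSeries{Values: []*TimeSeriesValue{
        {Timestamp: 1, Value: 2},
        {Timestamp: 2, Value: 4},
        {Timestamp: 3, Value: 8},
    }}
    out := funcEWMA(s, 0.5)
    got := make([]float64, len(out.Values))
    for i, p := range out.Values {
        got[i] = p.Value
    }
    return got // [2 3 5.5]
}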

View File

@@ -6,7 +6,7 @@ import (
 )
 // Helper function to create test time series data
-func createTestTimeSeriesData(values []float64) *Result {
+func createTestTimeSeriesData(values []float64) *TimeSeries {
     timeSeriesValues := make([]*TimeSeriesValue, len(values))
     for i, val := range values {
         timeSeriesValues[i] = &TimeSeriesValue{
@@ -19,33 +19,13 @@ func createTestTimeSeriesData(values []float64) *Result {
         Values: timeSeriesValues,
     }
-    aggregation := &AggregationBucket{
-        Index: 0,
-        Alias: "test",
-        Series: []*TimeSeries{series},
-    }
-    timeSeriesData := &TimeSeriesData{
-        QueryName: "test",
-        Aggregations: []*AggregationBucket{aggregation},
-    }
-    return &Result{
-        Type: RequestTypeTimeSeries,
-        Value: timeSeriesData,
-    }
+    return series
 }
 // Helper function to extract values from result for comparison
-func extractValues(result *Result) []float64 {
-    timeSeriesData, ok := result.Value.(*TimeSeriesData)
-    if !ok || len(timeSeriesData.Aggregations) == 0 || len(timeSeriesData.Aggregations[0].Series) == 0 {
-        return nil
-    }
-    series := timeSeriesData.Aggregations[0].Series[0]
-    values := make([]float64, len(series.Values))
-    for i, point := range series.Values {
+func extractValues(result *TimeSeries) []float64 {
+    values := make([]float64, len(result.Values))
+    for i, point := range result.Values {
         values[i] = point.Value
     }
     return values
@@ -578,13 +558,13 @@ func TestFuncTimeShift(t *testing.T) {
             name: "test funcTimeShift positive",
             values: []float64{1, 2, 3},
             shift: 5.0, // 5 seconds
-            want: []int64{6000, 7000, 8000}, // original timestamps (1,2,3) + 5000ms
+            want: []int64{5001, 5002, 5003}, // original timestamps (1,2,3) + 5000ms
         },
         {
             name: "test funcTimeShift negative",
             values: []float64{1, 2, 3},
             shift: -2.0, // -2 seconds
-            want: []int64{-1000, 0, 1000}, // original timestamps (1,2,3) - 2000ms
+            want: []int64{-1999, -1998, -1997}, // original timestamps (1,2,3) - 2000ms
         },
     }
@@ -593,15 +573,8 @@ func TestFuncTimeShift(t *testing.T) {
             result := createTestTimeSeriesData(tt.values)
             newResult := funcTimeShift(result, tt.shift)
-            timeSeriesData, ok := newResult.Value.(*TimeSeriesData)
-            if !ok {
-                t.Errorf("funcTimeShift() failed to get time series data")
-                return
-            }
-            series := timeSeriesData.Aggregations[0].Series[0]
-            got := make([]int64, len(series.Values))
-            for i, point := range series.Values {
+            got := make([]int64, len(newResult.Values))
+            for i, point := range newResult.Values {
                 got[i] = point.Timestamp
             }
@@ -630,10 +603,7 @@ func TestApplyFunction(t *testing.T) {
             name: "cutOffMin function",
             function: Function{
                 Name: FunctionNameCutOffMin,
-                Args: []struct {
-                    Name string `json:"name,omitempty"`
-                    Value string `json:"value"`
-                }{
+                Args: []FunctionArg{
                     {Value: "0.3"},
                 },
             },
@@ -680,10 +650,7 @@ func TestApplyFunctions(t *testing.T) {
     functions := []Function{
         {
             Name: FunctionNameCutOffMin,
-            Args: []struct {
-                Name string `json:"name,omitempty"`
-                Value string `json:"value"`
-            }{
+            Args: []FunctionArg{
                 {Value: "0.3"},
             },
         },

View File

@@ -250,10 +250,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
                     Expression: "A / B * 100",
                     Functions: []Function{{
                         Name: FunctionNameAbsolute,
-                        Args: []struct {
-                            Name string `json:"name,omitempty"`
-                            Value string `json:"value"`
-                        }{},
+                        Args: []FunctionArg{},
                     }},
                 },
             }},
@@ -261,6 +258,53 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
             },
             wantErr: false,
         },
+        {
+            name: "function cut off min",
+            jsonData: `{
+                "schemaVersion": "v1",
+                "start": 1640995200000,
+                "end": 1640998800000,
+                "requestType": "time_series",
+                "compositeQuery": {
+                    "queries": [{
+                        "name": "F1",
+                        "type": "builder_formula",
+                        "spec": {
+                            "name": "error_rate",
+                            "expression": "A / B * 100",
+                            "functions": [{
+                                "name": "cut_off_min",
+                                "args": [{
+                                    "value": "0.3"
+                                }]
+                            }]
+                        }
+                    }]
+                }
+            }`,
+            expected: QueryRangeRequest{
+                SchemaVersion: "v1",
+                Start: 1640995200000,
+                End: 1640998800000,
+                RequestType: RequestTypeTimeSeries,
+                CompositeQuery: CompositeQuery{
+                    Queries: []QueryEnvelope{{
+                        Name: "F1",
+                        Type: QueryTypeFormula,
+                        Spec: QueryBuilderFormula{
+                            Name: "error_rate",
+                            Expression: "A / B * 100",
+                            Functions: []Function{{
+                                Name: FunctionNameCutOffMin,
+                                Args: []FunctionArg{{
+                                    Value: "0.3",
+                                }},
+                            }},
+                        },
+                    }},
+                },
+            },
+        },
         {
             name: "valid join query",
             jsonData: `{