From fa4e4e77c4e99ab1b745844b4ce5e7852e37d299 Mon Sep 17 00:00:00 2001 From: srikanthccv Date: Tue, 3 Jun 2025 01:14:40 +0530 Subject: [PATCH] chore: update formula --- .../querybuildertypesv5/formula.go | 510 +++++++---- .../querybuildertypesv5/formula_test.go | 865 ++++++++++++++++++ 2 files changed, 1188 insertions(+), 187 deletions(-) create mode 100644 pkg/types/querybuildertypes/querybuildertypesv5/formula_test.go diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/formula.go b/pkg/types/querybuildertypes/querybuildertypesv5/formula.go index 65123f0d7b..f87e9ab996 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/formula.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/formula.go @@ -4,11 +4,15 @@ import ( "fmt" "math" "sort" + "strconv" + "strings" + "sync" "time" + "slices" + "github.com/SigNoz/govaluate" "github.com/SigNoz/signoz/pkg/errors" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes" ) type QueryBuilderFormula struct { @@ -21,107 +25,233 @@ type QueryBuilderFormula struct { Functions []Function `json:"functions,omitempty"` } -// FormulaEvaluator handles formula evaluation for QBv5 types -type FormulaEvaluator struct { - expression *govaluate.EvaluableExpression - canDefaultZero map[string]bool - functions map[string]govaluate.ExpressionFunction +type aggregationRef struct { + QueryName string + Index *int // Index-based reference (e.g., A.0) + Alias *string // Alias-based reference (e.g., A.my_alias) } -// NewFormulaEvaluator creates a new formula evaluator +// seriesLookup provides lookup for series data +type seriesLookup struct { + // seriesKey -> timestamp -> value + data map[string]map[int64]float64 + // seriesKey -> original series for metadata preservation + seriesMetadata map[string]*TimeSeries +} + +// FormulaEvaluator handles formula evaluation b/w time series from different aggregations +type FormulaEvaluator struct { + expression *govaluate.EvaluableExpression + variables []string + canDefaultZero map[string]bool + + // Parsed aggregation references from variables + aggRefs map[string]aggregationRef + + timestampPool sync.Pool + valuesPool sync.Pool +} + +// NewFormulaEvaluator creates a formula evaluator func NewFormulaEvaluator(expressionStr string, canDefaultZero map[string]bool) (*FormulaEvaluator, error) { functions := EvalFuncs() expression, err := govaluate.NewEvaluableExpressionWithFunctions(expressionStr, functions) if err != nil { - return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to parse expression: %s, error: %s", expressionStr, err.Error()) + return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to parse expression") } - return &FormulaEvaluator{ + evaluator := &FormulaEvaluator{ expression: expression, + variables: expression.Vars(), canDefaultZero: canDefaultZero, - functions: functions, - }, nil -} + aggRefs: make(map[string]aggregationRef), + } -// EvaluateFormula processes multiple time series data and evaluates the formula -func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSeriesData) (*TimeSeriesData, error) { - // Convert TimeSeriesData to a flattened series map for processing - allSeries := fe.flattenTimeSeriesData(timeSeriesData) - - // Find unique label sets for formula evaluation - uniqueLabelSets := fe.findUniqueLabelSets(allSeries) - - // Process each unique label set - var resultSeries []*TimeSeries - for _, labelSet := range uniqueLabelSets { - series, err := fe.joinAndCalculate(allSeries, labelSet) + // Parse aggregation references from variables + for _, variable := range evaluator.variables { + aggRef, err := parseAggregationReference(variable) if err != nil { return nil, err } - if series != nil && len(series.Values) > 0 { - resultSeries = append(resultSeries, series) - } + evaluator.aggRefs[variable] = aggRef } - return &TimeSeriesData{ - QueryName: "formula", - Aggregations: []*AggregationBucket{ - { - Index: 0, - Alias: "formula_result", - Series: resultSeries, - }, - }, - }, nil + evaluator.timestampPool.New = func() any { + return make([]int64, 0, 1000) + } + evaluator.valuesPool.New = func() any { + return make(map[string]any, len(evaluator.variables)) + } + + return evaluator, nil } -// flattenTimeSeriesData converts map of TimeSeriesData to a flat map of series by query name -func (fe *FormulaEvaluator) flattenTimeSeriesData(timeSeriesData map[string]*TimeSeriesData) map[string][]*TimeSeries { - result := make(map[string][]*TimeSeries) +// parseAggregationReference parses variable names like "A", "A.0", "A.my_alias" +func parseAggregationReference(variable string) (aggregationRef, error) { + parts := strings.Split(variable, ".") - for queryName, data := range timeSeriesData { - var allSeries []*TimeSeries - for _, bucket := range data.Aggregations { - allSeries = append(allSeries, bucket.Series...) - } - result[queryName] = allSeries + if len(parts) == 1 { + // Simple query reference like "A" - defaults to first aggregation (index 0) + defaultIndex := 0 + return aggregationRef{ + QueryName: parts[0], + Index: &defaultIndex, + }, nil } - return result + if len(parts) == 2 { + queryName := parts[0] + reference := parts[1] + + // Try to parse as index + if index, err := strconv.Atoi(reference); err == nil { + return aggregationRef{ + QueryName: queryName, + Index: &index, + }, nil + } + + // Otherwise treat as alias + return aggregationRef{ + QueryName: queryName, + Alias: &reference, + }, nil + } + + return aggregationRef{}, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid aggregation reference %q", variable) } -// findUniqueLabelSets finds all unique label combinations across series that are referenced in the expression -func (fe *FormulaEvaluator) findUniqueLabelSets(allSeries map[string][]*TimeSeries) []map[string]string { - queriesInExpression := make(map[string]struct{}) - for _, v := range fe.expression.Vars() { - queriesInExpression[v] = struct{}{} +// EvaluateFormula processes multiple time series with proper aggregation handling +func (fe *FormulaEvaluator) EvaluateFormula(timeSeriesData map[string]*TimeSeriesData) ([]*TimeSeries, error) { + // Build lookup structures for all referenced aggregations + lookup := fe.buildSeriesLookup(timeSeriesData) + + // Find all unique label combinations across referenced series + uniqueLabelSets := fe.findUniqueLabelSets(lookup) + + // Process each unique label set + var resultSeries []*TimeSeries + var wg sync.WaitGroup + resultChan := make(chan *TimeSeries, len(uniqueLabelSets)) + semaphore := make(chan struct{}, 4) // Limit concurrency + + for _, labelSet := range uniqueLabelSets { + wg.Add(1) + go func(labels []*Label) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + series := fe.evaluateForLabelSet(labels, lookup) + if series != nil && len(series.Values) > 0 { + resultChan <- series + } + }(labelSet) } - var allLabelSets []map[string]string + go func() { + wg.Wait() + close(resultChan) + }() - // Collect all label sets from series that are referenced in the expression - for queryName, series := range allSeries { - if _, ok := queriesInExpression[queryName]; !ok { + for series := range resultChan { + resultSeries = append(resultSeries, series) + } + + return resultSeries, nil +} + +// buildSeriesLookup creates lookup structure for all referenced aggregations +func (fe *FormulaEvaluator) buildSeriesLookup(timeSeriesData map[string]*TimeSeriesData) *seriesLookup { + lookup := &seriesLookup{ + data: make(map[string]map[int64]float64), + seriesMetadata: make(map[string]*TimeSeries), + } + + for variable, aggRef := range fe.aggRefs { + data, exists := timeSeriesData[aggRef.QueryName] + if !exists { continue } - for _, s := range series { - labelMap := fe.labelsToMap(s.Labels) - allLabelSets = append(allLabelSets, labelMap) + // Find the specific aggregation bucket + var targetBucket *AggregationBucket + for _, bucket := range data.Aggregations { + if aggRef.Index != nil && bucket.Index == *aggRef.Index { + targetBucket = bucket + break + } + if aggRef.Alias != nil && bucket.Alias == *aggRef.Alias { + targetBucket = bucket + break + } + } + + if targetBucket == nil { + continue + } + + // Process all series in the target bucket + for seriesIdx, series := range targetBucket.Series { + seriesKey := fe.buildSeriesKey(variable, seriesIdx, series.Labels) + + // Initialize timestamp map + if _, exists := lookup.data[seriesKey]; !exists { + lookup.data[seriesKey] = make(map[int64]float64, len(series.Values)) + lookup.seriesMetadata[seriesKey] = series + } + + // Store all timestamp-value pairs + for _, value := range series.Values { + lookup.data[seriesKey][value.Timestamp] = value.Value + } } } - // Sort by number of labels (descending) for subset detection optimization + return lookup +} + +// buildSeriesKey creates a unique key for a series within a specific aggregation +func (fe *FormulaEvaluator) buildSeriesKey(variable string, seriesIndex int, labels []*Label) string { + // Create a deterministic key that includes variable and label information + var keyParts []string + keyParts = append(keyParts, variable) + keyParts = append(keyParts, strconv.Itoa(seriesIndex)) + + // Sort labels by key name for consistent ordering + sortedLabels := make([]*Label, len(labels)) + copy(sortedLabels, labels) + sort.Slice(sortedLabels, func(i, j int) bool { + return sortedLabels[i].Key.Name < sortedLabels[j].Key.Name + }) + + for _, label := range sortedLabels { + keyParts = append(keyParts, fmt.Sprintf("%s=%v", label.Key.Name, label.Value)) + } + + return strings.Join(keyParts, "|") +} + +// findUniqueLabelSets finds all unique label combinations across all referenced series +func (fe *FormulaEvaluator) findUniqueLabelSets(lookup *seriesLookup) [][]*Label { + var allLabelSets [][]*Label + + // Collect all label sets from series metadata + for _, series := range lookup.seriesMetadata { + allLabelSets = append(allLabelSets, series.Labels) + } + + // sort the label sets by the number of labels in descending order sort.Slice(allLabelSets, func(i, j int) bool { return len(allLabelSets[i]) > len(allLabelSets[j]) }) - // Find unique label sets (remove subsets) - var uniqueSets []map[string]string + // Find unique label sets using proper label comparison + var uniqueSets [][]*Label for _, labelSet := range allLabelSets { isUnique := true - for _, uniqueLabelSet := range uniqueSets { - if fe.isSubset(uniqueLabelSet, labelSet) { + for _, uniqueSet := range uniqueSets { + if fe.isSubset(uniqueSet, labelSet) { isUnique = false break } @@ -134,92 +264,130 @@ func (fe *FormulaEvaluator) findUniqueLabelSets(allSeries map[string][]*TimeSeri return uniqueSets } -// joinAndCalculate joins series with matching labels and evaluates the formula at each timestamp -func (fe *FormulaEvaluator) joinAndCalculate(allSeries map[string][]*TimeSeries, uniqueLabelSet map[string]string) (*TimeSeries, error) { - // Map to store values: queryName -> timestamp -> value - seriesMap := make(map[string]map[int64]float64) - uniqueTimestamps := make(map[int64]struct{}) +func (fe *FormulaEvaluator) isSubset(labels1, labels2 []*Label) bool { + labelMap1 := make(map[string]any) + labelMap2 := make(map[string]any) - // Find matching series for each query and build lookup maps - for queryName, seriesList := range allSeries { - var matchingSeries *TimeSeries + for _, label := range labels1 { + labelMap1[label.Key.Name] = label.Value + } + for _, label := range labels2 { + labelMap2[label.Key.Name] = label.Value + } - // Find a series that matches the current label set - for _, series := range seriesList { - seriesLabelMap := fe.labelsToMap(series.Labels) - if fe.isSubset(uniqueLabelSet, seriesLabelMap) { - matchingSeries = series - break - } + for k, v := range labelMap2 { + if val, ok := labelMap1[k]; !ok || val != v { + return false } + } + return true +} - // Build timestamp -> value mapping for quick lookup - if matchingSeries != nil { - if _, ok := seriesMap[queryName]; !ok { - seriesMap[queryName] = make(map[int64]float64) - } +// labelsEqual compares two label sets for equality +func (fe *FormulaEvaluator) labelsEqual(labels1, labels2 []*Label) bool { + if len(labels1) != len(labels2) { + return false + } - for _, point := range matchingSeries.Values { - seriesMap[queryName][point.Timestamp] = point.Value - uniqueTimestamps[point.Timestamp] = struct{}{} + // Create maps for comparison + map1 := make(map[string]any) + map2 := make(map[string]any) + + for _, label := range labels1 { + map1[label.Key.Name] = label.Value + } + for _, label := range labels2 { + map2[label.Key.Name] = label.Value + } + + if len(map1) != len(map2) { + return false + } + + for k, v1 := range map1 { + if v2, exists := map2[k]; !exists || v1 != v2 { + return false + } + } + + return true +} + +// evaluateForLabelSet performs formula evaluation for a specific label set +func (fe *FormulaEvaluator) evaluateForLabelSet(targetLabels []*Label, lookup *seriesLookup) *TimeSeries { + // Find matching series for each variable + variableData := make(map[string]map[int64]float64) + var allTimestamps map[int64]struct{} = make(map[int64]struct{}) + + for variable := range fe.aggRefs { + // Find series with matching labels for this variable + for seriesKey, series := range lookup.seriesMetadata { + if strings.HasPrefix(seriesKey, variable+"|") && fe.isSubset(targetLabels, series.Labels) { + if timestampData, exists := lookup.data[seriesKey]; exists { + variableData[variable] = timestampData + // Collect all timestamps + for ts := range timestampData { + allTimestamps[ts] = struct{}{} + } + break // Found matching series for this variable + } } } } - // Convert unique timestamps to sorted slice - timestamps := make([]int64, 0, len(uniqueTimestamps)) - for timestamp := range uniqueTimestamps { - timestamps = append(timestamps, timestamp) + // Convert timestamps to sorted slice + timestamps := fe.timestampPool.Get().([]int64) + timestamps = timestamps[:0] + defer fe.timestampPool.Put(timestamps) + + for ts := range allTimestamps { + timestamps = append(timestamps, ts) } - sort.Slice(timestamps, func(i, j int) bool { - return timestamps[i] < timestamps[j] - }) + slices.Sort(timestamps) // Evaluate formula at each timestamp var resultValues []*TimeSeriesValue + values := fe.valuesPool.Get().(map[string]any) + defer fe.valuesPool.Put(values) + for _, timestamp := range timestamps { - values := make(map[string]interface{}) + // Clear previous values + for k := range values { + delete(values, k) + } // Collect values for this timestamp - for queryName, series := range seriesMap { - if value, ok := series[timestamp]; ok { - values[queryName] = value + validCount := 0 + for _, variable := range fe.variables { + if varData, exists := variableData[variable]; exists { + if value, exists := varData[timestamp]; exists { + values[variable] = value + validCount++ + } } } - // Set default zeros where allowed - for _, variable := range fe.expression.Vars() { - if _, ok := values[variable]; !ok && fe.canDefaultZero[variable] { + // Apply default zeros where allowed + for _, variable := range fe.variables { + if _, exists := values[variable]; !exists && fe.canDefaultZero[variable] { values[variable] = 0.0 + validCount++ } } - // Check if we have all required variables - canEvaluate := true - for _, variable := range fe.expression.Vars() { - if _, ok := values[variable]; !ok { - canEvaluate = false - break - } - } - - if !canEvaluate { + // Skip if we don't have all required variables + if validCount != len(fe.variables) { continue } - // Evaluate the expression + // Evaluate expression result, err := fe.expression.Evaluate(values) if err != nil { - return nil, fmt.Errorf("expression evaluation failed at timestamp %d: %w", timestamp, err) + continue } value, ok := result.(float64) - if !ok { - return nil, fmt.Errorf("expression result is not float64: %T", result) - } - - // Skip invalid values - if math.IsNaN(value) || math.IsInf(value, 0) { + if !ok || math.IsNaN(value) || math.IsInf(value, 0) { continue } @@ -229,129 +397,97 @@ func (fe *FormulaEvaluator) joinAndCalculate(allSeries map[string][]*TimeSeries, }) } - // Convert label map back to Label slice - resultLabels := fe.mapToLabels(uniqueLabelSet) + if len(resultValues) == 0 { + return nil + } + + // Preserve original label structure and metadata + resultLabels := make([]*Label, len(targetLabels)) + copy(resultLabels, targetLabels) return &TimeSeries{ Labels: resultLabels, Values: resultValues, - }, nil -} - -// Helper functions - -// isSubset checks if 'sub' is a subset of 'super' -func (fe *FormulaEvaluator) isSubset(super, sub map[string]string) bool { - for k, v := range sub { - if val, ok := super[k]; !ok || val != v { - return false - } } - return true } -// labelsToMap converts Label slice to map for easier comparison -func (fe *FormulaEvaluator) labelsToMap(labels []*Label) map[string]string { - result := make(map[string]string) - for _, label := range labels { - if strVal, ok := label.Value.(string); ok { - result[label.Key.Name] = strVal - } else { - result[label.Key.Name] = convertValueToString(label.Value) - } - } - return result -} - -// mapToLabels converts map back to Label slice -func (fe *FormulaEvaluator) mapToLabels(labelMap map[string]string) []*Label { - var labels []*Label - for key, value := range labelMap { - labels = append(labels, &Label{ - Key: telemetrytypes.TelemetryFieldKey{ - Name: key, - FieldDataType: telemetrytypes.FieldDataTypeString, - }, - Value: value, - }) - } - return labels -} - -// EvalFuncs returns the supported mathematical functions for formula evaluation +// EvalFuncs returns mathematical functions func EvalFuncs() map[string]govaluate.ExpressionFunction { funcs := make(map[string]govaluate.ExpressionFunction) + pi180 := math.Pi / 180 + rad180 := 180 / math.Pi + // Mathematical functions - funcs["exp"] = func(args ...interface{}) (interface{}, error) { + funcs["exp"] = func(args ...any) (any, error) { return math.Exp(args[0].(float64)), nil } - funcs["log"] = func(args ...interface{}) (interface{}, error) { + funcs["log"] = func(args ...any) (any, error) { return math.Log(args[0].(float64)), nil } - funcs["ln"] = func(args ...interface{}) (interface{}, error) { + funcs["ln"] = func(args ...any) (any, error) { return math.Log(args[0].(float64)), nil } - funcs["exp2"] = func(args ...interface{}) (interface{}, error) { + funcs["exp2"] = func(args ...any) (any, error) { return math.Exp2(args[0].(float64)), nil } - funcs["log2"] = func(args ...interface{}) (interface{}, error) { + funcs["log2"] = func(args ...any) (any, error) { return math.Log2(args[0].(float64)), nil } - funcs["exp10"] = func(args ...interface{}) (interface{}, error) { + funcs["exp10"] = func(args ...any) (any, error) { return math.Pow10(int(args[0].(float64))), nil } - funcs["log10"] = func(args ...interface{}) (interface{}, error) { + funcs["log10"] = func(args ...any) (any, error) { return math.Log10(args[0].(float64)), nil } - funcs["sqrt"] = func(args ...interface{}) (interface{}, error) { + funcs["sqrt"] = func(args ...any) (any, error) { return math.Sqrt(args[0].(float64)), nil } - funcs["cbrt"] = func(args ...interface{}) (interface{}, error) { + funcs["cbrt"] = func(args ...any) (any, error) { return math.Cbrt(args[0].(float64)), nil } - funcs["erf"] = func(args ...interface{}) (interface{}, error) { + funcs["erf"] = func(args ...any) (any, error) { return math.Erf(args[0].(float64)), nil } - funcs["erfc"] = func(args ...interface{}) (interface{}, error) { + funcs["erfc"] = func(args ...any) (any, error) { return math.Erfc(args[0].(float64)), nil } - funcs["lgamma"] = func(args ...interface{}) (interface{}, error) { + funcs["lgamma"] = func(args ...any) (any, error) { v, _ := math.Lgamma(args[0].(float64)) return v, nil } - funcs["tgamma"] = func(args ...interface{}) (interface{}, error) { + funcs["tgamma"] = func(args ...any) (any, error) { return math.Gamma(args[0].(float64)), nil } // Trigonometric functions - funcs["sin"] = func(args ...interface{}) (interface{}, error) { + funcs["sin"] = func(args ...any) (any, error) { return math.Sin(args[0].(float64)), nil } - funcs["cos"] = func(args ...interface{}) (interface{}, error) { + funcs["cos"] = func(args ...any) (any, error) { return math.Cos(args[0].(float64)), nil } - funcs["tan"] = func(args ...interface{}) (interface{}, error) { + funcs["tan"] = func(args ...any) (any, error) { return math.Tan(args[0].(float64)), nil } - funcs["asin"] = func(args ...interface{}) (interface{}, error) { + funcs["asin"] = func(args ...any) (any, error) { return math.Asin(args[0].(float64)), nil } - funcs["acos"] = func(args ...interface{}) (interface{}, error) { + funcs["acos"] = func(args ...any) (any, error) { return math.Acos(args[0].(float64)), nil } - funcs["atan"] = func(args ...interface{}) (interface{}, error) { + funcs["atan"] = func(args ...any) (any, error) { return math.Atan(args[0].(float64)), nil } - // Utility functions - funcs["degrees"] = func(args ...interface{}) (interface{}, error) { - return args[0].(float64) * 180 / math.Pi, nil + // Utility functions (optimized with pre-computed constants) + funcs["degrees"] = func(args ...any) (any, error) { + return args[0].(float64) * rad180, nil } - funcs["radians"] = func(args ...interface{}) (interface{}, error) { - return args[0].(float64) * math.Pi / 180, nil + funcs["radians"] = func(args ...any) (any, error) { + return args[0].(float64) * pi180, nil } - funcs["now"] = func(args ...interface{}) (interface{}, error) { + funcs["now"] = func(args ...any) (any, error) { return float64(time.Now().Unix()), nil } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/formula_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/formula_test.go new file mode 100644 index 0000000000..74361619b2 --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/formula_test.go @@ -0,0 +1,865 @@ +package querybuildertypesv5 + +import ( + "testing" + + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func createFormulaTestTimeSeriesData(queryName string, series []*TimeSeries) *TimeSeriesData { + return &TimeSeriesData{ + QueryName: queryName, + Aggregations: []*AggregationBucket{ + { + Index: 0, + Alias: queryName + "_agg", + Series: series, + }, + }, + } +} + +func createLabels(labelMap map[string]string) []*Label { + var labels []*Label + for key, value := range labelMap { + labels = append(labels, &Label{ + Key: telemetrytypes.TelemetryFieldKey{ + Name: key, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + Value: value, + }) + } + return labels +} + +func createValues(points map[int64]float64) []*TimeSeriesValue { + var values []*TimeSeriesValue + for timestamp, value := range points { + values = append(values, &TimeSeriesValue{ + Timestamp: timestamp, + Value: value, + }) + } + return values +} + +func TestFindUniqueLabelSets(t *testing.T) { + tests := []struct { + name string + tsData map[string]*TimeSeriesData + expression string + expected int // number of unique label sets + }{ + { + name: "two distinct label sets", + tsData: map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{1: 10}), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{1: 30}), + }, + }), + }, + expression: "A + B", + expected: 2, + }, + { + name: "subset elimination test", + tsData: map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{1: 10}), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + }), + Values: createValues(map[int64]float64{1: 30}), + }, + }), + "C": createFormulaTestTimeSeriesData("C", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "operation": "PUT /api", + }), + Values: createValues(map[int64]float64{1: 30}), + }, + }), + "D": createFormulaTestTimeSeriesData("D", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "http_status": "200", + }), + Values: createValues(map[int64]float64{1: 30}), + }, + }), + }, + expression: "A + B + C + D", + expected: 3, // Three unique label sets after subset elimination + }, + { + name: "empty series", + tsData: map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{}), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{}), + }, + expression: "A + B", + expected: 0, + }, + { + name: "overlapping labels", + tsData: map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{1: 10}), + }, + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{1: 12}), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{1: 30}), + }, + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + }), + Values: createValues(map[int64]float64{1: 25}), + }, + }), + }, + expression: "A + B", + expected: 2, // Two unique label sets after subset detection + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + evaluator, err := NewFormulaEvaluator(tt.expression, map[string]bool{"A": false, "B": false}) + require.NoError(t, err) + + lookup := evaluator.buildSeriesLookup(tt.tsData) + uniqueLabelSets := evaluator.findUniqueLabelSets(lookup) + + assert.Equal(t, tt.expected, len(uniqueLabelSets)) + }) + } +} + +func TestBasicFormulaEvaluation(t *testing.T) { + tests := []struct { + name string + tsData map[string]*TimeSeriesData + expression string + expected int // number of result series + }{ + { + name: "simple addition", + tsData: map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{ + 1: 30, + 3: 40, + }), + }, + }), + }, + expression: "A + B", + expected: 2, + }, + { + name: "division with zeros", + tsData: map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{}), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 0, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{}), + Values: createValues(map[int64]float64{ + 1: 0, + 3: 10, + }), + }, + }), + }, + expression: "A/B", + expected: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + evaluator, err := NewFormulaEvaluator(tt.expression, map[string]bool{"A": true, "B": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tt.tsData) + require.NoError(t, err) + require.NotNil(t, result) + + assert.Equal(t, tt.expected, len(result)) + }) + } +} + +func TestErrorRateCalculation(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + }), + }, + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + }), + }, + { + Labels: createLabels(map[string]string{ + "service_name": "route", + }), + Values: createValues(map[int64]float64{ + 1: 2, + 2: 45, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{ + 1: 6, + 2: 9, + }), + }, + }), + } + + evaluator, err := NewFormulaEvaluator("B/A", map[string]bool{"A": true, "B": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 3 result series (frontend gets 0, redis gets calculated values, route gets 0) + assert.Equal(t, 3, len(result)) + + // Find the redis series and check its values + for _, series := range result { + for _, label := range series.Labels { + if label.Key.Name == "service_name" && label.Value == "redis" { + assert.Len(t, series.Values, 2) + assert.Equal(t, 0.5, series.Values[0].Value) // 6/12 + assert.Equal(t, 0.2, series.Values[1].Value) // 9/45 + } + } + } +} + +func TestNoGroupKeysOnLeftSide(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + }), + }, + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{}), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + }), + }, + }), + } + + evaluator, err := NewFormulaEvaluator("B/A", map[string]bool{"A": true, "B": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 2 result series (frontend and redis) + assert.Equal(t, 2, len(result)) + + // Verify calculations + expectedValues := map[string][]float64{ + "frontend": {2.2, 3.25}, // 22/10, 65/20 + "redis": {1.8333333333333333, 1.4444444444444444}, // 22/12, 65/45 + } + + for _, series := range result { + for _, label := range series.Labels { + if label.Key.Name == "service_name" { + serviceName := label.Value.(string) + if expected, exists := expectedValues[serviceName]; exists { + assert.Len(t, series.Values, len(expected)) + for i, expectedVal := range expected { + assert.InDelta(t, expectedVal, series.Values[i].Value, 0.0001) + } + } + } + } + } +} + +func TestSameGroupKeys(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "running", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + 4: 40, + 5: 50, + 7: 70, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "idle", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + 3: 30, + 4: 40, + 5: 50, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "running", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 3: 30, + 4: 40, + 5: 50, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "idle", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 4: 40, + 5: 50, + }), + }, + }), + } + + evaluator, err := NewFormulaEvaluator("A/B", map[string]bool{"A": true, "B": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 2 result series + assert.Equal(t, 2, len(result)) + + // Verify that we get the expected calculations + for _, series := range result { + hostName := "" + state := "" + for _, label := range series.Labels { + if label.Key.Name == "host_name" { + hostName = label.Value.(string) + } + if label.Key.Name == "state" { + state = label.Value.(string) + } + } + + if hostName == "ip-10-420-69-1" && state == "running" { + // Check specific calculations + assert.Equal(t, float64(10)/float64(22), series.Values[0].Value) // timestamp 1 + assert.InDelta(t, 0.3076923076923077, series.Values[1].Value, 0.0001) // timestamp 2 + } + } +} + +func TestGroupKeysDifferentValues(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "running", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + 4: 40, + 5: 50, + 7: 70, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "idle", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + 3: 30, + 4: 40, + 5: 50, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "not_running_chalamet", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 3: 30, + 4: 40, + 5: 50, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "busy", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 4: 40, + 5: 50, + }), + }, + }), + } + + evaluator, err := NewFormulaEvaluator("A/B", map[string]bool{"A": true, "B": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 2 result series with all zero values (no label matches) + assert.Equal(t, 2, len(result)) + + for _, series := range result { + for _, value := range series.Values { + assert.Equal(t, 0.0, value.Value) // All values should be 0 due to default zero + } + } +} + +func TestLeftSideSuperset(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "running", + "os.type": "linux", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + 4: 40, + 5: 50, + 7: 70, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "idle", + "os.type": "linux", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + 3: 30, + 4: 40, + 5: 50, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "state": "running", + "os.type": "linux", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 3: 30, + 4: 40, + 5: 50, + }), + }, + { + Labels: createLabels(map[string]string{ + "state": "busy", + "os.type": "linux", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 4: 40, + 5: 50, + }), + }, + }), + } + + evaluator, err := NewFormulaEvaluator("A/B", map[string]bool{"A": true, "B": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 2 result series + assert.Equal(t, 2, len(result)) + + // Find the running series and verify calculation + for _, series := range result { + hasRunning := false + hasHost := false + for _, label := range series.Labels { + if label.Key.Name == "state" && label.Value == "running" { + hasRunning = true + } + if label.Key.Name == "host_name" { + hasHost = true + } + } + + if hasRunning && hasHost { + // This should be the matched series + assert.Equal(t, float64(10)/float64(22), series.Values[0].Value) // timestamp 1 + assert.InDelta(t, 0.3076923076923077, series.Values[1].Value, 0.0001) // timestamp 2 + } + } +} + +func TestNoDefaultZero(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{ + 1: 30, + 3: 40, + }), + }, + }), + } + + // No default zero - should have no results since label sets don't match + evaluator, err := NewFormulaEvaluator("A + B", map[string]bool{"A": false, "B": false}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + + // Should have no result series since labels don't match and no default zero + assert.Equal(t, 0, len(result)) +} + +func TestMixedQueries(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "frontend", + "operation": "GET /api", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + }), + }, + }), + "C": createFormulaTestTimeSeriesData("C", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "service_name": "redis", + }), + Values: createValues(map[int64]float64{ + 1: 30, + 2: 50, + 3: 45, + }), + }, + }), + } + + evaluator, err := NewFormulaEvaluator("A / B", map[string]bool{"A": true, "B": true, "C": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 1 result series (only A and B have matching labels) + assert.Equal(t, 1, len(result)) + + // Verify the result is A/B = 1 for matching timestamps + series := result[0] + assert.Len(t, series.Values, 2) + assert.Equal(t, 1.0, series.Values[0].Value) // 10/10 + assert.Equal(t, 1.0, series.Values[1].Value) // 20/20 +} + +func TestComplexExpression(t *testing.T) { + tsData := map[string]*TimeSeriesData{ + "A": createFormulaTestTimeSeriesData("A", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "state": "running", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + 4: 40, + 5: 50, + 7: 70, + }), + }, + { + Labels: createLabels(map[string]string{ + "state": "idle", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + 3: 30, + 4: 40, + 5: 50, + }), + }, + }), + "B": createFormulaTestTimeSeriesData("B", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "running", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 3: 30, + 4: 40, + 5: 50, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "idle", + }), + Values: createValues(map[int64]float64{ + 1: 22, + 2: 65, + 4: 40, + 5: 50, + }), + }, + }), + "C": createFormulaTestTimeSeriesData("C", []*TimeSeries{ + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-1", + "state": "running", + "os.type": "linux", + }), + Values: createValues(map[int64]float64{ + 1: 10, + 2: 20, + 4: 40, + 5: 50, + 7: 70, + }), + }, + { + Labels: createLabels(map[string]string{ + "host_name": "ip-10-420-69-2", + "state": "idle", + "os.type": "linux", + }), + Values: createValues(map[int64]float64{ + 1: 12, + 2: 45, + 3: 30, + 4: 40, + 5: 50, + }), + }, + }), + } + + // Complex expression: A/B + C + evaluator, err := NewFormulaEvaluator("A/B + C", map[string]bool{"A": true, "B": true, "C": true}) + require.NoError(t, err) + + result, err := evaluator.EvaluateFormula(tsData) + require.NoError(t, err) + require.NotNil(t, result) + + // Should have 2 result series + assert.Equal(t, 2, len(result)) + + // Verify the complex calculation: A/B + C for the first series + for _, series := range result { + hasRunning := false + hasHost := false + for _, label := range series.Labels { + if label.Key.Name == "state" && label.Value == "running" { + hasRunning = true + } + if label.Key.Name == "host_name" { + hasHost = true + } + } + + if hasRunning && hasHost { + // timestamp 1: 10/22 + 10 = 10.45454545454545 + expectedVal1 := 10.0/22.0 + 10.0 + assert.InDelta(t, expectedVal1, series.Values[0].Value, 0.0001) + + // timestamp 2: 20/65 + 20 = 20.3076923076923077 + expectedVal2 := 20.0/65.0 + 20.0 + assert.InDelta(t, expectedVal2, series.Values[1].Value, 0.0001) + } + } +}