diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go
index 693bc88f44..a3a957c985 100644
--- a/pkg/query-service/app/queryBuilder/query_builder.go
+++ b/pkg/query-service/app/queryBuilder/query_builder.go
@@ -384,6 +384,8 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
 		parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
 		parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
 		parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
+		parts = append(parts, fmt.Sprintf("timeAggregation=%s", query.TimeAggregation))
+		parts = append(parts, fmt.Sprintf("spaceAggregation=%s", query.SpaceAggregation))
 
 		if query.AggregateAttribute.Key != "" {
 			parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go
index d1f55d7bb0..aa50065a74 100644
--- a/pkg/query-service/model/v3/v3.go
+++ b/pkg/query-service/model/v3/v3.go
@@ -669,6 +669,42 @@ type BuilderQuery struct {
 	ShiftBy int64
 }
 
+// CanDefaultZero reports whether a missing value for this query can be
+// substituted by zero.
+//
+// For an aggregation window [Tx, Tx+1] with a counting aggregation such as
+// `count`, the lack of data can always be interpreted as zero: no data for a
+// request count means zero requests. This holds for every aggregation that
+// involves counting (count, count distinct and the rate operators).
+//
+// The same is not true for other aggregations: the `sum` of no values does not
+// necessarily mean zero, so no default is assumed for them.
+//
+// A nil query never defaults to zero.
+func (b *BuilderQuery) CanDefaultZero() bool {
+	if b == nil {
+		return false
+	}
+	switch b.DataSource {
+	case DataSourceMetrics:
+		if b.AggregateOperator.IsRateOperator() ||
+			b.TimeAggregation.IsRateOperator() ||
+			b.AggregateOperator == AggregateOperatorCount ||
+			b.AggregateOperator == AggregateOperatorCountDistinct ||
+			b.TimeAggregation == TimeAggregationCount ||
+			b.TimeAggregation == TimeAggregationCountDistinct {
+			return true
+		}
+	case DataSourceTraces, DataSourceLogs:
+		if b.AggregateOperator.IsRateOperator() ||
+			b.AggregateOperator == AggregateOperatorCount ||
+			b.AggregateOperator == AggregateOperatorCountDistinct {
+			return true
+		}
+	}
+	return false
+}
+
 func (b *BuilderQuery) Validate(panelType PanelType) error {
 	if b == nil {
 		return nil
diff --git a/pkg/query-service/postprocess/formula.go b/pkg/query-service/postprocess/formula.go
index c6a3aa7bb2..f1dd86eae5 100644
--- a/pkg/query-service/postprocess/formula.go
+++ b/pkg/query-service/postprocess/formula.go
@@ -60,7 +60,12 @@ func findUniqueLabelSets(results []*v3.Result) []map[string]string {
 }
 
 // Function to join series on timestamp and calculate new values
-func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, expression *govaluate.EvaluableExpression) (*v3.Series, error) {
+func joinAndCalculate(
+	results []*v3.Result,
+	uniqueLabelSet map[string]string,
+	expression *govaluate.EvaluableExpression,
+	canDefaultZero map[string]bool,
+) (*v3.Series, error) {
 
 	uniqueTimestamps := make(map[int64]struct{})
 	// map[queryName]map[timestamp]value
@@ -108,10 +113,29 @@ func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, ex
 
-		// If the value is not present in the values map, set it to 0
+		// If the value is not present in the values map, set it to 0, but only
+		// for queries whose missing data can be interpreted as zero
 		for _, v := range expression.Vars() {
-			if _, ok := values[v]; !ok {
+			if _, ok := values[v]; !ok && canDefaultZero[v] {
 				values[v] = 0
 			}
 		}
+
+		// Skip this timestamp unless every variable referenced by the
+		// expression has a value. Presence is checked per variable instead of
+		// comparing len(expression.Vars()) with len(values): the two lengths
+		// differ whenever Vars() and the keys of values are not the same set,
+		// e.g. if a variable occurs more than once in the expression.
+		canEval := true
+		for _, v := range expression.Vars() {
+			if _, ok := values[v]; !ok {
+				canEval = false
+				break
+			}
+		}
+		if !canEval {
+			// not enough values for expression evaluation
+			continue
+		}
+
 		newValue, err := expression.Evaluate(values)
 		if err != nil {
 			return nil, err
@@ -136,16 +160,20 @@ func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, ex
 // 2. For each unique label set, find a series that matches the label set from each query result
 // 3. Join the series on timestamp and calculate the new values
 // 4. Return the new series
-func processResults(results []*v3.Result, expression *govaluate.EvaluableExpression) (*v3.Result, error) {
+func processResults(
+	results []*v3.Result,
+	expression *govaluate.EvaluableExpression,
+	canDefaultZero map[string]bool,
+) (*v3.Result, error) {
 	uniqueLabelSets := findUniqueLabelSets(results)
 	newSeries := make([]*v3.Series, 0)
 
 	for _, labelSet := range uniqueLabelSets {
-		series, err := joinAndCalculate(results, labelSet, expression)
+		series, err := joinAndCalculate(results, labelSet, expression, canDefaultZero)
 		if err != nil {
 			return nil, err
 		}
-		if series != nil {
+		if series != nil && len(series.Points) != 0 {
 			labelsArray := make([]map[string]string, 0)
 			for k, v := range series.Labels {
 				labelsArray = append(labelsArray, map[string]string{k: v})
diff --git a/pkg/query-service/postprocess/formula_test.go b/pkg/query-service/postprocess/formula_test.go
index 315344636f..c8a9f1c59c 100644
--- a/pkg/query-service/postprocess/formula_test.go
+++ b/pkg/query-service/postprocess/formula_test.go
@@ -278,7 +278,11 @@ func TestProcessResults(t *testing.T) {
 			if err != nil {
 				t.Errorf("Error parsing expression: %v", err)
 			}
-			got, err := processResults(tt.results, expression)
+			canDefaultZero := map[string]bool{
+				"A": true,
+				"B": true,
+			}
+			got, err := processResults(tt.results, expression, canDefaultZero)
 			if err != nil {
 				t.Errorf("Error processing results: %v", err)
 			}
@@ -438,7 +442,11 @@ func TestProcessResultsErrorRate(t *testing.T) {
 			if err != nil {
 				t.Errorf("Error parsing expression: %v", err)
 			}
-			got, err := processResults(tt.results, expression)
+			canDefaultZero := map[string]bool{
+				"A": true,
+				"B": true,
+			}
+			got, err := processResults(tt.results, expression, canDefaultZero)
 			if err != nil {
 				t.Errorf("Error processing results: %v", err)
 			}
@@ -1557,7 +1565,12 @@ func TestFormula(t *testing.T) {
 				t.Errorf("Error parsing expression: %v", err)
 				return
 			}
-			got, err := processResults(tt.results, expression)
+			canDefaultZero := map[string]bool{
+				"A": true,
+				"B": true,
+				"C": true,
+			}
+			got, err := processResults(tt.results, expression, canDefaultZero)
 			if err != nil {
 				t.Errorf("Error processing results: %v", err)
 				return
@@ -1581,3 +1594,95 @@ func TestFormula(t *testing.T) {
 		})
 	}
 }
+
+// TestProcessResultsNoDefaultZero verifies that processResults returns no
+// series for the fixture below when neither query is allowed to default to
+// zero.
+func TestProcessResultsNoDefaultZero(t *testing.T) {
+	tests := []struct {
+		name    string
+		results []*v3.Result
+		want    *v3.Result
+	}{
+		{
+			name: "test1",
+			results: []*v3.Result{
+				{
+					QueryName: "A",
+					Series: []*v3.Series{
+						{
+							Labels: map[string]string{
+								"service_name": "frontend",
+								"operation":    "GET /api",
+							},
+							Points: []v3.Point{
+								{
+									Timestamp: 1,
+									Value:     10,
+								},
+								{
+									Timestamp: 2,
+									Value:     20,
+								},
+							},
+						},
+					},
+				},
+				{
+					QueryName: "B",
+					Series: []*v3.Series{
+						{
+							Labels: map[string]string{
+								"service_name": "redis",
+							},
+							Points: []v3.Point{
+								{
+									Timestamp: 1,
+									Value:     30,
+								},
+								{
+									Timestamp: 3,
+									Value:     40,
+								},
+							},
+						},
+					},
+				},
+			},
+			want: &v3.Result{
+				Series: []*v3.Series{},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			expression, err := govaluate.NewEvaluableExpression("A + B")
+			if err != nil {
+				t.Fatalf("Error parsing expression: %v", err)
+			}
+			canDefaultZero := map[string]bool{
+				"A": false,
+				"B": false,
+			}
+			got, err := processResults(tt.results, expression, canDefaultZero)
+			if err != nil {
+				t.Fatalf("Error processing results: %v", err)
+			}
+			if len(got.Series) != len(tt.want.Series) {
+				t.Fatalf("processResults(): number of series - got = %v, want %v", len(got.Series), len(tt.want.Series))
+			}
+
+			for i := range got.Series {
+				if len(got.Series[i].Points) != len(tt.want.Series[i].Points) {
+					t.Fatalf("processResults(): number of points - got = %v, want %v", len(got.Series[i].Points), len(tt.want.Series[i].Points))
+				}
+				for j := range got.Series[i].Points {
+					if got.Series[i].Points[j].Value != tt.want.Series[i].Points[j].Value {
+						t.Errorf("processResults(): got = %v, want %v", got.Series[i].Points[j].Value, tt.want.Series[i].Points[j].Value)
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/query-service/postprocess/query_range.go b/pkg/query-service/postprocess/query_range.go
index 7477ac11c3..b6bd6461f3 100644
--- a/pkg/query-service/postprocess/query_range.go
+++ b/pkg/query-service/postprocess/query_range.go
@@ -41,6 +41,12 @@ func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
 		tablePanelResultProcessor(result)
 	}
 
+	canDefaultZero := make(map[string]bool)
+
+	for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
+		canDefaultZero[query.QueryName] = query.CanDefaultZero()
+	}
+
 	for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
 		// The way we distinguish between a formula and a query is by checking if the expression
 		// is the same as the query name
@@ -52,7 +58,7 @@ func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
 				zap.L().Error("error in expression", zap.Error(err))
 				return nil, err
 			}
-			formulaResult, err := processResults(result, expression)
+			formulaResult, err := processResults(result, expression, canDefaultZero)
 			if err != nil {
 				zap.L().Error("error in expression", zap.Error(err))
 				return nil, err
diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go
index 865bac7f81..9bc2c5240d 100644
--- a/pkg/query-service/rules/thresholdRule.go
+++ b/pkg/query-service/rules/thresholdRule.go
@@ -884,7 +884,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
 			return result
 		}
 
-		lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel)
+		lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel)
 
 		for _, l := range r.labels {
 			lb.Set(l.Name, expand(l.Value))
@@ -1022,20 +1022,27 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 	var alertSmpl Sample
 	var shouldAlert bool
 	var lbls labels.Labels
+	var lblsNormalized labels.Labels
 
 	for name, value := range series.Labels {
 		lbls = append(lbls, labels.Label{Name: name, Value: value})
+		lblsNormalized = append(lblsNormalized, labels.Label{Name: normalizeLabelName(name), Value: value})
 	}
 
 	series.Points = removeGroupinSetPoints(series)
 
+	// nothing to evaluate
+	if len(series.Points) == 0 {
+		return alertSmpl, false
+	}
+
 	switch r.matchType() {
 	case AtleastOnce:
 		// If any sample matches the condition, the rule is firing.
 		if r.compareOp() == ValueIsAbove {
 			for _, smpl := range series.Points {
 				if smpl.Value > r.targetVal() {
-					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
+					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
 					shouldAlert = true
 					break
 				}
@@ -1043,7 +1050,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 		} else if r.compareOp() == ValueIsBelow {
 			for _, smpl := range series.Points {
 				if smpl.Value < r.targetVal() {
-					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
+					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
 					shouldAlert = true
 					break
 				}
@@ -1051,7 +1058,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 		} else if r.compareOp() == ValueIsEq {
 			for _, smpl := range series.Points {
 				if smpl.Value == r.targetVal() {
-					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
+					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
 					shouldAlert = true
 					break
 				}
@@ -1059,7 +1066,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 		} else if r.compareOp() == ValueIsNotEq {
 			for _, smpl := range series.Points {
 				if smpl.Value != r.targetVal() {
-					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
+					alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
 					shouldAlert = true
 					break
 				}
@@ -1068,7 +1075,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 	case AllTheTimes:
 		// If all samples match the condition, the rule is firing.
 		shouldAlert = true
-		alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lbls, MetricOrig: lbls}
+		alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lblsNormalized, MetricOrig: lbls}
 		if r.compareOp() == ValueIsAbove {
 			for _, smpl := range series.Points {
 				if smpl.Value <= r.targetVal() {
@@ -1109,7 +1116,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 			count++
 		}
 		avg := sum / count
-		alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls, MetricOrig: lbls}
+		alertSmpl = Sample{Point: Point{V: avg}, Metric: lblsNormalized, MetricOrig: lbls}
 		if r.compareOp() == ValueIsAbove {
 			if avg > r.targetVal() {
 				shouldAlert = true
@@ -1137,7 +1144,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
 			}
 			sum += smpl.Value
 		}
-		alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls, MetricOrig: lbls}
+		alertSmpl = Sample{Point: Point{V: sum}, Metric: lblsNormalized, MetricOrig: lbls}
 		if r.compareOp() == ValueIsAbove {
 			if sum > r.targetVal() {
 				shouldAlert = true
diff --git a/pkg/query-service/rules/thresholdRule_test.go b/pkg/query-service/rules/thresholdRule_test.go
index faf5803bc6..e895f486b2 100644
--- a/pkg/query-service/rules/thresholdRule_test.go
+++ b/pkg/query-service/rules/thresholdRule_test.go
@@ -749,3 +749,89 @@ func TestPrepareLinksToTraces(t *testing.T) {
 	link := rule.prepareLinksToTraces(ts, labels.Labels{})
 	assert.Contains(t, link, "&timeRange=%7B%22start%22%3A1705468620000000000%2C%22end%22%3A1705468920000000000%2C%22pageSize%22%3A100%7D&startTime=1705468620000000000&endTime=1705468920000000000")
 }
+
+// TestThresholdRuleLabelNormalization verifies that the sample returned by
+// shouldAlert exposes each series label under its normalized name.
+func TestThresholdRuleLabelNormalization(t *testing.T) {
+	postableRule := PostableRule{
+		AlertName:  "Tricky Condition Tests",
+		AlertType:  "METRIC_BASED_ALERT",
+		RuleType:   RuleTypeThreshold,
+		EvalWindow: Duration(5 * time.Minute),
+		Frequency:  Duration(1 * time.Minute),
+		RuleCondition: &RuleCondition{
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypeBuilder,
+				BuilderQueries: map[string]*v3.BuilderQuery{
+					"A": {
+						QueryName:    "A",
+						StepInterval: 60,
+						AggregateAttribute: v3.AttributeKey{
+							Key: "probe_success",
+						},
+						AggregateOperator: v3.AggregateOperatorNoOp,
+						DataSource:        v3.DataSourceMetrics,
+						Expression:        "A",
+					},
+				},
+			},
+		},
+	}
+
+	cases := []struct {
+		values      v3.Series
+		expectAlert bool
+		compareOp   string
+		matchType   string
+		target      float64
+	}{
+		// Test cases for Equals Always
+		{
+			values: v3.Series{
+				Points: []v3.Point{
+					{Value: 0.0},
+					{Value: 0.0},
+					{Value: 0.0},
+					{Value: 0.0},
+					{Value: 0.0},
+				},
+				Labels: map[string]string{
+					"service.name": "frontend",
+				},
+				LabelsArray: []map[string]string{
+					{
+						"service.name": "frontend",
+					},
+				},
+			},
+			expectAlert: true,
+			compareOp:   "3", // Equals
+			matchType:   "2", // Always
+			target:      0.0,
+		},
+	}
+
+	fm := featureManager.StartManager()
+	for idx, c := range cases {
+		postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
+		postableRule.RuleCondition.MatchType = MatchType(c.matchType)
+		postableRule.RuleCondition.Target = &c.target
+
+		rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
+		if err != nil {
+			t.Fatalf("test case %d: creating threshold rule: %v", idx, err)
+		}
+
+		values := c.values
+		for i := range values.Points {
+			values.Points[i].Timestamp = time.Now().UnixMilli()
+		}
+
+		sample, shouldAlert := rule.shouldAlert(values)
+		for name, value := range values.Labels {
+			assert.Equal(t, value, sample.Metric.Get(normalizeLabelName(name)))
+		}
+
+		assert.Equal(t, c.expectAlert, shouldAlert, "Test case %d", idx)
+	}
+}
diff --git a/pkg/query-service/utils/labels/labels.go b/pkg/query-service/utils/labels/labels.go
index 2e0041aafc..991490806c 100644
--- a/pkg/query-service/utils/labels/labels.go
+++ b/pkg/query-service/utils/labels/labels.go
@@ -14,8 +14,9 @@ const sep = '\xff'
 
 // Well-known label names used by Prometheus components.
 const (
-	MetricNameLabel = "__name__"
-	AlertNameLabel  = "alertname"
+	MetricNameLabel  = "__name__"
+	TemporalityLabel = "__temporality__"
+	AlertNameLabel   = "alertname"
 
 	// AlertStateLabel is the label name indicating the state of an alert.
 	AlertStateLabel = "alertstate"