chore: default zero only for counters and label normalization (#5085)

This commit is contained in:
Srikanth Chekuri 2024-05-27 13:19:28 +05:30 committed by GitHub
parent c1c5c4dfa8
commit 1d1d85efa3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 264 additions and 19 deletions

View File

@ -384,6 +384,8 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
parts = append(parts, fmt.Sprintf("timeAggregation=%s", query.TimeAggregation))
parts = append(parts, fmt.Sprintf("spaceAggregation=%s", query.SpaceAggregation))
if query.AggregateAttribute.Key != "" {
parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))

View File

@ -669,6 +669,34 @@ type BuilderQuery struct {
ShiftBy int64
}
// CanDefaultZero returns true if the missing value can be substituted by zero
// For example, for an aggregation window [Tx - Tx+1], with an aggregation operator `count`
// The lack of data can always be interpreted as zero. No data for requests count = zero requests
// This is true for all aggregations that have `count`ing involved.
//
// The same can't be true for others, `sum` of no values doesn't necessarily mean zero.
// We can't decide whether or not should it be zero.
func (b *BuilderQuery) CanDefaultZero() bool {
switch b.DataSource {
case DataSourceMetrics:
if b.AggregateOperator.IsRateOperator() ||
b.TimeAggregation.IsRateOperator() ||
b.AggregateOperator == AggregateOperatorCount ||
b.AggregateOperator == AggregateOperatorCountDistinct ||
b.TimeAggregation == TimeAggregationCount ||
b.TimeAggregation == TimeAggregationCountDistinct {
return true
}
case DataSourceTraces, DataSourceLogs:
if b.AggregateOperator.IsRateOperator() ||
b.AggregateOperator == AggregateOperatorCount ||
b.AggregateOperator == AggregateOperatorCountDistinct {
return true
}
}
return false
}
func (b *BuilderQuery) Validate(panelType PanelType) error {
if b == nil {
return nil

View File

@ -60,7 +60,12 @@ func findUniqueLabelSets(results []*v3.Result) []map[string]string {
}
// Function to join series on timestamp and calculate new values
func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, expression *govaluate.EvaluableExpression) (*v3.Series, error) {
func joinAndCalculate(
results []*v3.Result,
uniqueLabelSet map[string]string,
expression *govaluate.EvaluableExpression,
canDefaultZero map[string]bool,
) (*v3.Series, error) {
uniqueTimestamps := make(map[int64]struct{})
// map[queryName]map[timestamp]value
@ -108,10 +113,16 @@ func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, ex
// If the value is not present in the values map, set it to 0
for _, v := range expression.Vars() {
if _, ok := values[v]; !ok {
if _, ok := values[v]; !ok && canDefaultZero[v] {
values[v] = 0
}
}
if len(expression.Vars()) != len(values) {
// not enough values for expression evaluation
continue
}
newValue, err := expression.Evaluate(values)
if err != nil {
return nil, err
@ -136,16 +147,20 @@ func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, ex
// 2. For each unique label set, find a series that matches the label set from each query result
// 3. Join the series on timestamp and calculate the new values
// 4. Return the new series
func processResults(results []*v3.Result, expression *govaluate.EvaluableExpression) (*v3.Result, error) {
func processResults(
results []*v3.Result,
expression *govaluate.EvaluableExpression,
canDefaultZero map[string]bool,
) (*v3.Result, error) {
uniqueLabelSets := findUniqueLabelSets(results)
newSeries := make([]*v3.Series, 0)
for _, labelSet := range uniqueLabelSets {
series, err := joinAndCalculate(results, labelSet, expression)
series, err := joinAndCalculate(results, labelSet, expression, canDefaultZero)
if err != nil {
return nil, err
}
if series != nil {
if series != nil && len(series.Points) != 0 {
labelsArray := make([]map[string]string, 0)
for k, v := range series.Labels {
labelsArray = append(labelsArray, map[string]string{k: v})

View File

@ -278,7 +278,11 @@ func TestProcessResults(t *testing.T) {
if err != nil {
t.Errorf("Error parsing expression: %v", err)
}
got, err := processResults(tt.results, expression)
canDefaultZero := map[string]bool{
"A": true,
"B": true,
}
got, err := processResults(tt.results, expression, canDefaultZero)
if err != nil {
t.Errorf("Error processing results: %v", err)
}
@ -438,7 +442,11 @@ func TestProcessResultsErrorRate(t *testing.T) {
if err != nil {
t.Errorf("Error parsing expression: %v", err)
}
got, err := processResults(tt.results, expression)
canDefaultZero := map[string]bool{
"A": true,
"B": true,
}
got, err := processResults(tt.results, expression, canDefaultZero)
if err != nil {
t.Errorf("Error processing results: %v", err)
}
@ -1557,7 +1565,12 @@ func TestFormula(t *testing.T) {
t.Errorf("Error parsing expression: %v", err)
return
}
got, err := processResults(tt.results, expression)
canDefaultZero := map[string]bool{
"A": true,
"B": true,
"C": true,
}
got, err := processResults(tt.results, expression, canDefaultZero)
if err != nil {
t.Errorf("Error processing results: %v", err)
return
@ -1581,3 +1594,92 @@ func TestFormula(t *testing.T) {
})
}
}
func TestProcessResultsNoDefaultZero(t *testing.T) {
tests := []struct {
name string
results []*v3.Result
want *v3.Result
}{
{
name: "test1",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api",
},
Points: []v3.Point{
{
Timestamp: 1,
Value: 10,
},
{
Timestamp: 2,
Value: 20,
},
},
},
},
},
{
QueryName: "B",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1,
Value: 30,
},
{
Timestamp: 3,
Value: 40,
},
},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expression, err := govaluate.NewEvaluableExpression("A + B")
if err != nil {
t.Errorf("Error parsing expression: %v", err)
}
canDefaultZero := map[string]bool{
"A": false,
"B": false,
}
got, err := processResults(tt.results, expression, canDefaultZero)
if err != nil {
t.Errorf("Error processing results: %v", err)
}
if len(got.Series) != len(tt.want.Series) {
t.Errorf("processResults(): number of sereis - got = %v, want %v", len(got.Series), len(tt.want.Series))
}
for i := range got.Series {
if len(got.Series[i].Points) != len(tt.want.Series[i].Points) {
t.Errorf("processResults(): number of points - got = %v, want %v", got, tt.want)
}
for j := range got.Series[i].Points {
if got.Series[i].Points[j].Value != tt.want.Series[i].Points[j].Value {
t.Errorf("processResults(): got = %v, want %v", got.Series[i].Points[j].Value, tt.want.Series[i].Points[j].Value)
}
}
}
})
}
}

View File

@ -41,6 +41,12 @@ func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
tablePanelResultProcessor(result)
}
canDefaultZero := make(map[string]bool)
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
canDefaultZero[query.QueryName] = query.CanDefaultZero()
}
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
// The way we distinguish between a formula and a query is by checking if the expression
// is the same as the query name
@ -52,7 +58,7 @@ func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
zap.L().Error("error in expression", zap.Error(err))
return nil, err
}
formulaResult, err := processResults(result, expression)
formulaResult, err := processResults(result, expression, canDefaultZero)
if err != nil {
zap.L().Error("error in expression", zap.Error(err))
return nil, err

View File

@ -884,7 +884,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
return result
}
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel)
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel)
for _, l := range r.labels {
lb.Set(l.Name, expand(l.Value))
@ -1022,20 +1022,27 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
var alertSmpl Sample
var shouldAlert bool
var lbls labels.Labels
var lblsNormalized labels.Labels
for name, value := range series.Labels {
lbls = append(lbls, labels.Label{Name: name, Value: value})
lblsNormalized = append(lblsNormalized, labels.Label{Name: normalizeLabelName(name), Value: value})
}
series.Points = removeGroupinSetPoints(series)
// nothing to evaluate
if len(series.Points) == 0 {
return alertSmpl, false
}
switch r.matchType() {
case AtleastOnce:
// If any sample matches the condition, the rule is firing.
if r.compareOp() == ValueIsAbove {
for _, smpl := range series.Points {
if smpl.Value > r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
shouldAlert = true
break
}
@ -1043,7 +1050,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
} else if r.compareOp() == ValueIsBelow {
for _, smpl := range series.Points {
if smpl.Value < r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
shouldAlert = true
break
}
@ -1051,7 +1058,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
} else if r.compareOp() == ValueIsEq {
for _, smpl := range series.Points {
if smpl.Value == r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
shouldAlert = true
break
}
@ -1059,7 +1066,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
} else if r.compareOp() == ValueIsNotEq {
for _, smpl := range series.Points {
if smpl.Value != r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
shouldAlert = true
break
}
@ -1068,7 +1075,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
case AllTheTimes:
// If all samples match the condition, the rule is firing.
shouldAlert = true
alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lblsNormalized, MetricOrig: lbls}
if r.compareOp() == ValueIsAbove {
for _, smpl := range series.Points {
if smpl.Value <= r.targetVal() {
@ -1109,7 +1116,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
count++
}
avg := sum / count
alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: avg}, Metric: lblsNormalized, MetricOrig: lbls}
if r.compareOp() == ValueIsAbove {
if avg > r.targetVal() {
shouldAlert = true
@ -1137,7 +1144,7 @@ func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
}
sum += smpl.Value
}
alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls, MetricOrig: lbls}
alertSmpl = Sample{Point: Point{V: sum}, Metric: lblsNormalized, MetricOrig: lbls}
if r.compareOp() == ValueIsAbove {
if sum > r.targetVal() {
shouldAlert = true

View File

@ -749,3 +749,87 @@ func TestPrepareLinksToTraces(t *testing.T) {
link := rule.prepareLinksToTraces(ts, labels.Labels{})
assert.Contains(t, link, "&timeRange=%7B%22start%22%3A1705468620000000000%2C%22end%22%3A1705468920000000000%2C%22pageSize%22%3A100%7D&startTime=1705468620000000000&endTime=1705468920000000000")
}
func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule := PostableRule{
AlertName: "Tricky Condition Tests",
AlertType: "METRIC_BASED_ALERT",
RuleType: RuleTypeThreshold,
EvalWindow: Duration(5 * time.Minute),
Frequency: Duration(1 * time.Minute),
RuleCondition: &RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{
Key: "probe_success",
},
AggregateOperator: v3.AggregateOperatorNoOp,
DataSource: v3.DataSourceMetrics,
Expression: "A",
},
},
},
},
}
cases := []struct {
values v3.Series
expectAlert bool
compareOp string
matchType string
target float64
}{
// Test cases for Equals Always
{
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
},
Labels: map[string]string{
"service.name": "frontend",
},
LabelsArray: []map[string]string{
map[string]string{
"service.name": "frontend",
},
},
},
expectAlert: true,
compareOp: "3", // Equals
matchType: "2", // Always
target: 0.0,
},
}
fm := featureManager.StartManager()
for idx, c := range cases {
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
if err != nil {
assert.NoError(t, err)
}
values := c.values
for i := range values.Points {
values.Points[i].Timestamp = time.Now().UnixMilli()
}
sample, shoulAlert := rule.shouldAlert(c.values)
for name, value := range c.values.Labels {
assert.Equal(t, value, sample.Metric.Get(normalizeLabelName(name)))
}
assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx)
}
}

View File

@ -14,8 +14,9 @@ const sep = '\xff'
// Well-known label names used by Prometheus components.
const (
MetricNameLabel = "__name__"
AlertNameLabel = "alertname"
MetricNameLabel = "__name__"
TemporalityLabel = "__temporality__"
AlertNameLabel = "alertname"
// AlertStateLabel is the label name indicating the state of an alert.
AlertStateLabel = "alertstate"