diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 7fb9317946..ee019e639a 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -746,10 +746,7 @@ func makeRulesManager( // create manager opts managerOpts := &baserules.ManagerOptions{ NotifierOpts: notifierOpts, - Queriers: &baserules.Queriers{ - PqlEngine: pqle, - Ch: ch.GetConn(), - }, + PqlEngine: pqle, RepoURL: ruleRepoURL, DBConn: db, Context: context.Background(), diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index 831fb52793..d3bc03f58a 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -18,11 +18,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) tr, err := baserules.NewThresholdRule( ruleId, opts.Rule, - baserules.ThresholdRuleOpts{ - EvalDelay: opts.ManagerOpts.EvalDelay, - }, opts.FF, opts.Reader, + baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), ) if err != nil { @@ -41,8 +39,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) ruleId, opts.Rule, opts.Logger, - baserules.PromRuleOpts{}, opts.Reader, + opts.ManagerOpts.PqlEngine, ) if err != nil { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 77caa9170b..557b082f42 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -731,10 +731,7 @@ func makeRulesManager( // create manager opts managerOpts := &rules.ManagerOptions{ NotifierOpts: notifierOpts, - Queriers: &rules.Queriers{ - PqlEngine: pqle, - Ch: ch.GetConn(), - }, + PqlEngine: pqle, RepoURL: ruleRepoURL, DBConn: db, Context: context.Background(), diff --git a/pkg/query-service/rules/base_rule.go b/pkg/query-service/rules/base_rule.go new file mode 100644 index 0000000000..492f6f685c --- /dev/null +++ b/pkg/query-service/rules/base_rule.go @@ -0,0 +1,567 @@ +package rules + +import ( + "context" + "fmt" + "math" + "net/url" + "sync" + "time" + + "go.signoz.io/signoz/pkg/query-service/converter" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" + "go.uber.org/zap" +) + +// BaseRule contains common fields and methods for all rule types +type BaseRule struct { + id string + name string + source string + handledRestart bool + + // Type of the rule + typ AlertType + + ruleCondition *RuleCondition + // evalWindow is the time window used for evaluating the rule + // i.e each time we lookback from the current time, we look at data for the last + // evalWindow duration + evalWindow time.Duration + // holdDuration is the duration for which the alert waits before firing + holdDuration time.Duration + + // evalDelay is the delay in evaluation of the rule + // this is useful in cases where the data is not available immediately + evalDelay time.Duration + + // holds the static set of labels and annotations for the rule + // these are the same for all alerts created for this rule + labels qslabels.BaseLabels + annotations qslabels.BaseLabels + // preferredChannels is the list of channels to send the alert to + // if the rule is triggered + preferredChannels []string + mtx sync.Mutex + // the time it took to evaluate the rule (most recent evaluation) + evaluationDuration time.Duration + // the timestamp of the last evaluation + evaluationTimestamp time.Time + + health RuleHealth + lastError error + active map[uint64]*Alert + + // lastTimestampWithDatapoints is the timestamp of the last datapoint we observed + // for this rule + // this is used for missing data alerts + lastTimestampWithDatapoints time.Time + + reader interfaces.Reader + + logger *zap.Logger + + // sendUnmatched sends observed metric values + // even if they dont match the rule condition. this is + // useful in testing the rule + sendUnmatched bool + + // sendAlways will send alert irresepective of resendDelay + // or other params + sendAlways bool +} + +type RuleOption func(*BaseRule) + +func WithSendAlways() RuleOption { + return func(r *BaseRule) { + r.sendAlways = true + } +} + +func WithSendUnmatched() RuleOption { + return func(r *BaseRule) { + r.sendUnmatched = true + } +} + +func WithEvalDelay(dur time.Duration) RuleOption { + return func(r *BaseRule) { + r.evalDelay = dur + } +} + +func WithLogger(logger *zap.Logger) RuleOption { + return func(r *BaseRule) { + r.logger = logger + } +} + +func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) { + if p.RuleCondition == nil || !p.RuleCondition.IsValid() { + return nil, fmt.Errorf("invalid rule condition") + } + + baseRule := &BaseRule{ + id: id, + name: p.AlertName, + source: p.Source, + ruleCondition: p.RuleCondition, + evalWindow: time.Duration(p.EvalWindow), + labels: qslabels.FromMap(p.Labels), + annotations: qslabels.FromMap(p.Annotations), + preferredChannels: p.PreferredChannels, + health: HealthUnknown, + active: map[uint64]*Alert{}, + reader: reader, + } + + if baseRule.evalWindow == 0 { + baseRule.evalWindow = 5 * time.Minute + } + + for _, opt := range opts { + opt(baseRule) + } + + return baseRule, nil +} + +func (r *BaseRule) targetVal() float64 { + if r.ruleCondition == nil || r.ruleCondition.Target == nil { + return 0 + } + + // get the converter for the target unit + unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) + // convert the target value to the y-axis unit + value := unitConverter.Convert(converter.Value{ + F: *r.ruleCondition.Target, + U: converter.Unit(r.ruleCondition.TargetUnit), + }, converter.Unit(r.Unit())) + + return value.F +} + +func (r *BaseRule) matchType() MatchType { + if r.ruleCondition == nil { + return AtleastOnce + } + return r.ruleCondition.MatchType +} + +func (r *BaseRule) compareOp() CompareOp { + if r.ruleCondition == nil { + return ValueIsEq + } + return r.ruleCondition.CompareOp +} + +func (r *BaseRule) currentAlerts() []*Alert { + r.mtx.Lock() + defer r.mtx.Unlock() + + alerts := make([]*Alert, 0, len(r.active)) + for _, a := range r.active { + anew := *a + alerts = append(alerts, &anew) + } + return alerts +} + +func (r *ThresholdRule) hostFromSource() string { + parsedUrl, err := url.Parse(r.source) + if err != nil { + return "" + } + if parsedUrl.Port() != "" { + return fmt.Sprintf("%s://%s:%s", parsedUrl.Scheme, parsedUrl.Hostname(), parsedUrl.Port()) + } + return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname()) +} + +func (r *BaseRule) ID() string { return r.id } +func (r *BaseRule) Name() string { return r.name } +func (r *BaseRule) Condition() *RuleCondition { return r.ruleCondition } +func (r *BaseRule) Labels() qslabels.BaseLabels { return r.labels } +func (r *BaseRule) Annotations() qslabels.BaseLabels { return r.annotations } +func (r *BaseRule) PreferredChannels() []string { return r.preferredChannels } + +func (r *BaseRule) GeneratorURL() string { + return prepareRuleGeneratorURL(r.ID(), r.source) +} + +func (r *BaseRule) Unit() string { + if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil { + return r.ruleCondition.CompositeQuery.Unit + } + return "" +} + +func (r *BaseRule) SetLastError(err error) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.lastError = err +} + +func (r *BaseRule) LastError() error { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.lastError +} + +func (r *BaseRule) SetHealth(health RuleHealth) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.health = health +} + +func (r *BaseRule) Health() RuleHealth { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.health +} + +func (r *BaseRule) SetEvaluationDuration(dur time.Duration) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.evaluationDuration = dur +} + +func (r *BaseRule) GetEvaluationDuration() time.Duration { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.evaluationDuration +} + +func (r *BaseRule) SetEvaluationTimestamp(ts time.Time) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.evaluationTimestamp = ts +} + +func (r *BaseRule) GetEvaluationTimestamp() time.Time { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.evaluationTimestamp +} + +func (r *BaseRule) State() model.AlertState { + maxState := model.StateInactive + for _, a := range r.active { + if a.State > maxState { + maxState = a.State + } + } + return maxState +} + +func (r *BaseRule) ActiveAlerts() []*Alert { + var res []*Alert + for _, a := range r.currentAlerts() { + if a.ResolvedAt.IsZero() { + res = append(res, a) + } + } + return res +} + +func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { + alerts := []*Alert{} + r.ForEachActiveAlert(func(alert *Alert) { + if alert.needsSending(ts, resendDelay) { + alert.LastSentAt = ts + delta := resendDelay + if interval > resendDelay { + delta = interval + } + alert.ValidUntil = ts.Add(4 * delta) + anew := *alert + alerts = append(alerts, &anew) + } + }) + notifyFunc(ctx, "", alerts...) +} + +func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) { + r.mtx.Lock() + defer r.mtx.Unlock() + + for _, a := range r.active { + f(a) + } +} + +func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { + var alertSmpl Sample + var shouldAlert bool + var lbls qslabels.Labels + var lblsNormalized qslabels.Labels + + for name, value := range series.Labels { + lbls = append(lbls, qslabels.Label{Name: name, Value: value}) + lblsNormalized = append(lblsNormalized, qslabels.Label{Name: normalizeLabelName(name), Value: value}) + } + + series.Points = removeGroupinSetPoints(series) + + // nothing to evaluate + if len(series.Points) == 0 { + return alertSmpl, false + } + + switch r.matchType() { + case AtleastOnce: + // If any sample matches the condition, the rule is firing. + if r.compareOp() == ValueIsAbove { + for _, smpl := range series.Points { + if smpl.Value > r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsBelow { + for _, smpl := range series.Points { + if smpl.Value < r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsEq { + for _, smpl := range series.Points { + if smpl.Value == r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsNotEq { + for _, smpl := range series.Points { + if smpl.Value != r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } + case AllTheTimes: + // If all samples match the condition, the rule is firing. + shouldAlert = true + alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lblsNormalized, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + for _, smpl := range series.Points { + if smpl.Value <= r.targetVal() { + shouldAlert = false + break + } + } + // use min value from the series + if shouldAlert { + var minValue float64 = math.Inf(1) + for _, smpl := range series.Points { + if smpl.Value < minValue { + minValue = smpl.Value + } + } + alertSmpl = Sample{Point: Point{V: minValue}, Metric: lblsNormalized, MetricOrig: lbls} + } + } else if r.compareOp() == ValueIsBelow { + for _, smpl := range series.Points { + if smpl.Value >= r.targetVal() { + shouldAlert = false + break + } + } + if shouldAlert { + var maxValue float64 = math.Inf(-1) + for _, smpl := range series.Points { + if smpl.Value > maxValue { + maxValue = smpl.Value + } + } + alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lblsNormalized, MetricOrig: lbls} + } + } else if r.compareOp() == ValueIsEq { + for _, smpl := range series.Points { + if smpl.Value != r.targetVal() { + shouldAlert = false + break + } + } + } else if r.compareOp() == ValueIsNotEq { + for _, smpl := range series.Points { + if smpl.Value == r.targetVal() { + shouldAlert = false + break + } + } + // use any non-inf or nan value from the series + if shouldAlert { + for _, smpl := range series.Points { + if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + break + } + } + } + } + case OnAverage: + // If the average of all samples matches the condition, the rule is firing. + var sum, count float64 + for _, smpl := range series.Points { + if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { + continue + } + sum += smpl.Value + count++ + } + avg := sum / count + alertSmpl = Sample{Point: Point{V: avg}, Metric: lblsNormalized, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + if avg > r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsBelow { + if avg < r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsEq { + if avg == r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsNotEq { + if avg != r.targetVal() { + shouldAlert = true + } + } + case InTotal: + // If the sum of all samples matches the condition, the rule is firing. + var sum float64 + + for _, smpl := range series.Points { + if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { + continue + } + sum += smpl.Value + } + alertSmpl = Sample{Point: Point{V: sum}, Metric: lblsNormalized, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + if sum > r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsBelow { + if sum < r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsEq { + if sum == r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsNotEq { + if sum != r.targetVal() { + shouldAlert = true + } + } + } + return alertSmpl, shouldAlert +} + +func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { + zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) + revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} + + lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) + if err != nil { + return err + } + // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), + // the state would reset so we need to add the corresponding state changes to previously saved states + if !r.handledRestart && len(lastSavedState) > 0 { + zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) + l := map[uint64]v3.RuleStateHistory{} + for _, item := range itemsToAdd { + l[item.Fingerprint] = item + } + + shouldSkip := map[uint64]bool{} + + for _, item := range lastSavedState { + // for the last saved item with fingerprint, check if there is a corresponding entry in the current state + currentState, ok := l[item.Fingerprint] + if !ok { + // there was a state change in the past, but not in the current state + // if the state was firing, then we should add a resolved state change + if item.State == model.StateFiring || item.State == model.StateNoData { + item.State = model.StateInactive + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + // there is nothing to do if the prev state was normal + } else { + if item.State != currentState.State { + item.State = currentState.State + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + } + // do not add this item to revisedItemsToAdd as it is already processed + shouldSkip[item.Fingerprint] = true + } + zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + // if there are any new state changes that were not saved, add them to the revised items + for _, item := range itemsToAdd { + if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { + revisedItemsToAdd[item.Fingerprint] = item + } + } + zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + newState := model.StateInactive + for _, item := range revisedItemsToAdd { + if item.State == model.StateFiring || item.State == model.StateNoData { + newState = model.StateFiring + break + } + } + zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) + + // if there is a change in the overall state, update the overall state + if lastSavedState[0].OverallState != newState { + for fingerprint, item := range revisedItemsToAdd { + item.OverallState = newState + item.OverallStateChanged = true + revisedItemsToAdd[fingerprint] = item + } + } + zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + } else { + for _, item := range itemsToAdd { + revisedItemsToAdd[item.Fingerprint] = item + } + } + + if len(revisedItemsToAdd) > 0 && r.reader != nil { + zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) + for _, item := range revisedItemsToAdd { + entries = append(entries, item) + } + err := r.reader.AddRuleStateHistory(ctx, entries) + if err != nil { + zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) + } + } + r.handledRestart = true + + return nil +} diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index fe309334b1..120d674a9a 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -21,6 +21,7 @@ import ( am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" + pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" "go.signoz.io/signoz/pkg/query-service/telemetry" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) @@ -56,7 +57,7 @@ func prepareTaskName(ruleId interface{}) string { // ManagerOptions bundles options for the Manager. type ManagerOptions struct { NotifierOpts am.NotifierOptions - Queriers *Queriers + PqlEngine *pqle.PqlEngine // RepoURL is used to generate a backlink in sent alert messages RepoURL string @@ -127,11 +128,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { tr, err := NewThresholdRule( ruleId, opts.Rule, - ThresholdRuleOpts{ - EvalDelay: opts.ManagerOpts.EvalDelay, - }, opts.FF, opts.Reader, + WithEvalDelay(opts.ManagerOpts.EvalDelay), ) if err != nil { @@ -150,8 +149,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { ruleId, opts.Rule, opts.Logger, - PromRuleOpts{}, opts.Reader, + opts.ManagerOpts.PqlEngine, ) if err != nil { @@ -793,12 +792,10 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m rule, err = NewThresholdRule( alertname, parsedRule, - ThresholdRuleOpts{ - SendUnmatched: true, - SendAlways: true, - }, m.featureFlags, m.reader, + WithSendAlways(), + WithSendUnmatched(), ) if err != nil { @@ -813,10 +810,10 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m alertname, parsedRule, m.logger, - PromRuleOpts{ - SendAlways: true, - }, m.reader, + m.opts.PqlEngine, + WithSendAlways(), + WithSendUnmatched(), ) if err != nil { @@ -830,7 +827,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m // set timestamp to current utc time ts := time.Now().UTC() - count, err := rule.Eval(ctx, ts, m.opts.Queriers) + count, err := rule.Eval(ctx, ts) if err != nil { zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err)) return 0, newApiErrorInternal(fmt.Errorf("rule evaluation failed")) diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go index 2241d32a4b..314e268e1f 100644 --- a/pkg/query-service/rules/prom_rule.go +++ b/pkg/query-service/rules/prom_rule.go @@ -4,295 +4,60 @@ import ( "context" "encoding/json" "fmt" - "math" - "sync" "time" "go.uber.org/zap" - plabels "github.com/prometheus/prometheus/model/labels" - pql "github.com/prometheus/prometheus/promql" - "go.signoz.io/signoz/pkg/query-service/converter" + "github.com/prometheus/prometheus/promql" "go.signoz.io/signoz/pkg/query-service/formatter" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" "go.signoz.io/signoz/pkg/query-service/utils/times" "go.signoz.io/signoz/pkg/query-service/utils/timestamp" yaml "gopkg.in/yaml.v2" ) -type PromRuleOpts struct { - // SendAlways will send alert irresepective of resendDelay - // or other params - SendAlways bool -} - type PromRule struct { - id string - name string - source string - ruleCondition *RuleCondition - - evalWindow time.Duration - holdDuration time.Duration - labels plabels.Labels - annotations plabels.Labels - - preferredChannels []string - - mtx sync.Mutex - evaluationDuration time.Duration - evaluationTimestamp time.Time - - health RuleHealth - - lastError error - - // map of active alerts - active map[uint64]*Alert - - logger *zap.Logger - opts PromRuleOpts - - reader interfaces.Reader - - handledRestart bool + *BaseRule + pqlEngine *pqle.PqlEngine } func NewPromRule( id string, postableRule *PostableRule, logger *zap.Logger, - opts PromRuleOpts, reader interfaces.Reader, + pqlEngine *pqle.PqlEngine, + opts ...RuleOption, ) (*PromRule, error) { - if postableRule.RuleCondition == nil { - return nil, fmt.Errorf("no rule condition") - } else if !postableRule.RuleCondition.IsValid() { - return nil, fmt.Errorf("invalid rule condition") + baseRule, err := NewBaseRule(id, postableRule, reader, opts...) + if err != nil { + return nil, err } p := PromRule{ - id: id, - name: postableRule.AlertName, - source: postableRule.Source, - ruleCondition: postableRule.RuleCondition, - evalWindow: time.Duration(postableRule.EvalWindow), - labels: plabels.FromMap(postableRule.Labels), - annotations: plabels.FromMap(postableRule.Annotations), - preferredChannels: postableRule.PreferredChannels, - health: HealthUnknown, - active: map[uint64]*Alert{}, - logger: logger, - opts: opts, + BaseRule: baseRule, + pqlEngine: pqlEngine, } - p.reader = reader - if int64(p.evalWindow) == 0 { - p.evalWindow = 5 * time.Minute - } query, err := p.getPqlQuery() if err != nil { // can not generate a valid prom QL query return nil, err } - - zap.L().Info("creating new alerting rule", zap.String("name", p.name), zap.String("condition", p.ruleCondition.String()), zap.String("query", query)) - + zap.L().Info("creating new prom rule", zap.String("name", p.name), zap.String("query", query)) return &p, nil } -func (r *PromRule) Name() string { - return r.name -} - -func (r *PromRule) ID() string { - return r.id -} - -func (r *PromRule) Condition() *RuleCondition { - return r.ruleCondition -} - -// targetVal returns the target value for the rule condition -// when the y-axis and target units are non-empty, it -// converts the target value to the y-axis unit -func (r *PromRule) targetVal() float64 { - if r.ruleCondition == nil || r.ruleCondition.Target == nil { - return 0 - } - - // get the converter for the target unit - unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) - // convert the target value to the y-axis unit - value := unitConverter.Convert(converter.Value{ - F: *r.ruleCondition.Target, - U: converter.Unit(r.ruleCondition.TargetUnit), - }, converter.Unit(r.Unit())) - - return value.F -} - func (r *PromRule) Type() RuleType { return RuleTypeProm } -func (r *PromRule) GeneratorURL() string { - return prepareRuleGeneratorURL(r.ID(), r.source) -} - -func (r *PromRule) PreferredChannels() []string { - return r.preferredChannels -} - -func (r *PromRule) SetLastError(err error) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.lastError = err -} - -func (r *PromRule) LastError() error { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.lastError -} - -func (r *PromRule) SetHealth(health RuleHealth) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.health = health -} - -func (r *PromRule) Health() RuleHealth { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.health -} - -// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. -func (r *PromRule) SetEvaluationDuration(dur time.Duration) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationDuration = dur -} - -func (r *PromRule) HoldDuration() time.Duration { - return r.holdDuration -} - -func (r *PromRule) EvalWindow() time.Duration { - return r.evalWindow -} - -// Labels returns the labels of the alerting rule. -func (r *PromRule) Labels() qslabels.BaseLabels { - return r.labels -} - -// Annotations returns the annotations of the alerting rule. -func (r *PromRule) Annotations() qslabels.BaseLabels { - return r.annotations -} - -// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule. -func (r *PromRule) GetEvaluationDuration() time.Duration { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationDuration -} - -// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated. -func (r *PromRule) SetEvaluationTimestamp(ts time.Time) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationTimestamp = ts -} - -// GetEvaluationTimestamp returns the time the evaluation took place. -func (r *PromRule) GetEvaluationTimestamp() time.Time { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationTimestamp -} - -// State returns the maximum state of alert instances for this rule. -// StateFiring > StatePending > StateInactive -func (r *PromRule) State() model.AlertState { - - maxState := model.StateInactive - for _, a := range r.active { - if a.State > maxState { - maxState = a.State - } - } - return maxState -} - -func (r *PromRule) currentAlerts() []*Alert { - r.mtx.Lock() - defer r.mtx.Unlock() - - alerts := make([]*Alert, 0, len(r.active)) - - for _, a := range r.active { - anew := *a - alerts = append(alerts, &anew) - } - return alerts -} - -func (r *PromRule) ActiveAlerts() []*Alert { - var res []*Alert - for _, a := range r.currentAlerts() { - if a.ResolvedAt.IsZero() { - res = append(res, a) - } - } - return res -} - -func (r *PromRule) Unit() string { - if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil { - return r.ruleCondition.CompositeQuery.Unit - } - return "" -} - -// ForEachActiveAlert runs the given function on each alert. -// This should be used when you want to use the actual alerts from the ThresholdRule -// and not on its copy. -// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'. -func (r *PromRule) ForEachActiveAlert(f func(*Alert)) { - r.mtx.Lock() - defer r.mtx.Unlock() - - for _, a := range r.active { - f(a) - } -} - -func (r *PromRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { - alerts := []*Alert{} - r.ForEachActiveAlert(func(alert *Alert) { - if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { - alert.LastSentAt = ts - // Allow for two Eval or Alertmanager send failures. - delta := resendDelay - if interval > resendDelay { - delta = interval - } - alert.ValidUntil = ts.Add(4 * delta) - anew := *alert - alerts = append(alerts, &anew) - } - }) - notifyFunc(ctx, "", alerts...) -} - func (r *PromRule) GetSelectedQuery() string { if r.ruleCondition != nil { // If the user has explicitly set the selected query, we return that. @@ -327,117 +92,7 @@ func (r *PromRule) getPqlQuery() (string, error) { return "", fmt.Errorf("invalid promql rule query") } -func (r *PromRule) matchType() MatchType { - if r.ruleCondition == nil { - return AtleastOnce - } - return r.ruleCondition.MatchType -} - -func (r *PromRule) compareOp() CompareOp { - if r.ruleCondition == nil { - return ValueIsEq - } - return r.ruleCondition.CompareOp -} - -// TODO(srikanthccv): implement base rule and use for all types of rules -func (r *PromRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { - zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) - revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} - - lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) - if err != nil { - return err - } - // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), - // the state would reset so we need to add the corresponding state changes to previously saved states - if !r.handledRestart && len(lastSavedState) > 0 { - zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) - l := map[uint64]v3.RuleStateHistory{} - for _, item := range itemsToAdd { - l[item.Fingerprint] = item - } - - shouldSkip := map[uint64]bool{} - - for _, item := range lastSavedState { - // for the last saved item with fingerprint, check if there is a corresponding entry in the current state - currentState, ok := l[item.Fingerprint] - if !ok { - // there was a state change in the past, but not in the current state - // if the state was firing, then we should add a resolved state change - if item.State == model.StateFiring || item.State == model.StateNoData { - item.State = model.StateInactive - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - // there is nothing to do if the prev state was normal - } else { - if item.State != currentState.State { - item.State = currentState.State - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - } - // do not add this item to revisedItemsToAdd as it is already processed - shouldSkip[item.Fingerprint] = true - } - zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - // if there are any new state changes that were not saved, add them to the revised items - for _, item := range itemsToAdd { - if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { - revisedItemsToAdd[item.Fingerprint] = item - } - } - zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - newState := model.StateInactive - for _, item := range revisedItemsToAdd { - if item.State == model.StateFiring || item.State == model.StateNoData { - newState = model.StateFiring - break - } - } - zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) - - // if there is a change in the overall state, update the overall state - if lastSavedState[0].OverallState != newState { - for fingerprint, item := range revisedItemsToAdd { - item.OverallState = newState - item.OverallStateChanged = true - revisedItemsToAdd[fingerprint] = item - } - } - zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - } else { - for _, item := range itemsToAdd { - revisedItemsToAdd[item.Fingerprint] = item - } - } - - if len(revisedItemsToAdd) > 0 && r.reader != nil { - zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) - for _, item := range revisedItemsToAdd { - entries = append(entries, item) - } - err := r.reader.AddRuleStateHistory(ctx, entries) - if err != nil { - zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) - } - } - r.handledRestart = true - - return nil -} - -func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { +func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { prevState := r.State() @@ -452,7 +107,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( return nil, err } zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q)) - res, err := queriers.PqlEngine.RunAlertQuery(ctx, q, start, end, interval) + res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval) if err != nil { r.SetHealth(HealthBad) r.SetLastError(err) @@ -476,7 +131,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( continue } - alertSmpl, shouldAlert := r.shouldAlert(series) + alertSmpl, shouldAlert := r.shouldAlert(toCommonSeries(series)) if !shouldAlert { continue } @@ -484,7 +139,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( threshold := valueFormatter.Format(r.targetVal(), r.Unit()) - tmplData := AlertTemplateData(l, valueFormatter.Format(alertSmpl.F, r.Unit()), threshold) + tmplData := AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold) // Inject some convenience variables that are easier to remember for users // who are not used to Go's templating system. defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}" @@ -507,20 +162,20 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( return result } - lb := plabels.NewBuilder(alertSmpl.Metric).Del(plabels.MetricName) - resultLabels := plabels.NewBuilder(alertSmpl.Metric).Del(plabels.MetricName).Labels() + lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel) + resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels() - for _, l := range r.labels { - lb.Set(l.Name, expand(l.Value)) + for name, value := range r.labels.Map() { + lb.Set(name, expand(value)) } lb.Set(qslabels.AlertNameLabel, r.Name()) lb.Set(qslabels.AlertRuleIdLabel, r.ID()) lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL()) - annotations := make(plabels.Labels, 0, len(r.annotations)) - for _, a := range r.annotations { - annotations = append(annotations, plabels.Label{Name: a.Name, Value: expand(a.Value)}) + annotations := make(qslabels.Labels, 0, len(r.annotations.Map())) + for name, value := range r.annotations.Map() { + annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)}) } lbs := lb.Labels() @@ -542,7 +197,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( Annotations: annotations, ActiveAt: ts, State: model.StatePending, - Value: alertSmpl.F, + Value: alertSmpl.V, GeneratorURL: r.GeneratorURL(), Receivers: r.preferredChannels, } @@ -626,169 +281,11 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( itemsToAdd[idx] = item } - r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) + r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) return len(r.active), nil } -func (r *PromRule) shouldAlert(series pql.Series) (pql.Sample, bool) { - var alertSmpl pql.Sample - var shouldAlert bool - switch r.matchType() { - case AtleastOnce: - // If any sample matches the condition, the rule is firing. - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Floats { - if smpl.F > r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Floats { - if smpl.F < r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Floats { - if smpl.F == r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Floats { - if smpl.F != r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } - case AllTheTimes: - // If all samples match the condition, the rule is firing. - shouldAlert = true - alertSmpl = pql.Sample{F: r.targetVal(), Metric: series.Metric} - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Floats { - if smpl.F <= r.targetVal() { - shouldAlert = false - break - } - } - // use min value from the series - if shouldAlert { - var minValue float64 = math.Inf(1) - for _, smpl := range series.Floats { - if smpl.F < minValue { - minValue = smpl.F - } - } - alertSmpl = pql.Sample{F: minValue, Metric: series.Metric} - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Floats { - if smpl.F >= r.targetVal() { - shouldAlert = false - break - } - } - if shouldAlert { - var maxValue float64 = math.Inf(-1) - for _, smpl := range series.Floats { - if smpl.F > maxValue { - maxValue = smpl.F - } - } - alertSmpl = pql.Sample{F: maxValue, Metric: series.Metric} - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Floats { - if smpl.F != r.targetVal() { - shouldAlert = false - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Floats { - if smpl.F == r.targetVal() { - shouldAlert = false - break - } - } - if shouldAlert { - for _, smpl := range series.Floats { - if !math.IsInf(smpl.F, 0) && !math.IsNaN(smpl.F) { - alertSmpl = pql.Sample{F: smpl.F, Metric: series.Metric} - break - } - } - } - } - case OnAverage: - // If the average of all samples matches the condition, the rule is firing. - var sum float64 - for _, smpl := range series.Floats { - if math.IsNaN(smpl.F) { - continue - } - sum += smpl.F - } - avg := sum / float64(len(series.Floats)) - alertSmpl = pql.Sample{F: avg, Metric: series.Metric} - if r.compareOp() == ValueIsAbove { - if avg > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if avg < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if avg == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if avg != r.targetVal() { - shouldAlert = true - } - } - case InTotal: - // If the sum of all samples matches the condition, the rule is firing. - var sum float64 - for _, smpl := range series.Floats { - if math.IsNaN(smpl.F) { - continue - } - sum += smpl.F - } - alertSmpl = pql.Sample{F: sum, Metric: series.Metric} - if r.compareOp() == ValueIsAbove { - if sum > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if sum < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if sum == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if sum != r.targetVal() { - shouldAlert = true - } - } - } - return alertSmpl, shouldAlert -} - func (r *PromRule) String() string { ar := PostableRule{ @@ -807,3 +304,23 @@ func (r *PromRule) String() string { return string(byt) } + +func toCommonSeries(series promql.Series) v3.Series { + commonSeries := v3.Series{} + + for _, lbl := range series.Metric { + commonSeries.Labels[lbl.Name] = lbl.Value + commonSeries.LabelsArray = append(commonSeries.LabelsArray, map[string]string{ + lbl.Name: lbl.Value, + }) + } + + for _, f := range series.Floats { + commonSeries.Points = append(commonSeries.Points, v3.Point{ + Timestamp: f.T, + Value: f.F, + }) + } + + return commonSeries +} diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index 032fc227f2..f78994430a 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -367,7 +367,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) { } ctx = context.WithValue(ctx, common.LogCommentKey, kvs) - _, err := rule.Eval(ctx, ts, g.opts.Queriers) + _, err := rule.Eval(ctx, ts) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) diff --git a/pkg/query-service/rules/promrule_test.go b/pkg/query-service/rules/promrule_test.go index fef7630bbd..7c559d1eee 100644 --- a/pkg/query-service/rules/promrule_test.go +++ b/pkg/query-service/rules/promrule_test.go @@ -656,12 +656,12 @@ func TestPromRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewPromRule("69", &postableRule, zap.NewNop(), PromRuleOpts{}, nil) + rule, err := NewPromRule("69", &postableRule, zap.NewNop(), nil, nil) if err != nil { assert.NoError(t, err) } - _, shoulAlert := rule.shouldAlert(c.values) + _, shoulAlert := rule.shouldAlert(toCommonSeries(c.values)) assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx) } } diff --git a/pkg/query-service/rules/queriers.go b/pkg/query-service/rules/queriers.go index 2739e04280..1e8c7fa083 100644 --- a/pkg/query-service/rules/queriers.go +++ b/pkg/query-service/rules/queriers.go @@ -1,21 +1 @@ package rules - -import ( - "github.com/ClickHouse/clickhouse-go/v2" - pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" -) - -// Queriers register the options for querying metrics or event sources -// which return a condition that results in a alert. Currently we support -// promql engine and clickhouse queries but in future we may include -// api readers for Machine Learning (ML) use cases. -// Note: each rule will pick up the querier it is interested in -// and use it. This allows rules to have flexibility in choosing -// the query engines. -type Queriers struct { - // promql engine - PqlEngine *pqle.PqlEngine - - // metric querier - Ch clickhouse.Conn -} diff --git a/pkg/query-service/rules/rule.go b/pkg/query-service/rules/rule.go index eeb7de9066..bb41a2be13 100644 --- a/pkg/query-service/rules/rule.go +++ b/pkg/query-service/rules/rule.go @@ -5,6 +5,7 @@ import ( "time" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) @@ -23,9 +24,8 @@ type Rule interface { PreferredChannels() []string - Eval(context.Context, time.Time, *Queriers) (interface{}, error) + Eval(context.Context, time.Time) (interface{}, error) String() string - // Query() string SetLastError(error) LastError() error SetHealth(RuleHealth) @@ -35,5 +35,7 @@ type Rule interface { SetEvaluationTimestamp(time.Time) GetEvaluationTimestamp() time.Time + RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error + SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) } diff --git a/pkg/query-service/rules/rule_task.go b/pkg/query-service/rules/rule_task.go index 61e154d74d..0a969bffc8 100644 --- a/pkg/query-service/rules/rule_task.go +++ b/pkg/query-service/rules/rule_task.go @@ -349,7 +349,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { } ctx = context.WithValue(ctx, common.LogCommentKey, kvs) - _, err := rule.Eval(ctx, ts, g.opts.Queriers) + _, err := rule.Eval(ctx, ts) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index e50fb4b761..d35798035e 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -9,17 +9,13 @@ import ( "net/url" "regexp" "sort" - "sync" "text/template" "time" "unicode" "go.uber.org/zap" - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.signoz.io/signoz/pkg/query-service/common" - "go.signoz.io/signoz/pkg/query-service/converter" "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/postprocess" @@ -41,38 +37,7 @@ import ( ) type ThresholdRule struct { - id string - name string - source string - ruleCondition *RuleCondition - // evalWindow is the time window used for evaluating the rule - // i.e each time we lookback from the current time, we look at data for the last - // evalWindow duration - evalWindow time.Duration - // holdDuration is the duration for which the alert waits before firing - holdDuration time.Duration - // holds the static set of labels and annotations for the rule - // these are the same for all alerts created for this rule - labels labels.Labels - annotations labels.Labels - - // preferredChannels is the list of channels to send the alert to - // if the rule is triggered - preferredChannels []string - mtx sync.Mutex - - // the time it took to evaluate the rule - evaluationDuration time.Duration - // the timestamp of the last evaluation - evaluationTimestamp time.Time - - health RuleHealth - - lastError error - - // map of active alerts - active map[uint64]*Alert - + *BaseRule // Ever since we introduced the new metrics query builder, the version is "v4" // for all the rules // if the version is "v3", then we use the old querier @@ -84,80 +49,31 @@ type ThresholdRule struct { // should be fast but we can still avoid the query if we have the data in memory temporalityMap map[string]map[v3.Temporality]bool - opts ThresholdRuleOpts - - // lastTimestampWithDatapoints is the timestamp of the last datapoint we observed - // for this rule - // this is used for missing data alerts - lastTimestampWithDatapoints time.Time - - // Type of the rule - typ AlertType - // querier is used for alerts created before the introduction of new metrics query builder querier interfaces.Querier // querierV2 is used for alerts created after the introduction of new metrics query builder querierV2 interfaces.Querier - - reader interfaces.Reader - evalDelay time.Duration - - handledRestart bool -} - -type ThresholdRuleOpts struct { - // sendUnmatched sends observed metric values - // even if they dont match the rule condition. this is - // useful in testing the rule - SendUnmatched bool - - // sendAlways will send alert irresepective of resendDelay - // or other params - SendAlways bool - - // EvalDelay is the time to wait for data to be available - // before evaluating the rule. This is useful in scenarios - // where data might not be available in the system immediately - // after the timestamp. - EvalDelay time.Duration } func NewThresholdRule( id string, p *PostableRule, - opts ThresholdRuleOpts, featureFlags interfaces.FeatureLookup, reader interfaces.Reader, + opts ...RuleOption, ) (*ThresholdRule, error) { zap.L().Info("creating new ThresholdRule", zap.String("id", id), zap.Any("opts", opts)) - if p.RuleCondition == nil { - return nil, fmt.Errorf("no rule condition") - } else if !p.RuleCondition.IsValid() { - return nil, fmt.Errorf("invalid rule condition") + baseRule, err := NewBaseRule(id, p, reader, opts...) + if err != nil { + return nil, err } t := ThresholdRule{ - id: id, - name: p.AlertName, - source: p.Source, - ruleCondition: p.RuleCondition, - evalWindow: time.Duration(p.EvalWindow), - labels: labels.FromMap(p.Labels), - annotations: labels.FromMap(p.Annotations), - preferredChannels: p.PreferredChannels, - health: HealthUnknown, - active: map[uint64]*Alert{}, - opts: opts, - typ: p.AlertType, - version: p.Version, - temporalityMap: make(map[string]map[v3.Temporality]bool), - evalDelay: opts.EvalDelay, - } - - if int64(t.evalWindow) == 0 { - t.evalWindow = 5 * time.Minute + BaseRule: baseRule, + version: p.Version, + temporalityMap: make(map[string]map[v3.Temporality]bool), } querierOption := querier.QuerierOptions{ @@ -177,203 +93,15 @@ func NewThresholdRule( t.querier = querier.NewQuerier(querierOption) t.querierV2 = querierV2.NewQuerier(querierOptsV2) t.reader = reader - - zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id)) - return &t, nil } -func (r *ThresholdRule) Name() string { - return r.name -} - -func (r *ThresholdRule) ID() string { - return r.id -} - -func (r *ThresholdRule) Condition() *RuleCondition { - return r.ruleCondition -} - -func (r *ThresholdRule) GeneratorURL() string { - return prepareRuleGeneratorURL(r.ID(), r.source) -} - -func (r *ThresholdRule) PreferredChannels() []string { - return r.preferredChannels -} - -// targetVal returns the target value for the rule condition -// when the y-axis and target units are non-empty, it -// converts the target value to the y-axis unit -func (r *ThresholdRule) targetVal() float64 { - if r.ruleCondition == nil || r.ruleCondition.Target == nil { - return 0 - } - - // get the converter for the target unit - unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) - // convert the target value to the y-axis unit - value := unitConverter.Convert(converter.Value{ - F: *r.ruleCondition.Target, - U: converter.Unit(r.ruleCondition.TargetUnit), - }, converter.Unit(r.Unit())) - - return value.F -} - -func (r *ThresholdRule) matchType() MatchType { - if r.ruleCondition == nil { - return AtleastOnce - } - return r.ruleCondition.MatchType -} - -func (r *ThresholdRule) compareOp() CompareOp { - if r.ruleCondition == nil { - return ValueIsEq - } - return r.ruleCondition.CompareOp -} - func (r *ThresholdRule) Type() RuleType { return RuleTypeThreshold } -func (r *ThresholdRule) SetLastError(err error) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.lastError = err -} - -func (r *ThresholdRule) LastError() error { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.lastError -} - -func (r *ThresholdRule) SetHealth(health RuleHealth) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.health = health -} - -func (r *ThresholdRule) Health() RuleHealth { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.health -} - -// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. -func (r *ThresholdRule) SetEvaluationDuration(dur time.Duration) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationDuration = dur -} - -func (r *ThresholdRule) HoldDuration() time.Duration { - return r.holdDuration -} - -func (r *ThresholdRule) EvalWindow() time.Duration { - return r.evalWindow -} - -// Labels returns the labels of the alerting rule. -func (r *ThresholdRule) Labels() labels.BaseLabels { - return r.labels -} - -// Annotations returns the annotations of the alerting rule. -func (r *ThresholdRule) Annotations() labels.BaseLabels { - return r.annotations -} - -// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule. -func (r *ThresholdRule) GetEvaluationDuration() time.Duration { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationDuration -} - -// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated. -func (r *ThresholdRule) SetEvaluationTimestamp(ts time.Time) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationTimestamp = ts -} - -// GetEvaluationTimestamp returns the time the evaluation took place. -func (r *ThresholdRule) GetEvaluationTimestamp() time.Time { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationTimestamp -} - -// State returns the maximum state of alert instances for this rule. -// StateFiring > StatePending > StateInactive -func (r *ThresholdRule) State() model.AlertState { - - maxState := model.StateInactive - for _, a := range r.active { - if a.State > maxState { - maxState = a.State - } - } - return maxState -} - -func (r *ThresholdRule) currentAlerts() []*Alert { - r.mtx.Lock() - defer r.mtx.Unlock() - - alerts := make([]*Alert, 0, len(r.active)) - - for _, a := range r.active { - anew := *a - alerts = append(alerts, &anew) - } - return alerts -} - -func (r *ThresholdRule) ActiveAlerts() []*Alert { - var res []*Alert - for _, a := range r.currentAlerts() { - if a.ResolvedAt.IsZero() { - res = append(res, a) - } - } - return res -} - -func (r *ThresholdRule) FetchTemporality(ctx context.Context, metricNames []string, ch driver.Conn) (map[string]map[v3.Temporality]bool, error) { - - metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - - query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME) - - rows, err := ch.Query(ctx, query, metricNames) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var metricName, temporality string - err := rows.Scan(&metricName, &temporality) - if err != nil { - return nil, err - } - if _, ok := metricNameToTemporality[metricName]; !ok { - metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) - } - metricNameToTemporality[metricName][v3.Temporality(temporality)] = true - } - return metricNameToTemporality, nil -} - // populateTemporality same as addTemporality but for v4 and better -func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3, ch driver.Conn) error { +func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { missingTemporality := make([]string, 0) metricNameToTemporality := make(map[string]map[v3.Temporality]bool) @@ -405,7 +133,7 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan var err error if len(missingTemporality) > 0 { - nameToTemporality, err = r.FetchTemporality(ctx, missingTemporality, ch) + nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality) if err != nil { return err } @@ -429,52 +157,13 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan return nil } -// ForEachActiveAlert runs the given function on each alert. -// This should be used when you want to use the actual alerts from the ThresholdRule -// and not on its copy. -// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'. -func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) { - r.mtx.Lock() - defer r.mtx.Unlock() - - for _, a := range r.active { - f(a) - } -} - -func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { - alerts := []*Alert{} - r.ForEachActiveAlert(func(alert *Alert) { - if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { - alert.LastSentAt = ts - // Allow for two Eval or Alertmanager send failures. - delta := resendDelay - if interval > resendDelay { - delta = interval - } - alert.ValidUntil = ts.Add(4 * delta) - anew := *alert - alerts = append(alerts, &anew) - } else { - zap.L().Debug("skipping send alert due to resend delay", zap.String("rule", r.Name()), zap.Any("alert", alert.Labels)) - } - }) - notifyFunc(ctx, "", alerts...) -} - -func (r *ThresholdRule) Unit() string { - if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil { - return r.ruleCondition.CompositeQuery.Unit - } - return "" -} - -func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { +func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) { zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds())) start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli() end := ts.UnixMilli() + if r.evalDelay > 0 { start = start - int64(r.evalDelay.Milliseconds()) end = end - int64(r.evalDelay.Milliseconds()) @@ -507,16 +196,12 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { tmpl := template.New("clickhouse-query") tmpl, err := tmpl.Parse(chQuery.Query) if err != nil { - zap.L().Error("failed to parse clickhouse query to populate vars", zap.String("ruleid", r.ID()), zap.Error(err)) - r.SetHealth(HealthBad) - return params + return nil, err } var query bytes.Buffer err = tmpl.Execute(&query, params.Variables) if err != nil { - zap.L().Error("failed to populate clickhouse query", zap.String("ruleid", r.ID()), zap.Error(err)) - r.SetHealth(HealthBad) - return params + return nil, err } params.CompositeQuery.ClickHouseQueries[name] = &v3.ClickHouseQuery{ Query: query.String(), @@ -524,7 +209,7 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { Legend: chQuery.Legend, } } - return params + return params, nil } if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil { @@ -548,7 +233,7 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { CompositeQuery: r.ruleCondition.CompositeQuery, Variables: make(map[string]interface{}, 0), NoCache: true, - } + }, nil } // The following function is used to prepare the where clause for the query @@ -625,7 +310,10 @@ func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) str return "" } - q := r.prepareQueryRange(ts) + q, err := r.prepareQueryRange(ts) + if err != nil { + return "" + } // Logs list view expects time in milliseconds tr := v3.URLShareableTimeRange{ Start: q.Start, @@ -689,7 +377,10 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s return "" } - q := r.prepareQueryRange(ts) + q, err := r.prepareQueryRange(ts) + if err != nil { + return "" + } // Traces list view expects time in nanoseconds tr := v3.URLShareableTimeRange{ Start: q.Start * time.Second.Microseconds(), @@ -745,17 +436,6 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions) } -func (r *ThresholdRule) hostFromSource() string { - parsedUrl, err := url.Parse(r.source) - if err != nil { - return "" - } - if parsedUrl.Port() != "" { - return fmt.Sprintf("%s://%s:%s", parsedUrl.Scheme, parsedUrl.Hostname(), parsedUrl.Port()) - } - return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname()) -} - func (r *ThresholdRule) GetSelectedQuery() string { if r.ruleCondition != nil { if r.ruleCondition.SelectedQuery != "" { @@ -797,18 +477,14 @@ func (r *ThresholdRule) GetSelectedQuery() string { return "" } -func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) { - if r.ruleCondition == nil || r.ruleCondition.CompositeQuery == nil { - r.SetHealth(HealthBad) - r.SetLastError(fmt.Errorf("no rule condition")) - return nil, fmt.Errorf("invalid rule condition") - } +func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vector, error) { - params := r.prepareQueryRange(ts) - err := r.populateTemporality(ctx, params, ch) + params, err := r.prepareQueryRange(ts) + if err != nil { + return nil, err + } + err = r.populateTemporality(ctx, params) if err != nil { - r.SetHealth(HealthBad) - zap.L().Error("failed to set temporality", zap.String("rule", r.Name()), zap.Error(err)) return nil, fmt.Errorf("internal error while setting temporality") } @@ -822,24 +498,22 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c } var results []*v3.Result - var errQuriesByName map[string]error + var queryErrors map[string]error if r.version == "v4" { - results, errQuriesByName, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{}) + results, queryErrors, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{}) } else { - results, errQuriesByName, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{}) + results, queryErrors, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{}) } if err != nil { - zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("queries", errQuriesByName)) - r.SetHealth(HealthBad) + zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("errors", queryErrors)) return nil, fmt.Errorf("internal error while querying") } if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { results, err = postprocess.PostProcessResult(results, params) if err != nil { - r.SetHealth(HealthBad) zap.L().Error("failed to post process result", zap.String("rule", r.Name()), zap.Error(err)) return nil, fmt.Errorf("internal error while post processing") } @@ -901,113 +575,14 @@ func normalizeLabelName(name string) string { return normalized } -// TODO(srikanthccv): implement base rule and use for all types of rules -func (r *ThresholdRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { - zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) - revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} - - lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) - if err != nil { - return err - } - // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), - // the state would reset so we need to add the corresponding state changes to previously saved states - if !r.handledRestart && len(lastSavedState) > 0 { - zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) - l := map[uint64]v3.RuleStateHistory{} - for _, item := range itemsToAdd { - l[item.Fingerprint] = item - } - - shouldSkip := map[uint64]bool{} - - for _, item := range lastSavedState { - // for the last saved item with fingerprint, check if there is a corresponding entry in the current state - currentState, ok := l[item.Fingerprint] - if !ok { - // there was a state change in the past, but not in the current state - // if the state was firing, then we should add a resolved state change - if item.State == model.StateFiring || item.State == model.StateNoData { - item.State = model.StateInactive - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - // there is nothing to do if the prev state was normal - } else { - if item.State != currentState.State { - item.State = currentState.State - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - } - // do not add this item to revisedItemsToAdd as it is already processed - shouldSkip[item.Fingerprint] = true - } - zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - // if there are any new state changes that were not saved, add them to the revised items - for _, item := range itemsToAdd { - if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { - revisedItemsToAdd[item.Fingerprint] = item - } - } - zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - newState := model.StateInactive - for _, item := range revisedItemsToAdd { - if item.State == model.StateFiring || item.State == model.StateNoData { - newState = model.StateFiring - break - } - } - zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) - - // if there is a change in the overall state, update the overall state - if lastSavedState[0].OverallState != newState { - for fingerprint, item := range revisedItemsToAdd { - item.OverallState = newState - item.OverallStateChanged = true - revisedItemsToAdd[fingerprint] = item - } - } - zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - } else { - for _, item := range itemsToAdd { - revisedItemsToAdd[item.Fingerprint] = item - } - } - - if len(revisedItemsToAdd) > 0 && r.reader != nil { - zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) - for _, item := range revisedItemsToAdd { - entries = append(entries, item) - } - err := r.reader.AddRuleStateHistory(ctx, entries) - if err != nil { - zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) - } - } - r.handledRestart = true - - return nil -} - -func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { +func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { prevState := r.State() valueFormatter := formatter.FromUnit(r.Unit()) - res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch) + res, err := r.buildAndRunQuery(ctx, ts) if err != nil { - r.SetHealth(HealthBad) - r.SetLastError(err) - zap.L().Error("failure in buildAndRunQuery", zap.String("ruleid", r.ID()), zap.Error(err)) return nil, err } @@ -1054,17 +629,17 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel) resultLabels := labels.NewBuilder(smpl.MetricOrig).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels() - for _, l := range r.labels { - lb.Set(l.Name, expand(l.Value)) + for name, value := range r.labels.Map() { + lb.Set(name, expand(value)) } lb.Set(labels.AlertNameLabel, r.Name()) lb.Set(labels.AlertRuleIdLabel, r.ID()) lb.Set(labels.RuleSourceLabel, r.GeneratorURL()) - annotations := make(labels.Labels, 0, len(r.annotations)) - for _, a := range r.annotations { - annotations = append(annotations, labels.Label{Name: normalizeLabelName(a.Name), Value: expand(a.Value)}) + annotations := make(labels.Labels, 0, len(r.annotations.Map())) + for name, value := range r.annotations.Map() { + annotations = append(annotations, labels.Label{Name: normalizeLabelName(name), Value: expand(value)}) } if smpl.IsMissing { lb.Set(labels.AlertNameLabel, "[No data] "+r.Name()) @@ -1092,10 +667,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie if _, ok := alerts[h]; ok { zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h])) err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels") - // We have already acquired the lock above hence using SetHealth and - // SetLastError will deadlock. - r.health = HealthBad - r.lastError = err return nil, err } @@ -1112,7 +683,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie } } - zap.L().Info("alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts))) + zap.L().Info("number of alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts))) // alerts[h] is ready, add or update active list now for h, a := range alerts { @@ -1127,7 +698,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie } r.active[h] = a - } itemsToAdd := []v3.RuleStateHistory{} @@ -1190,7 +760,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie itemsToAdd[idx] = item } - r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) + r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) r.health = HealthGood r.lastError = err @@ -1226,179 +796,3 @@ func removeGroupinSetPoints(series v3.Series) []v3.Point { } return result } - -func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) { - var alertSmpl Sample - var shouldAlert bool - var lbls labels.Labels - var lblsNormalized labels.Labels - - for name, value := range series.Labels { - lbls = append(lbls, labels.Label{Name: name, Value: value}) - lblsNormalized = append(lblsNormalized, labels.Label{Name: normalizeLabelName(name), Value: value}) - } - - series.Points = removeGroupinSetPoints(series) - - // nothing to evaluate - if len(series.Points) == 0 { - return alertSmpl, false - } - - switch r.matchType() { - case AtleastOnce: - // If any sample matches the condition, the rule is firing. - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Points { - if smpl.Value > r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Points { - if smpl.Value < r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Points { - if smpl.Value == r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Points { - if smpl.Value != r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } - case AllTheTimes: - // If all samples match the condition, the rule is firing. - shouldAlert = true - alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lblsNormalized, MetricOrig: lbls} - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Points { - if smpl.Value <= r.targetVal() { - shouldAlert = false - break - } - } - // use min value from the series - if shouldAlert { - var minValue float64 = math.Inf(1) - for _, smpl := range series.Points { - if smpl.Value < minValue { - minValue = smpl.Value - } - } - alertSmpl = Sample{Point: Point{V: minValue}, Metric: lblsNormalized, MetricOrig: lbls} - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Points { - if smpl.Value >= r.targetVal() { - shouldAlert = false - break - } - } - if shouldAlert { - var maxValue float64 = math.Inf(-1) - for _, smpl := range series.Points { - if smpl.Value > maxValue { - maxValue = smpl.Value - } - } - alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lblsNormalized, MetricOrig: lbls} - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Points { - if smpl.Value != r.targetVal() { - shouldAlert = false - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Points { - if smpl.Value == r.targetVal() { - shouldAlert = false - break - } - } - // use any non-inf or nan value from the series - if shouldAlert { - for _, smpl := range series.Points { - if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - break - } - } - } - } - case OnAverage: - // If the average of all samples matches the condition, the rule is firing. - var sum, count float64 - for _, smpl := range series.Points { - if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { - continue - } - sum += smpl.Value - count++ - } - avg := sum / count - alertSmpl = Sample{Point: Point{V: avg}, Metric: lblsNormalized, MetricOrig: lbls} - if r.compareOp() == ValueIsAbove { - if avg > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if avg < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if avg == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if avg != r.targetVal() { - shouldAlert = true - } - } - case InTotal: - // If the sum of all samples matches the condition, the rule is firing. - var sum float64 - - for _, smpl := range series.Points { - if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { - continue - } - sum += smpl.Value - } - alertSmpl = Sample{Point: Point{V: sum}, Metric: lblsNormalized, MetricOrig: lbls} - if r.compareOp() == ValueIsAbove { - if sum > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if sum < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if sum == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if sum != r.targetVal() { - shouldAlert = true - } - } - } - return alertSmpl, shouldAlert -} diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 6cfeac83d9..734347793d 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -685,7 +685,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -774,7 +774,7 @@ func TestPrepareLinksToLogs(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -816,7 +816,7 @@ func TestPrepareLinksToTraces(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -892,7 +892,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -945,17 +945,17 @@ func TestThresholdRuleEvalDelay(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) // no eval delay + rule, err := NewThresholdRule("69", &postableRule, fm, nil) // no eval delay if err != nil { assert.NoError(t, err) } - params := rule.prepareQueryRange(ts) - + params, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) - secondTimeParams := rule.prepareQueryRange(ts) - + secondTimeParams, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) } } @@ -994,17 +994,17 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } - params := rule.prepareQueryRange(ts) - + params, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) - secondTimeParams := rule.prepareQueryRange(ts) - + secondTimeParams, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) } } @@ -1137,7 +1137,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1147,11 +1147,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { assert.NoError(t, err) } - queriers := Queriers{ - Ch: mock, - } - - retVal, err := rule.Eval(context.Background(), time.Now(), &queriers) + retVal, err := rule.Eval(context.Background(), time.Now()) if err != nil { assert.NoError(t, err) } @@ -1240,7 +1236,7 @@ func TestThresholdRuleNoData(t *testing.T) { options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1250,11 +1246,7 @@ func TestThresholdRuleNoData(t *testing.T) { assert.NoError(t, err) } - queriers := Queriers{ - Ch: mock, - } - - retVal, err := rule.Eval(context.Background(), time.Now(), &queriers) + retVal, err := rule.Eval(context.Background(), time.Now()) if err != nil { assert.NoError(t, err) }