chore: add base rule and consolidate common logic (#5849)

This commit is contained in:
Srikanth Chekuri 2024-09-11 09:56:59 +05:30 committed by GitHub
parent 2cc2a43e17
commit c79520c874
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 695 additions and 1254 deletions

View File

@ -746,10 +746,7 @@ func makeRulesManager(
// create manager opts // create manager opts
managerOpts := &baserules.ManagerOptions{ managerOpts := &baserules.ManagerOptions{
NotifierOpts: notifierOpts, NotifierOpts: notifierOpts,
Queriers: &baserules.Queriers{ PqlEngine: pqle,
PqlEngine: pqle,
Ch: ch.GetConn(),
},
RepoURL: ruleRepoURL, RepoURL: ruleRepoURL,
DBConn: db, DBConn: db,
Context: context.Background(), Context: context.Background(),

View File

@ -18,11 +18,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
tr, err := baserules.NewThresholdRule( tr, err := baserules.NewThresholdRule(
ruleId, ruleId,
opts.Rule, opts.Rule,
baserules.ThresholdRuleOpts{
EvalDelay: opts.ManagerOpts.EvalDelay,
},
opts.FF, opts.FF,
opts.Reader, opts.Reader,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
) )
if err != nil { if err != nil {
@ -41,8 +39,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
ruleId, ruleId,
opts.Rule, opts.Rule,
opts.Logger, opts.Logger,
baserules.PromRuleOpts{},
opts.Reader, opts.Reader,
opts.ManagerOpts.PqlEngine,
) )
if err != nil { if err != nil {

View File

@ -731,10 +731,7 @@ func makeRulesManager(
// create manager opts // create manager opts
managerOpts := &rules.ManagerOptions{ managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts, NotifierOpts: notifierOpts,
Queriers: &rules.Queriers{ PqlEngine: pqle,
PqlEngine: pqle,
Ch: ch.GetConn(),
},
RepoURL: ruleRepoURL, RepoURL: ruleRepoURL,
DBConn: db, DBConn: db,
Context: context.Background(), Context: context.Background(),

View File

@ -0,0 +1,567 @@
package rules
import (
"context"
"fmt"
"math"
"net/url"
"sync"
"time"
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.uber.org/zap"
)
// BaseRule contains common fields and methods for all rule types.
// Concrete rule types embed *BaseRule to share this state and behaviour.
type BaseRule struct {
	// id and name identify the rule; NewBaseRule takes name from the
	// postable rule's AlertName.
	id   string
	name string
	// source is the rule's Source value; it is used to build the generator URL.
	source string
	// handledRestart is set to true once RecordRuleStateHistory has run to
	// completion. While it is false, previously saved state history is
	// reconciled with the items being recorded.
	handledRestart bool

	// Type of the rule
	typ AlertType

	ruleCondition *RuleCondition
	// evalWindow is the time window used for evaluating the rule
	// i.e. each time we look back from the current time, we look at data for the last
	// evalWindow duration
	evalWindow time.Duration
	// holdDuration is the duration for which the alert waits before firing
	holdDuration time.Duration

	// evalDelay is the delay in evaluation of the rule
	// this is useful in cases where the data is not available immediately
	evalDelay time.Duration

	// holds the static set of labels and annotations for the rule
	// these are the same for all alerts created for this rule
	labels      qslabels.BaseLabels
	annotations qslabels.BaseLabels
	// preferredChannels is the list of channels to send the alert to
	// if the rule is triggered
	preferredChannels []string
	// mtx is locked by the accessors for health, lastError and the evaluation
	// fields, and by currentAlerts/ForEachActiveAlert while they walk active.
	mtx sync.Mutex
	// the time it took to evaluate the rule (most recent evaluation)
	evaluationDuration time.Duration
	// the timestamp of the last evaluation
	evaluationTimestamp time.Time

	// health and lastError hold the values most recently stored through
	// SetHealth and SetLastError.
	health    RuleHealth
	lastError error
	// active holds the alerts currently tracked for this rule.
	// NOTE(review): the uint64 key is presumably the alert's label
	// fingerprint; confirm against the rule types that populate it.
	active map[uint64]*Alert

	// lastTimestampWithDatapoints is the timestamp of the last datapoint we observed
	// for this rule
	// this is used for missing data alerts
	lastTimestampWithDatapoints time.Time

	// reader is used by RecordRuleStateHistory to load and persist rule
	// state history.
	reader interfaces.Reader

	// logger is the logger installed through WithLogger; it stays nil when
	// that option is not supplied.
	logger *zap.Logger

	// sendUnmatched sends observed metric values
	// even if they don't match the rule condition. this is
	// useful in testing the rule
	sendUnmatched bool

	// sendAlways will send alert irrespective of resendDelay
	// or other params
	sendAlways bool
}
// RuleOption customises a BaseRule. NewBaseRule applies the supplied options,
// in order, after it has filled in the rule's defaults.
type RuleOption func(*BaseRule)
// WithSendAlways returns a RuleOption that switches on the rule's sendAlways
// flag.
func WithSendAlways() RuleOption {
	enable := func(rule *BaseRule) {
		rule.sendAlways = true
	}
	return enable
}
// WithSendUnmatched returns a RuleOption that switches on the rule's
// sendUnmatched flag.
func WithSendUnmatched() RuleOption {
	enable := func(rule *BaseRule) {
		rule.sendUnmatched = true
	}
	return enable
}
// WithEvalDelay returns a RuleOption that stores dur as the rule's evalDelay.
func WithEvalDelay(dur time.Duration) RuleOption {
	setDelay := func(rule *BaseRule) {
		rule.evalDelay = dur
	}
	return setDelay
}
// WithLogger returns a RuleOption that installs logger as the rule's logger.
func WithLogger(logger *zap.Logger) RuleOption {
	setLogger := func(rule *BaseRule) {
		rule.logger = logger
	}
	return setLogger
}
// NewBaseRule builds the state shared by every rule type from the postable
// rule p.
//
// It returns an error when p carries no rule condition or the condition fails
// IsValid. A zero evaluation window falls back to five minutes. The options in
// opts are applied last, in the order given, so they can override anything set
// here.
func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
	condition := p.RuleCondition
	if condition == nil || !condition.IsValid() {
		return nil, fmt.Errorf("invalid rule condition")
	}

	window := time.Duration(p.EvalWindow)
	if window == 0 {
		window = 5 * time.Minute
	}

	rule := &BaseRule{
		id:                id,
		name:              p.AlertName,
		source:            p.Source,
		ruleCondition:     condition,
		evalWindow:        window,
		labels:            qslabels.FromMap(p.Labels),
		annotations:       qslabels.FromMap(p.Annotations),
		preferredChannels: p.PreferredChannels,
		health:            HealthUnknown,
		active:            map[uint64]*Alert{},
		reader:            reader,
	}

	for i := range opts {
		opts[i](rule)
	}

	return rule, nil
}
// targetVal returns the rule's target value converted from the target unit to
// the unit reported by Unit. It returns 0 when the rule has no condition or
// the condition has no target.
func (r *BaseRule) targetVal() float64 {
	cond := r.ruleCondition
	if cond == nil || cond.Target == nil {
		return 0
	}

	// the target is expressed in the target unit ...
	targetUnit := converter.Unit(cond.TargetUnit)
	target := converter.Value{F: *cond.Target, U: targetUnit}

	// ... and is compared against values expressed in the y-axis unit
	converted := converter.FromUnit(targetUnit).Convert(target, converter.Unit(r.Unit()))
	return converted.F
}
// matchType returns the match type of the rule condition, or AtleastOnce when
// the rule has no condition.
func (r *BaseRule) matchType() MatchType {
	if cond := r.ruleCondition; cond != nil {
		return cond.MatchType
	}
	return AtleastOnce
}
// compareOp returns the comparison operator of the rule condition, or
// ValueIsEq when the rule has no condition.
func (r *BaseRule) compareOp() CompareOp {
	if cond := r.ruleCondition; cond != nil {
		return cond.CompareOp
	}
	return ValueIsEq
}
// currentAlerts returns a copy of every alert in the active map, taken while
// holding the rule's mutex. The returned alerts can be modified without
// affecting the rule's own alerts.
func (r *BaseRule) currentAlerts() []*Alert {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	snapshot := make([]*Alert, 0, len(r.active))
	for key := range r.active {
		clone := *r.active[key]
		snapshot = append(snapshot, &clone)
	}
	return snapshot
}
// hostFromSource returns the "scheme://host[:port]" prefix of the rule's
// source URL, without path, query or user info. It returns an empty string
// when the source cannot be parsed as a URL.
func (r *ThresholdRule) hostFromSource() string {
	parsedURL, err := url.Parse(r.source)
	if err != nil {
		return ""
	}

	host := parsedURL.Hostname()
	// Hostname strips the square brackets from an IPv6 literal. Put them back,
	// otherwise the rebuilt prefix (e.g. "http://::1:8080") is not a valid URL
	// and the port can no longer be told apart from the address.
	if len(parsedURL.Host) > 0 && parsedURL.Host[0] == '[' {
		host = "[" + host + "]"
	}

	if port := parsedURL.Port(); port != "" {
		return fmt.Sprintf("%s://%s:%s", parsedURL.Scheme, host, port)
	}
	return fmt.Sprintf("%s://%s", parsedURL.Scheme, host)
}
// ID returns the rule's identifier.
func (r *BaseRule) ID() string {
	return r.id
}

// Name returns the rule's name.
func (r *BaseRule) Name() string {
	return r.name
}

// Condition returns the rule's condition.
func (r *BaseRule) Condition() *RuleCondition {
	return r.ruleCondition
}

// Labels returns the static labels of the rule.
func (r *BaseRule) Labels() qslabels.BaseLabels {
	return r.labels
}

// Annotations returns the static annotations of the rule.
func (r *BaseRule) Annotations() qslabels.BaseLabels {
	return r.annotations
}

// PreferredChannels returns the channels the rule's alerts should be sent to.
func (r *BaseRule) PreferredChannels() []string {
	return r.preferredChannels
}
// GeneratorURL returns the URL built by prepareRuleGeneratorURL from the
// rule's ID and source.
func (r *BaseRule) GeneratorURL() string {
	ruleID, src := r.ID(), r.source
	return prepareRuleGeneratorURL(ruleID, src)
}
// Unit returns the unit of the rule's composite query, or an empty string when
// the rule has no condition or the condition has no composite query.
func (r *BaseRule) Unit() string {
	if r.ruleCondition == nil {
		return ""
	}
	query := r.ruleCondition.CompositeQuery
	if query == nil {
		return ""
	}
	return query.Unit
}
// SetLastError stores err as the rule's last error while holding the rule's
// mutex.
func (r *BaseRule) SetLastError(err error) {
	r.mtx.Lock()
	r.lastError = err
	r.mtx.Unlock()
}
// LastError returns the rule's last error, read while holding the rule's
// mutex.
func (r *BaseRule) LastError() error {
	r.mtx.Lock()
	lastErr := r.lastError
	r.mtx.Unlock()
	return lastErr
}
// SetHealth stores health as the rule's health while holding the rule's
// mutex.
func (r *BaseRule) SetHealth(health RuleHealth) {
	r.mtx.Lock()
	r.health = health
	r.mtx.Unlock()
}
// Health returns the rule's health, read while holding the rule's mutex.
func (r *BaseRule) Health() RuleHealth {
	r.mtx.Lock()
	current := r.health
	r.mtx.Unlock()
	return current
}
// SetEvaluationDuration stores dur as the time the most recent evaluation of
// the rule took, while holding the rule's mutex.
func (r *BaseRule) SetEvaluationDuration(dur time.Duration) {
	r.mtx.Lock()
	r.evaluationDuration = dur
	r.mtx.Unlock()
}
// GetEvaluationDuration returns the stored duration of the most recent
// evaluation of the rule, read while holding the rule's mutex.
func (r *BaseRule) GetEvaluationDuration() time.Duration {
	r.mtx.Lock()
	took := r.evaluationDuration
	r.mtx.Unlock()
	return took
}
// SetEvaluationTimestamp stores ts as the time of the rule's last evaluation,
// while holding the rule's mutex.
func (r *BaseRule) SetEvaluationTimestamp(ts time.Time) {
	r.mtx.Lock()
	r.evaluationTimestamp = ts
	r.mtx.Unlock()
}
// GetEvaluationTimestamp returns the stored time of the rule's last
// evaluation, read while holding the rule's mutex.
func (r *BaseRule) GetEvaluationTimestamp() time.Time {
	r.mtx.Lock()
	evaluatedAt := r.evaluationTimestamp
	r.mtx.Unlock()
	return evaluatedAt
}
// State returns the highest state found among the rule's active alerts, or
// model.StateInactive when there are none.
//
// It reads the active map without taking the rule's mutex.
// NOTE(review): callers appear to be responsible for synchronisation here;
// confirm before adding a lock, since a caller may already hold the mutex.
func (r *BaseRule) State() model.AlertState {
	highest := model.StateInactive
	for key := range r.active {
		if state := r.active[key].State; state > highest {
			highest = state
		}
	}
	return highest
}
// ActiveAlerts returns copies of the rule's alerts that have not been
// resolved, i.e. whose ResolvedAt is still zero. The result is nil when there
// are none.
func (r *BaseRule) ActiveAlerts() []*Alert {
	var unresolved []*Alert
	for _, alert := range r.currentAlerts() {
		if !alert.ResolvedAt.IsZero() {
			continue
		}
		unresolved = append(unresolved, alert)
	}
	return unresolved
}
// SendAlerts passes a copy of every active alert that is due for sending to
// notifyFunc.
//
// An alert is due when its needsSending(ts, resendDelay) reports true, or
// unconditionally when the rule was created with WithSendAlways. Each alert
// that is sent gets LastSentAt set to ts and ValidUntil set to ts plus four
// times the larger of resendDelay and interval. notifyFunc is always called
// exactly once, even when no alert is due.
func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
	alerts := []*Alert{}
	r.ForEachActiveAlert(func(alert *Alert) {
		// sendAlways bypasses the resend-delay check so that the option
		// actually has an effect.
		if !r.sendAlways && !alert.needsSending(ts, resendDelay) {
			return
		}

		alert.LastSentAt = ts

		delta := resendDelay
		if interval > resendDelay {
			delta = interval
		}
		alert.ValidUntil = ts.Add(4 * delta)

		// hand out a copy so the receiver cannot modify the rule's own alert
		anew := *alert
		alerts = append(alerts, &anew)
	})
	notifyFunc(ctx, "", alerts...)
}
// ForEachActiveAlert calls f on every alert in the active map while holding
// the rule's mutex. f receives the rule's own alerts, not copies, so changes
// made by f are kept; use ActiveAlerts to work on copies instead.
func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	for key := range r.active {
		f(r.active[key])
	}
}
// shouldAlert evaluates one series against the rule condition and reports
// whether the rule should fire for it, together with the sample to attach to
// the alert.
//
// The points left after removeGroupinSetPoints are checked according to the
// rule's match type:
//   - AtleastOnce: fires on the first point that satisfies the comparison;
//     the sample carries that point's value.
//   - AllTheTimes: fires only when no point violates the comparison; the
//     sample carries the minimum (above), the maximum (below), the target
//     (equal) or the first finite value (not equal).
//   - OnAverage: compares the mean of the finite points.
//   - InTotal: compares the sum of the finite points.
//
// It returns false when there are no points to evaluate, and for OnAverage
// when no point is finite.
func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
	var alertSmpl Sample
	var shouldAlert bool
	var lbls qslabels.Labels
	var lblsNormalized qslabels.Labels

	for name, value := range series.Labels {
		lbls = append(lbls, qslabels.Label{Name: name, Value: value})
		lblsNormalized = append(lblsNormalized, qslabels.Label{Name: normalizeLabelName(name), Value: value})
	}

	series.Points = removeGroupinSetPoints(series)

	// nothing to evaluate
	if len(series.Points) == 0 {
		return alertSmpl, false
	}

	// The target (which goes through a unit conversion) and the operator do
	// not change while a series is evaluated, so resolve them only once.
	target := r.targetVal()
	op := r.compareOp()

	// sampleOf builds the alert sample for value v with this series' labels.
	sampleOf := func(v float64) Sample {
		return Sample{Point: Point{V: v}, Metric: lblsNormalized, MetricOrig: lbls}
	}

	// matches reports whether v satisfies the rule's comparison against the
	// target. An operator other than the four handled here never matches.
	matches := func(v float64) bool {
		switch op {
		case ValueIsAbove:
			return v > target
		case ValueIsBelow:
			return v < target
		case ValueIsEq:
			return v == target
		case ValueIsNotEq:
			return v != target
		}
		return false
	}

	switch r.matchType() {
	case AtleastOnce:
		// If any sample matches the condition, the rule is firing.
		for _, smpl := range series.Points {
			if matches(smpl.Value) {
				alertSmpl = sampleOf(smpl.Value)
				shouldAlert = true
				break
			}
		}

	case AllTheTimes:
		// If all samples match the condition, the rule is firing.
		shouldAlert = true
		alertSmpl = sampleOf(target)

		// The violation checks below are spelled out instead of negating
		// matches: a NaN point compares false against the target either way
		// and therefore must not count as a violation for above/below.
		switch op {
		case ValueIsAbove:
			for _, smpl := range series.Points {
				if smpl.Value <= target {
					shouldAlert = false
					break
				}
			}
			// use min value from the series
			if shouldAlert {
				minValue := math.Inf(1)
				for _, smpl := range series.Points {
					if smpl.Value < minValue {
						minValue = smpl.Value
					}
				}
				alertSmpl = sampleOf(minValue)
			}
		case ValueIsBelow:
			for _, smpl := range series.Points {
				if smpl.Value >= target {
					shouldAlert = false
					break
				}
			}
			// use max value from the series
			if shouldAlert {
				maxValue := math.Inf(-1)
				for _, smpl := range series.Points {
					if smpl.Value > maxValue {
						maxValue = smpl.Value
					}
				}
				alertSmpl = sampleOf(maxValue)
			}
		case ValueIsEq:
			for _, smpl := range series.Points {
				if smpl.Value != target {
					shouldAlert = false
					break
				}
			}
		case ValueIsNotEq:
			for _, smpl := range series.Points {
				if smpl.Value == target {
					shouldAlert = false
					break
				}
			}
			// use any non-inf or nan value from the series
			if shouldAlert {
				for _, smpl := range series.Points {
					if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
						alertSmpl = sampleOf(smpl.Value)
						break
					}
				}
			}
		}

	case OnAverage:
		// If the average of all samples matches the condition, the rule is firing.
		var sum, count float64
		for _, smpl := range series.Points {
			if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
				continue
			}
			sum += smpl.Value
			count++
		}
		// With no finite point the average would be 0/0 = NaN, and NaN is
		// "not equal" to every target, which would fire a not-equal rule on
		// data that carries no usable value at all.
		if count == 0 {
			return alertSmpl, false
		}
		avg := sum / count
		alertSmpl = sampleOf(avg)
		shouldAlert = matches(avg)

	case InTotal:
		// If the sum of all samples matches the condition, the rule is firing.
		var sum float64
		for _, smpl := range series.Points {
			if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
				continue
			}
			sum += smpl.Value
		}
		alertSmpl = sampleOf(sum)
		shouldAlert = matches(sum)
	}

	return alertSmpl, shouldAlert
}
// RecordRuleStateHistory persists the state changes in itemsToAdd through the
// rule's reader.
//
// On the first call after the rule was (re)created, and only when saved
// history exists, the items are reconciled with the last saved state:
// fingerprints that were firing or in no-data but are absent now are recorded
// as inactive, fingerprints whose state differs are recorded with the new
// state, and the overall state is updated on every written item when it
// changed.
//
// It returns the error from loading the last saved state. A failure to write
// the entries is only logged. When the rule has no reader, nothing is loaded
// or written and nil is returned.
func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error {
	zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))

	// Without a reader there is nothing to load from or persist to. Checking
	// here, before the first use, avoids a nil dereference below.
	if r.reader == nil {
		return nil
	}

	revisedItemsToAdd := map[uint64]v3.RuleStateHistory{}

	lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
	if err != nil {
		return err
	}
	// if the query-service has been restarted, or the rule has been modified (which re-initializes the rule),
	// the state would reset so we need to add the corresponding state changes to previously saved states
	if !r.handledRestart && len(lastSavedState) > 0 {
		zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))
		l := map[uint64]v3.RuleStateHistory{}
		for _, item := range itemsToAdd {
			l[item.Fingerprint] = item
		}

		shouldSkip := map[uint64]bool{}

		for _, item := range lastSavedState {
			// for the last saved item with fingerprint, check if there is a corresponding entry in the current state
			current, ok := l[item.Fingerprint]
			if !ok {
				// there was a state change in the past, but not in the current state
				// if the state was firing, then we should add a resolved state change
				if item.State == model.StateFiring || item.State == model.StateNoData {
					item.State = model.StateInactive
					item.StateChanged = true
					item.UnixMilli = time.Now().UnixMilli()
					revisedItemsToAdd[item.Fingerprint] = item
				}
				// there is nothing to do if the prev state was normal
			} else {
				if item.State != current.State {
					item.State = current.State
					item.StateChanged = true
					item.UnixMilli = time.Now().UnixMilli()
					revisedItemsToAdd[item.Fingerprint] = item
				}
			}
			// do not add this item to revisedItemsToAdd as it is already processed
			shouldSkip[item.Fingerprint] = true
		}
		zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

		// if there are any new state changes that were not saved, add them to the revised items
		for _, item := range itemsToAdd {
			if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] {
				revisedItemsToAdd[item.Fingerprint] = item
			}
		}
		zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

		newState := model.StateInactive
		for _, item := range revisedItemsToAdd {
			if item.State == model.StateFiring || item.State == model.StateNoData {
				newState = model.StateFiring
				break
			}
		}
		zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState))

		// if there is a change in the overall state, update the overall state
		// (lastSavedState is non-empty in this branch, so index 0 is safe)
		if lastSavedState[0].OverallState != newState {
			for fingerprint, item := range revisedItemsToAdd {
				item.OverallState = newState
				item.OverallStateChanged = true
				revisedItemsToAdd[fingerprint] = item
			}
		}
		zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

	} else {
		for _, item := range itemsToAdd {
			revisedItemsToAdd[item.Fingerprint] = item
		}
	}

	if len(revisedItemsToAdd) > 0 {
		zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

		entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd))
		for _, item := range revisedItemsToAdd {
			entries = append(entries, item)
		}
		// a failed write is logged and otherwise ignored
		if err := r.reader.AddRuleStateHistory(ctx, entries); err != nil {
			zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
		}
	}
	r.handledRestart = true

	return nil
}

View File

@ -21,6 +21,7 @@ import (
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
"go.signoz.io/signoz/pkg/query-service/telemetry" "go.signoz.io/signoz/pkg/query-service/telemetry"
"go.signoz.io/signoz/pkg/query-service/utils/labels" "go.signoz.io/signoz/pkg/query-service/utils/labels"
) )
@ -56,7 +57,7 @@ func prepareTaskName(ruleId interface{}) string {
// ManagerOptions bundles options for the Manager. // ManagerOptions bundles options for the Manager.
type ManagerOptions struct { type ManagerOptions struct {
NotifierOpts am.NotifierOptions NotifierOpts am.NotifierOptions
Queriers *Queriers PqlEngine *pqle.PqlEngine
// RepoURL is used to generate a backlink in sent alert messages // RepoURL is used to generate a backlink in sent alert messages
RepoURL string RepoURL string
@ -127,11 +128,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
tr, err := NewThresholdRule( tr, err := NewThresholdRule(
ruleId, ruleId,
opts.Rule, opts.Rule,
ThresholdRuleOpts{
EvalDelay: opts.ManagerOpts.EvalDelay,
},
opts.FF, opts.FF,
opts.Reader, opts.Reader,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
) )
if err != nil { if err != nil {
@ -150,8 +149,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
ruleId, ruleId,
opts.Rule, opts.Rule,
opts.Logger, opts.Logger,
PromRuleOpts{},
opts.Reader, opts.Reader,
opts.ManagerOpts.PqlEngine,
) )
if err != nil { if err != nil {
@ -793,12 +792,10 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
rule, err = NewThresholdRule( rule, err = NewThresholdRule(
alertname, alertname,
parsedRule, parsedRule,
ThresholdRuleOpts{
SendUnmatched: true,
SendAlways: true,
},
m.featureFlags, m.featureFlags,
m.reader, m.reader,
WithSendAlways(),
WithSendUnmatched(),
) )
if err != nil { if err != nil {
@ -813,10 +810,10 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
alertname, alertname,
parsedRule, parsedRule,
m.logger, m.logger,
PromRuleOpts{
SendAlways: true,
},
m.reader, m.reader,
m.opts.PqlEngine,
WithSendAlways(),
WithSendUnmatched(),
) )
if err != nil { if err != nil {
@ -830,7 +827,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
// set timestamp to current utc time // set timestamp to current utc time
ts := time.Now().UTC() ts := time.Now().UTC()
count, err := rule.Eval(ctx, ts, m.opts.Queriers) count, err := rule.Eval(ctx, ts)
if err != nil { if err != nil {
zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err)) zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err))
return 0, newApiErrorInternal(fmt.Errorf("rule evaluation failed")) return 0, newApiErrorInternal(fmt.Errorf("rule evaluation failed"))

View File

@ -4,295 +4,60 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
plabels "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql"
pql "github.com/prometheus/prometheus/promql"
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/formatter" "go.signoz.io/signoz/pkg/query-service/formatter"
"go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.signoz.io/signoz/pkg/query-service/utils/times" "go.signoz.io/signoz/pkg/query-service/utils/times"
"go.signoz.io/signoz/pkg/query-service/utils/timestamp" "go.signoz.io/signoz/pkg/query-service/utils/timestamp"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
) )
type PromRuleOpts struct {
// SendAlways will send alert irresepective of resendDelay
// or other params
SendAlways bool
}
type PromRule struct { type PromRule struct {
id string *BaseRule
name string pqlEngine *pqle.PqlEngine
source string
ruleCondition *RuleCondition
evalWindow time.Duration
holdDuration time.Duration
labels plabels.Labels
annotations plabels.Labels
preferredChannels []string
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTimestamp time.Time
health RuleHealth
lastError error
// map of active alerts
active map[uint64]*Alert
logger *zap.Logger
opts PromRuleOpts
reader interfaces.Reader
handledRestart bool
} }
func NewPromRule( func NewPromRule(
id string, id string,
postableRule *PostableRule, postableRule *PostableRule,
logger *zap.Logger, logger *zap.Logger,
opts PromRuleOpts,
reader interfaces.Reader, reader interfaces.Reader,
pqlEngine *pqle.PqlEngine,
opts ...RuleOption,
) (*PromRule, error) { ) (*PromRule, error) {
if postableRule.RuleCondition == nil { baseRule, err := NewBaseRule(id, postableRule, reader, opts...)
return nil, fmt.Errorf("no rule condition") if err != nil {
} else if !postableRule.RuleCondition.IsValid() { return nil, err
return nil, fmt.Errorf("invalid rule condition")
} }
p := PromRule{ p := PromRule{
id: id, BaseRule: baseRule,
name: postableRule.AlertName, pqlEngine: pqlEngine,
source: postableRule.Source,
ruleCondition: postableRule.RuleCondition,
evalWindow: time.Duration(postableRule.EvalWindow),
labels: plabels.FromMap(postableRule.Labels),
annotations: plabels.FromMap(postableRule.Annotations),
preferredChannels: postableRule.PreferredChannels,
health: HealthUnknown,
active: map[uint64]*Alert{},
logger: logger,
opts: opts,
} }
p.reader = reader
if int64(p.evalWindow) == 0 {
p.evalWindow = 5 * time.Minute
}
query, err := p.getPqlQuery() query, err := p.getPqlQuery()
if err != nil { if err != nil {
// can not generate a valid prom QL query // can not generate a valid prom QL query
return nil, err return nil, err
} }
zap.L().Info("creating new prom rule", zap.String("name", p.name), zap.String("query", query))
zap.L().Info("creating new alerting rule", zap.String("name", p.name), zap.String("condition", p.ruleCondition.String()), zap.String("query", query))
return &p, nil return &p, nil
} }
func (r *PromRule) Name() string {
return r.name
}
func (r *PromRule) ID() string {
return r.id
}
func (r *PromRule) Condition() *RuleCondition {
return r.ruleCondition
}
// targetVal returns the target value for the rule condition
// when the y-axis and target units are non-empty, it
// converts the target value to the y-axis unit
func (r *PromRule) targetVal() float64 {
if r.ruleCondition == nil || r.ruleCondition.Target == nil {
return 0
}
// get the converter for the target unit
unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit))
// convert the target value to the y-axis unit
value := unitConverter.Convert(converter.Value{
F: *r.ruleCondition.Target,
U: converter.Unit(r.ruleCondition.TargetUnit),
}, converter.Unit(r.Unit()))
return value.F
}
func (r *PromRule) Type() RuleType { func (r *PromRule) Type() RuleType {
return RuleTypeProm return RuleTypeProm
} }
func (r *PromRule) GeneratorURL() string {
return prepareRuleGeneratorURL(r.ID(), r.source)
}
func (r *PromRule) PreferredChannels() []string {
return r.preferredChannels
}
func (r *PromRule) SetLastError(err error) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.lastError = err
}
func (r *PromRule) LastError() error {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.lastError
}
func (r *PromRule) SetHealth(health RuleHealth) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.health = health
}
func (r *PromRule) Health() RuleHealth {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.health
}
// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
func (r *PromRule) SetEvaluationDuration(dur time.Duration) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.evaluationDuration = dur
}
func (r *PromRule) HoldDuration() time.Duration {
return r.holdDuration
}
func (r *PromRule) EvalWindow() time.Duration {
return r.evalWindow
}
// Labels returns the labels of the alerting rule.
func (r *PromRule) Labels() qslabels.BaseLabels {
return r.labels
}
// Annotations returns the annotations of the alerting rule.
func (r *PromRule) Annotations() qslabels.BaseLabels {
return r.annotations
}
// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule.
func (r *PromRule) GetEvaluationDuration() time.Duration {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.evaluationDuration
}
// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated.
func (r *PromRule) SetEvaluationTimestamp(ts time.Time) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.evaluationTimestamp = ts
}
// GetEvaluationTimestamp returns the time the evaluation took place.
func (r *PromRule) GetEvaluationTimestamp() time.Time {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.evaluationTimestamp
}
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive
func (r *PromRule) State() model.AlertState {
maxState := model.StateInactive
for _, a := range r.active {
if a.State > maxState {
maxState = a.State
}
}
return maxState
}
func (r *PromRule) currentAlerts() []*Alert {
r.mtx.Lock()
defer r.mtx.Unlock()
alerts := make([]*Alert, 0, len(r.active))
for _, a := range r.active {
anew := *a
alerts = append(alerts, &anew)
}
return alerts
}
func (r *PromRule) ActiveAlerts() []*Alert {
var res []*Alert
for _, a := range r.currentAlerts() {
if a.ResolvedAt.IsZero() {
res = append(res, a)
}
}
return res
}
func (r *PromRule) Unit() string {
if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil {
return r.ruleCondition.CompositeQuery.Unit
}
return ""
}
// ForEachActiveAlert runs the given function on each alert.
// This should be used when you want to use the actual alerts from the ThresholdRule
// and not on its copy.
// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'.
func (r *PromRule) ForEachActiveAlert(f func(*Alert)) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, a := range r.active {
f(a)
}
}
func (r *PromRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if r.opts.SendAlways || alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, "", alerts...)
}
func (r *PromRule) GetSelectedQuery() string { func (r *PromRule) GetSelectedQuery() string {
if r.ruleCondition != nil { if r.ruleCondition != nil {
// If the user has explicitly set the selected query, we return that. // If the user has explicitly set the selected query, we return that.
@ -327,117 +92,7 @@ func (r *PromRule) getPqlQuery() (string, error) {
return "", fmt.Errorf("invalid promql rule query") return "", fmt.Errorf("invalid promql rule query")
} }
func (r *PromRule) matchType() MatchType { func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
if r.ruleCondition == nil {
return AtleastOnce
}
return r.ruleCondition.MatchType
}
func (r *PromRule) compareOp() CompareOp {
if r.ruleCondition == nil {
return ValueIsEq
}
return r.ruleCondition.CompareOp
}
// TODO(srikanthccv): implement base rule and use for all types of rules
func (r *PromRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error {
zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))
revisedItemsToAdd := map[uint64]v3.RuleStateHistory{}
lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
if err != nil {
return err
}
// if the query-service has been restarted, or the rule has been modified (which re-initializes the rule),
// the state would reset so we need to add the corresponding state changes to previously saved states
if !r.handledRestart && len(lastSavedState) > 0 {
zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))
l := map[uint64]v3.RuleStateHistory{}
for _, item := range itemsToAdd {
l[item.Fingerprint] = item
}
shouldSkip := map[uint64]bool{}
for _, item := range lastSavedState {
// for the last saved item with fingerprint, check if there is a corresponding entry in the current state
currentState, ok := l[item.Fingerprint]
if !ok {
// there was a state change in the past, but not in the current state
// if the state was firing, then we should add a resolved state change
if item.State == model.StateFiring || item.State == model.StateNoData {
item.State = model.StateInactive
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
}
// there is nothing to do if the prev state was normal
} else {
if item.State != currentState.State {
item.State = currentState.State
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
}
}
// do not add this item to revisedItemsToAdd as it is already processed
shouldSkip[item.Fingerprint] = true
}
zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
// if there are any new state changes that were not saved, add them to the revised items
for _, item := range itemsToAdd {
if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] {
revisedItemsToAdd[item.Fingerprint] = item
}
}
zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
newState := model.StateInactive
for _, item := range revisedItemsToAdd {
if item.State == model.StateFiring || item.State == model.StateNoData {
newState = model.StateFiring
break
}
}
zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState))
// if there is a change in the overall state, update the overall state
if lastSavedState[0].OverallState != newState {
for fingerprint, item := range revisedItemsToAdd {
item.OverallState = newState
item.OverallStateChanged = true
revisedItemsToAdd[fingerprint] = item
}
}
zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
} else {
for _, item := range itemsToAdd {
revisedItemsToAdd[item.Fingerprint] = item
}
}
if len(revisedItemsToAdd) > 0 && r.reader != nil {
zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd))
for _, item := range revisedItemsToAdd {
entries = append(entries, item)
}
err := r.reader.AddRuleStateHistory(ctx, entries)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
r.handledRestart = true
return nil
}
func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
prevState := r.State() prevState := r.State()
@ -452,7 +107,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
return nil, err return nil, err
} }
zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q)) zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q))
res, err := queriers.PqlEngine.RunAlertQuery(ctx, q, start, end, interval) res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval)
if err != nil { if err != nil {
r.SetHealth(HealthBad) r.SetHealth(HealthBad)
r.SetLastError(err) r.SetLastError(err)
@ -476,7 +131,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
continue continue
} }
alertSmpl, shouldAlert := r.shouldAlert(series) alertSmpl, shouldAlert := r.shouldAlert(toCommonSeries(series))
if !shouldAlert { if !shouldAlert {
continue continue
} }
@ -484,7 +139,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
threshold := valueFormatter.Format(r.targetVal(), r.Unit()) threshold := valueFormatter.Format(r.targetVal(), r.Unit())
tmplData := AlertTemplateData(l, valueFormatter.Format(alertSmpl.F, r.Unit()), threshold) tmplData := AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold)
// Inject some convenience variables that are easier to remember for users // Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system. // who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}" defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
@ -507,20 +162,20 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
return result return result
} }
lb := plabels.NewBuilder(alertSmpl.Metric).Del(plabels.MetricName) lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel)
resultLabels := plabels.NewBuilder(alertSmpl.Metric).Del(plabels.MetricName).Labels() resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels()
for _, l := range r.labels { for name, value := range r.labels.Map() {
lb.Set(l.Name, expand(l.Value)) lb.Set(name, expand(value))
} }
lb.Set(qslabels.AlertNameLabel, r.Name()) lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID()) lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL()) lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
annotations := make(plabels.Labels, 0, len(r.annotations)) annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
for _, a := range r.annotations { for name, value := range r.annotations.Map() {
annotations = append(annotations, plabels.Label{Name: a.Name, Value: expand(a.Value)}) annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
} }
lbs := lb.Labels() lbs := lb.Labels()
@ -542,7 +197,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
Annotations: annotations, Annotations: annotations,
ActiveAt: ts, ActiveAt: ts,
State: model.StatePending, State: model.StatePending,
Value: alertSmpl.F, Value: alertSmpl.V,
GeneratorURL: r.GeneratorURL(), GeneratorURL: r.GeneratorURL(),
Receivers: r.preferredChannels, Receivers: r.preferredChannels,
} }
@ -626,169 +281,11 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
itemsToAdd[idx] = item itemsToAdd[idx] = item
} }
r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd)
return len(r.active), nil return len(r.active), nil
} }
// shouldAlert evaluates the float samples of a single PromQL series against
// the rule's match type, compare operator and target value.
//
// It returns the sample to report for the alert and whether the condition is
// met for the series:
//   - AtleastOnce: fires on the first sample that satisfies the comparison and
//     reports that sample.
//   - AllTheTimes: fires only when no sample violates the comparison; reports
//     the minimum (above), the maximum (below), the target (equal) or the
//     first finite sample (not equal).
//   - OnAverage: compares the mean of the non-NaN samples.
//   - InTotal: compares the sum of the non-NaN samples.
//
// An unknown match type or compare operator never fires.
func (r *PromRule) shouldAlert(series pql.Series) (pql.Sample, bool) {
	var alertSmpl pql.Sample
	var shouldAlert bool

	// The operator and target do not change while a series is evaluated, so
	// resolve them once instead of on every sample.
	op := r.compareOp()
	target := r.targetVal()

	// compare applies the rule's operator to a single value.
	compare := func(v float64) bool {
		switch op {
		case ValueIsAbove:
			return v > target
		case ValueIsBelow:
			return v < target
		case ValueIsEq:
			return v == target
		case ValueIsNotEq:
			return v != target
		}
		return false
	}

	switch r.matchType() {
	case AtleastOnce:
		// If any sample matches the condition, the rule is firing.
		for _, smpl := range series.Floats {
			if compare(smpl.F) {
				alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric}
				shouldAlert = true
				break
			}
		}
	case AllTheTimes:
		// If all samples match the condition, the rule is firing.
		shouldAlert = true
		alertSmpl = pql.Sample{F: target, Metric: series.Metric}
		switch op {
		case ValueIsAbove:
			for _, smpl := range series.Floats {
				if smpl.F <= target {
					shouldAlert = false
					break
				}
			}
			// use min value from the series
			if shouldAlert {
				minValue := math.Inf(1)
				for _, smpl := range series.Floats {
					if smpl.F < minValue {
						minValue = smpl.F
					}
				}
				alertSmpl = pql.Sample{F: minValue, Metric: series.Metric}
			}
		case ValueIsBelow:
			for _, smpl := range series.Floats {
				if smpl.F >= target {
					shouldAlert = false
					break
				}
			}
			// use max value from the series
			if shouldAlert {
				maxValue := math.Inf(-1)
				for _, smpl := range series.Floats {
					if smpl.F > maxValue {
						maxValue = smpl.F
					}
				}
				alertSmpl = pql.Sample{F: maxValue, Metric: series.Metric}
			}
		case ValueIsEq:
			for _, smpl := range series.Floats {
				if smpl.F != target {
					shouldAlert = false
					break
				}
			}
		case ValueIsNotEq:
			for _, smpl := range series.Floats {
				if smpl.F == target {
					shouldAlert = false
					break
				}
			}
			// report the first finite sample
			if shouldAlert {
				for _, smpl := range series.Floats {
					if !math.IsInf(smpl.F, 0) && !math.IsNaN(smpl.F) {
						alertSmpl = pql.Sample{F: smpl.F, Metric: series.Metric}
						break
					}
				}
			}
		}
	case OnAverage:
		// If the average of all samples matches the condition, the rule is firing.
		// NaN samples are excluded from both the sum and the divisor; dividing
		// by the full sample count would bias the average toward zero.
		var sum float64
		var count int
		for _, smpl := range series.Floats {
			if math.IsNaN(smpl.F) {
				continue
			}
			sum += smpl.F
			count++
		}
		avg := sum / float64(count)
		alertSmpl = pql.Sample{F: avg, Metric: series.Metric}
		shouldAlert = compare(avg)
	case InTotal:
		// If the sum of all samples matches the condition, the rule is firing.
		var sum float64
		for _, smpl := range series.Floats {
			if math.IsNaN(smpl.F) {
				continue
			}
			sum += smpl.F
		}
		alertSmpl = pql.Sample{F: sum, Metric: series.Metric}
		shouldAlert = compare(sum)
	}
	return alertSmpl, shouldAlert
}
func (r *PromRule) String() string { func (r *PromRule) String() string {
ar := PostableRule{ ar := PostableRule{
@ -807,3 +304,23 @@ func (r *PromRule) String() string {
return string(byt) return string(byt)
} }
// toCommonSeries converts a PromQL series into a v3.Series.
//
// Every label of series.Metric is copied into Labels and appended to
// LabelsArray as a single-entry map, and every float sample becomes a
// v3.Point carrying its timestamp and value.
func toCommonSeries(series promql.Series) v3.Series {
	// Labels must be allocated up front: assigning into the nil map of a
	// zero-value v3.Series panics on the first label.
	commonSeries := v3.Series{
		Labels:      make(map[string]string, len(series.Metric)),
		LabelsArray: make([]map[string]string, 0, len(series.Metric)),
		Points:      make([]v3.Point, 0, len(series.Floats)),
	}

	for _, lbl := range series.Metric {
		commonSeries.Labels[lbl.Name] = lbl.Value
		commonSeries.LabelsArray = append(commonSeries.LabelsArray, map[string]string{
			lbl.Name: lbl.Value,
		})
	}

	for _, f := range series.Floats {
		commonSeries.Points = append(commonSeries.Points, v3.Point{
			Timestamp: f.T,
			Value:     f.F,
		})
	}

	return commonSeries
}

View File

@ -367,7 +367,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
} }
ctx = context.WithValue(ctx, common.LogCommentKey, kvs) ctx = context.WithValue(ctx, common.LogCommentKey, kvs)
_, err := rule.Eval(ctx, ts, g.opts.Queriers) _, err := rule.Eval(ctx, ts)
if err != nil { if err != nil {
rule.SetHealth(HealthBad) rule.SetHealth(HealthBad)
rule.SetLastError(err) rule.SetLastError(err)

View File

@ -656,12 +656,12 @@ func TestPromRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewPromRule("69", &postableRule, zap.NewNop(), PromRuleOpts{}, nil) rule, err := NewPromRule("69", &postableRule, zap.NewNop(), nil, nil)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
_, shoulAlert := rule.shouldAlert(c.values) _, shoulAlert := rule.shouldAlert(toCommonSeries(c.values))
assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx) assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx)
} }
} }

View File

@ -1,21 +1 @@
package rules package rules
import (
"github.com/ClickHouse/clickhouse-go/v2"
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
)
// Queriers registers the options for querying metrics or event sources
// which return a condition that results in an alert. Currently we support
// the promql engine and clickhouse queries, but in the future we may include
// api readers for Machine Learning (ML) use cases.
// Note: each rule picks the querier it is interested in and uses it.
// This gives rules the flexibility to choose their query engine.
type Queriers struct {
// PqlEngine is the promql engine.
PqlEngine *pqle.PqlEngine
// Ch is the clickhouse connection used as the metric querier.
Ch clickhouse.Conn
}

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils/labels" "go.signoz.io/signoz/pkg/query-service/utils/labels"
) )
@ -23,9 +24,8 @@ type Rule interface {
PreferredChannels() []string PreferredChannels() []string
Eval(context.Context, time.Time, *Queriers) (interface{}, error) Eval(context.Context, time.Time) (interface{}, error)
String() string String() string
// Query() string
SetLastError(error) SetLastError(error)
LastError() error LastError() error
SetHealth(RuleHealth) SetHealth(RuleHealth)
@ -35,5 +35,7 @@ type Rule interface {
SetEvaluationTimestamp(time.Time) SetEvaluationTimestamp(time.Time)
GetEvaluationTimestamp() time.Time GetEvaluationTimestamp() time.Time
RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error
SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc)
} }

View File

@ -349,7 +349,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
} }
ctx = context.WithValue(ctx, common.LogCommentKey, kvs) ctx = context.WithValue(ctx, common.LogCommentKey, kvs)
_, err := rule.Eval(ctx, ts, g.opts.Queriers) _, err := rule.Eval(ctx, ts)
if err != nil { if err != nil {
rule.SetHealth(HealthBad) rule.SetHealth(HealthBad)
rule.SetLastError(err) rule.SetLastError(err)

View File

@ -9,17 +9,13 @@ import (
"net/url" "net/url"
"regexp" "regexp"
"sort" "sort"
"sync"
"text/template" "text/template"
"time" "time"
"unicode" "unicode"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/postprocess" "go.signoz.io/signoz/pkg/query-service/postprocess"
@ -41,38 +37,7 @@ import (
) )
type ThresholdRule struct { type ThresholdRule struct {
id string *BaseRule
name string
source string
ruleCondition *RuleCondition
// evalWindow is the time window used for evaluating the rule
// i.e each time we lookback from the current time, we look at data for the last
// evalWindow duration
evalWindow time.Duration
// holdDuration is the duration for which the alert waits before firing
holdDuration time.Duration
// holds the static set of labels and annotations for the rule
// these are the same for all alerts created for this rule
labels labels.Labels
annotations labels.Labels
// preferredChannels is the list of channels to send the alert to
// if the rule is triggered
preferredChannels []string
mtx sync.Mutex
// the time it took to evaluate the rule
evaluationDuration time.Duration
// the timestamp of the last evaluation
evaluationTimestamp time.Time
health RuleHealth
lastError error
// map of active alerts
active map[uint64]*Alert
// Ever since we introduced the new metrics query builder, the version is "v4" // Ever since we introduced the new metrics query builder, the version is "v4"
// for all the rules // for all the rules
// if the version is "v3", then we use the old querier // if the version is "v3", then we use the old querier
@ -84,80 +49,31 @@ type ThresholdRule struct {
// should be fast but we can still avoid the query if we have the data in memory // should be fast but we can still avoid the query if we have the data in memory
temporalityMap map[string]map[v3.Temporality]bool temporalityMap map[string]map[v3.Temporality]bool
opts ThresholdRuleOpts
// lastTimestampWithDatapoints is the timestamp of the last datapoint we observed
// for this rule
// this is used for missing data alerts
lastTimestampWithDatapoints time.Time
// Type of the rule
typ AlertType
// querier is used for alerts created before the introduction of new metrics query builder // querier is used for alerts created before the introduction of new metrics query builder
querier interfaces.Querier querier interfaces.Querier
// querierV2 is used for alerts created after the introduction of new metrics query builder // querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier querierV2 interfaces.Querier
reader interfaces.Reader
evalDelay time.Duration
handledRestart bool
}
// ThresholdRuleOpts holds the optional settings of a ThresholdRule.
type ThresholdRuleOpts struct {
// SendUnmatched sends observed metric values
// even if they don't match the rule condition. This is
// useful when testing the rule.
SendUnmatched bool
// SendAlways will send the alert irrespective of resendDelay
// or other params.
SendAlways bool
// EvalDelay is the time to wait for data to be available
// before evaluating the rule. This is useful in scenarios
// where data might not be available in the system immediately
// after the timestamp.
EvalDelay time.Duration
}
func NewThresholdRule( func NewThresholdRule(
id string, id string,
p *PostableRule, p *PostableRule,
opts ThresholdRuleOpts,
featureFlags interfaces.FeatureLookup, featureFlags interfaces.FeatureLookup,
reader interfaces.Reader, reader interfaces.Reader,
opts ...RuleOption,
) (*ThresholdRule, error) { ) (*ThresholdRule, error) {
zap.L().Info("creating new ThresholdRule", zap.String("id", id), zap.Any("opts", opts)) zap.L().Info("creating new ThresholdRule", zap.String("id", id), zap.Any("opts", opts))
if p.RuleCondition == nil { baseRule, err := NewBaseRule(id, p, reader, opts...)
return nil, fmt.Errorf("no rule condition") if err != nil {
} else if !p.RuleCondition.IsValid() { return nil, err
return nil, fmt.Errorf("invalid rule condition")
} }
t := ThresholdRule{ t := ThresholdRule{
id: id, BaseRule: baseRule,
name: p.AlertName, version: p.Version,
source: p.Source, temporalityMap: make(map[string]map[v3.Temporality]bool),
ruleCondition: p.RuleCondition,
evalWindow: time.Duration(p.EvalWindow),
labels: labels.FromMap(p.Labels),
annotations: labels.FromMap(p.Annotations),
preferredChannels: p.PreferredChannels,
health: HealthUnknown,
active: map[uint64]*Alert{},
opts: opts,
typ: p.AlertType,
version: p.Version,
temporalityMap: make(map[string]map[v3.Temporality]bool),
evalDelay: opts.EvalDelay,
}
if int64(t.evalWindow) == 0 {
t.evalWindow = 5 * time.Minute
} }
querierOption := querier.QuerierOptions{ querierOption := querier.QuerierOptions{
@ -177,203 +93,15 @@ func NewThresholdRule(
t.querier = querier.NewQuerier(querierOption) t.querier = querier.NewQuerier(querierOption)
t.querierV2 = querierV2.NewQuerier(querierOptsV2) t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.reader = reader t.reader = reader
zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id))
return &t, nil return &t, nil
} }
// Name returns the name of the rule.
func (r *ThresholdRule) Name() string {
	return r.name
}
// ID returns the identifier of the rule.
func (r *ThresholdRule) ID() string {
	return r.id
}
// Condition returns the rule condition this rule evaluates; the pointer is
// shared with the rule, not copied.
func (r *ThresholdRule) Condition() *RuleCondition {
	return r.ruleCondition
}
// GeneratorURL returns the generator URL that prepareRuleGeneratorURL derives
// from the rule ID and the rule source.
func (r *ThresholdRule) GeneratorURL() string {
	ruleID := r.ID()
	return prepareRuleGeneratorURL(ruleID, r.source)
}
// PreferredChannels returns the channels the alert is sent to when the rule
// is triggered. The returned slice is the rule's own slice, not a copy.
func (r *ThresholdRule) PreferredChannels() []string {
	return r.preferredChannels
}
// targetVal returns the threshold the rule compares against, converted from
// the condition's target unit to the unit reported by Unit().
// It returns 0 when the rule has no condition or the condition has no target.
func (r *ThresholdRule) targetVal() float64 {
	cond := r.ruleCondition
	if cond == nil || cond.Target == nil {
		return 0
	}

	// The converter is chosen by the unit the target is expressed in.
	targetUnit := converter.Unit(cond.TargetUnit)
	unitConverter := converter.FromUnit(targetUnit)

	target := converter.Value{
		F: *cond.Target,
		U: targetUnit,
	}
	converted := unitConverter.Convert(target, converter.Unit(r.Unit()))

	return converted.F
}
// matchType returns the match type of the rule condition, defaulting to
// AtleastOnce when the rule has no condition.
func (r *ThresholdRule) matchType() MatchType {
	if cond := r.ruleCondition; cond != nil {
		return cond.MatchType
	}
	return AtleastOnce
}
// compareOp returns the compare operator of the rule condition, defaulting to
// ValueIsEq when the rule has no condition.
func (r *ThresholdRule) compareOp() CompareOp {
	if cond := r.ruleCondition; cond != nil {
		return cond.CompareOp
	}
	return ValueIsEq
}
func (r *ThresholdRule) Type() RuleType { func (r *ThresholdRule) Type() RuleType {
return RuleTypeThreshold return RuleTypeThreshold
} }
// SetLastError stores err as the rule's last error while holding the rule mutex.
func (r *ThresholdRule) SetLastError(err error) {
	r.mtx.Lock()
	r.lastError = err
	r.mtx.Unlock()
}
// LastError returns the error stored by SetLastError, read under the rule mutex.
func (r *ThresholdRule) LastError() error {
	r.mtx.Lock()
	lastErr := r.lastError
	r.mtx.Unlock()
	return lastErr
}
// SetHealth stores health as the rule's health while holding the rule mutex.
func (r *ThresholdRule) SetHealth(health RuleHealth) {
	r.mtx.Lock()
	r.health = health
	r.mtx.Unlock()
}
// Health returns the health stored by SetHealth, read under the rule mutex.
func (r *ThresholdRule) Health() RuleHealth {
	r.mtx.Lock()
	current := r.health
	r.mtx.Unlock()
	return current
}
// SetEvaluationDuration records how long the last evaluation of the rule took.
// The value is written under the rule mutex.
func (r *ThresholdRule) SetEvaluationDuration(dur time.Duration) {
	r.mtx.Lock()
	r.evaluationDuration = dur
	r.mtx.Unlock()
}
// HoldDuration returns the duration for which the alert waits before firing.
func (r *ThresholdRule) HoldDuration() time.Duration {
	return r.holdDuration
}
// EvalWindow returns the lookback window used when evaluating the rule.
func (r *ThresholdRule) EvalWindow() time.Duration {
	return r.evalWindow
}
// Labels returns the static labels configured for the alerting rule.
func (r *ThresholdRule) Labels() labels.BaseLabels {
	return r.labels
}
// Annotations returns the static annotations configured for the alerting rule.
func (r *ThresholdRule) Annotations() labels.BaseLabels {
	return r.annotations
}
// GetEvaluationDuration returns the duration recorded by SetEvaluationDuration,
// read under the rule mutex.
func (r *ThresholdRule) GetEvaluationDuration() time.Duration {
	r.mtx.Lock()
	elapsed := r.evaluationDuration
	r.mtx.Unlock()
	return elapsed
}
// SetEvaluationTimestamp records when the rule was last evaluated.
// The value is written under the rule mutex.
func (r *ThresholdRule) SetEvaluationTimestamp(ts time.Time) {
	r.mtx.Lock()
	r.evaluationTimestamp = ts
	r.mtx.Unlock()
}
// GetEvaluationTimestamp returns the timestamp recorded by
// SetEvaluationTimestamp, read under the rule mutex.
func (r *ThresholdRule) GetEvaluationTimestamp() time.Time {
	r.mtx.Lock()
	evaluatedAt := r.evaluationTimestamp
	r.mtx.Unlock()
	return evaluatedAt
}
// State returns the highest state found among the rule's active alerts,
// or model.StateInactive when there are none.
// StateFiring > StatePending > StateInactive
//
// The active map is read without taking the rule mutex.
func (r *ThresholdRule) State() model.AlertState {
	highest := model.StateInactive
	for _, alert := range r.active {
		if highest < alert.State {
			highest = alert.State
		}
	}
	return highest
}
// currentAlerts returns shallow copies of all alerts in the active map,
// taken while holding the rule mutex. Mutating the returned alerts does not
// affect the alerts tracked by the rule.
func (r *ThresholdRule) currentAlerts() []*Alert {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	snapshot := make([]*Alert, 0, len(r.active))
	for _, live := range r.active {
		dup := *live
		snapshot = append(snapshot, &dup)
	}
	return snapshot
}
// ActiveAlerts returns copies of the current alerts that have not been
// resolved, i.e. whose ResolvedAt is the zero time. The result is nil when
// there are none.
func (r *ThresholdRule) ActiveAlerts() []*Alert {
	var unresolved []*Alert
	for _, alert := range r.currentAlerts() {
		if !alert.ResolvedAt.IsZero() {
			continue
		}
		unresolved = append(unresolved, alert)
	}
	return unresolved
}
// FetchTemporality queries the distinct (metric_name, temporality) pairs for
// the given metric names and returns them as a set of temporalities per
// metric name.
//
// It returns an error when the query fails, when a row cannot be scanned, or
// when row iteration stops because of an error.
func (r *ThresholdRule) FetchTemporality(ctx context.Context, metricNames []string, ch driver.Conn) (map[string]map[v3.Temporality]bool, error) {

	metricNameToTemporality := make(map[string]map[v3.Temporality]bool)

	query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME)

	rows, err := ch.Query(ctx, query, metricNames)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var metricName, temporality string
		if err := rows.Scan(&metricName, &temporality); err != nil {
			return nil, err
		}
		if _, ok := metricNameToTemporality[metricName]; !ok {
			metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
		}
		metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
	}
	// Next also returns false when iteration fails; without this check a
	// failed read would be reported as a complete, shorter result.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return metricNameToTemporality, nil
}
// populateTemporality same as addTemporality but for v4 and better // populateTemporality same as addTemporality but for v4 and better
func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3, ch driver.Conn) error { func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
missingTemporality := make([]string, 0) missingTemporality := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool) metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
@ -405,7 +133,7 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan
var err error var err error
if len(missingTemporality) > 0 { if len(missingTemporality) > 0 {
nameToTemporality, err = r.FetchTemporality(ctx, missingTemporality, ch) nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality)
if err != nil { if err != nil {
return err return err
} }
@ -429,52 +157,13 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan
return nil return nil
} }
// ForEachActiveAlert runs the given function on each alert. func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) {
// This should be used when you want to use the actual alerts from the ThresholdRule
// and not on its copy.
// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'.
func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	// f receives the live alert pointers, so it runs with the rule mutex held.
	for _, alert := range r.active {
		f(alert)
	}
}
// SendAlerts collects every active alert that is due for sending at ts and
// passes copies of them to notifyFunc in a single call.
//
// An alert is due when the rule's SendAlways option is set or when
// alert.needsSending(ts, resendDelay) reports true. Each alert that is sent
// has its LastSentAt set to ts and its ValidUntil set to ts plus four times
// the larger of resendDelay and interval. notifyFunc is called even when no
// alert is due.
func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
	// Allow for two Eval or Alertmanager send failures.
	delta := resendDelay
	if interval > resendDelay {
		delta = interval
	}
	validUntil := ts.Add(4 * delta)

	toSend := []*Alert{}
	r.ForEachActiveAlert(func(alert *Alert) {
		if !r.opts.SendAlways && !alert.needsSending(ts, resendDelay) {
			zap.L().Debug("skipping send alert due to resend delay", zap.String("rule", r.Name()), zap.Any("alert", alert.Labels))
			return
		}
		alert.LastSentAt = ts
		alert.ValidUntil = validUntil
		// Hand out a copy so the notifier never sees later mutations.
		outgoing := *alert
		toSend = append(toSend, &outgoing)
	})
	notifyFunc(ctx, "", toSend...)
}
// Unit returns the unit of the rule's composite query, or an empty string
// when the rule has no condition or the condition has no composite query.
func (r *ThresholdRule) Unit() string {
	cond := r.ruleCondition
	if cond == nil || cond.CompositeQuery == nil {
		return ""
	}
	return cond.CompositeQuery.Unit
}
func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds())) zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds()))
start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli() start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli()
end := ts.UnixMilli() end := ts.UnixMilli()
if r.evalDelay > 0 { if r.evalDelay > 0 {
start = start - int64(r.evalDelay.Milliseconds()) start = start - int64(r.evalDelay.Milliseconds())
end = end - int64(r.evalDelay.Milliseconds()) end = end - int64(r.evalDelay.Milliseconds())
@ -507,16 +196,12 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
tmpl := template.New("clickhouse-query") tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query) tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil { if err != nil {
zap.L().Error("failed to parse clickhouse query to populate vars", zap.String("ruleid", r.ID()), zap.Error(err)) return nil, err
r.SetHealth(HealthBad)
return params
} }
var query bytes.Buffer var query bytes.Buffer
err = tmpl.Execute(&query, params.Variables) err = tmpl.Execute(&query, params.Variables)
if err != nil { if err != nil {
zap.L().Error("failed to populate clickhouse query", zap.String("ruleid", r.ID()), zap.Error(err)) return nil, err
r.SetHealth(HealthBad)
return params
} }
params.CompositeQuery.ClickHouseQueries[name] = &v3.ClickHouseQuery{ params.CompositeQuery.ClickHouseQueries[name] = &v3.ClickHouseQuery{
Query: query.String(), Query: query.String(),
@ -524,7 +209,7 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
Legend: chQuery.Legend, Legend: chQuery.Legend,
} }
} }
return params return params, nil
} }
if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil { if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil {
@ -548,7 +233,7 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
CompositeQuery: r.ruleCondition.CompositeQuery, CompositeQuery: r.ruleCondition.CompositeQuery,
Variables: make(map[string]interface{}, 0), Variables: make(map[string]interface{}, 0),
NoCache: true, NoCache: true,
} }, nil
} }
// The following function is used to prepare the where clause for the query // The following function is used to prepare the where clause for the query
@ -625,7 +310,10 @@ func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) str
return "" return ""
} }
q := r.prepareQueryRange(ts) q, err := r.prepareQueryRange(ts)
if err != nil {
return ""
}
// Logs list view expects time in milliseconds // Logs list view expects time in milliseconds
tr := v3.URLShareableTimeRange{ tr := v3.URLShareableTimeRange{
Start: q.Start, Start: q.Start,
@ -689,7 +377,10 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s
return "" return ""
} }
q := r.prepareQueryRange(ts) q, err := r.prepareQueryRange(ts)
if err != nil {
return ""
}
// Traces list view expects time in nanoseconds // Traces list view expects time in nanoseconds
tr := v3.URLShareableTimeRange{ tr := v3.URLShareableTimeRange{
Start: q.Start * time.Second.Microseconds(), Start: q.Start * time.Second.Microseconds(),
@ -745,17 +436,6 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s
return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions) return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions)
} }
// hostFromSource returns "scheme://hostname" of the rule source, followed by
// ":port" when the source URL carries a port. It returns an empty string
// when the source cannot be parsed as a URL.
func (r *ThresholdRule) hostFromSource() string {
	src, err := url.Parse(r.source)
	if err != nil {
		return ""
	}

	base := fmt.Sprintf("%s://%s", src.Scheme, src.Hostname())
	if port := src.Port(); port != "" {
		return base + ":" + port
	}
	return base
}
func (r *ThresholdRule) GetSelectedQuery() string { func (r *ThresholdRule) GetSelectedQuery() string {
if r.ruleCondition != nil { if r.ruleCondition != nil {
if r.ruleCondition.SelectedQuery != "" { if r.ruleCondition.SelectedQuery != "" {
@ -797,18 +477,14 @@ func (r *ThresholdRule) GetSelectedQuery() string {
return "" return ""
} }
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) { func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vector, error) {
if r.ruleCondition == nil || r.ruleCondition.CompositeQuery == nil {
r.SetHealth(HealthBad)
r.SetLastError(fmt.Errorf("no rule condition"))
return nil, fmt.Errorf("invalid rule condition")
}
params := r.prepareQueryRange(ts) params, err := r.prepareQueryRange(ts)
err := r.populateTemporality(ctx, params, ch) if err != nil {
return nil, err
}
err = r.populateTemporality(ctx, params)
if err != nil { if err != nil {
r.SetHealth(HealthBad)
zap.L().Error("failed to set temporality", zap.String("rule", r.Name()), zap.Error(err))
return nil, fmt.Errorf("internal error while setting temporality") return nil, fmt.Errorf("internal error while setting temporality")
} }
@ -822,24 +498,22 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
} }
var results []*v3.Result var results []*v3.Result
var errQuriesByName map[string]error var queryErrors map[string]error
if r.version == "v4" { if r.version == "v4" {
results, errQuriesByName, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{}) results, queryErrors, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{})
} else { } else {
results, errQuriesByName, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{}) results, queryErrors, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{})
} }
if err != nil { if err != nil {
zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("queries", errQuriesByName)) zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("errors", queryErrors))
r.SetHealth(HealthBad)
return nil, fmt.Errorf("internal error while querying") return nil, fmt.Errorf("internal error while querying")
} }
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
results, err = postprocess.PostProcessResult(results, params) results, err = postprocess.PostProcessResult(results, params)
if err != nil { if err != nil {
r.SetHealth(HealthBad)
zap.L().Error("failed to post process result", zap.String("rule", r.Name()), zap.Error(err)) zap.L().Error("failed to post process result", zap.String("rule", r.Name()), zap.Error(err))
return nil, fmt.Errorf("internal error while post processing") return nil, fmt.Errorf("internal error while post processing")
} }
@ -901,113 +575,14 @@ func normalizeLabelName(name string) string {
return normalized return normalized
} }
// TODO(srikanthccv): implement base rule and use for all types of rules func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
// recordRuleStateHistory writes the given state-history items for this rule
// through r.reader.
//
// On the first call after the rule has been (re)initialised (handledRestart is
// false) and when previously saved history exists, the items are reconciled
// against the last saved state per fingerprint:
//   - a saved fingerprint that is absent from itemsToAdd and was firing or
//     no-data is recorded as a transition to inactive;
//   - a saved fingerprint whose state differs from the one in itemsToAdd is
//     recorded as a transition to the new state;
//   - fingerprints in itemsToAdd that have no saved counterpart are added
//     unchanged.
//
// If the overall state derived from the reconciled items differs from the
// overall state of the first saved entry, every reconciled item is marked with
// the new overall state.
//
// prevState and currentState are only used for debug logging.
//
// It returns the error from loading the last saved history. A failure while
// inserting is logged and not returned, so history writes stay best-effort.
// When r.reader is nil the call is a no-op and returns nil.
func (r *ThresholdRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error {
	zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))

	// The reader is needed both to load the last saved state and to write the
	// new entries. Guard once up front: a nil check placed only before the
	// write would come after the reader has already been dereferenced.
	if r.reader == nil {
		return nil
	}

	revisedItemsToAdd := map[uint64]v3.RuleStateHistory{}

	lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
	if err != nil {
		return err
	}

	// if the query-service has been restarted, or the rule has been modified (which re-initializes the rule),
	// the state would reset so we need to add the corresponding state changes to previously saved states
	if !r.handledRestart && len(lastSavedState) > 0 {
		zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))

		currentByFingerprint := make(map[uint64]v3.RuleStateHistory, len(itemsToAdd))
		for _, item := range itemsToAdd {
			currentByFingerprint[item.Fingerprint] = item
		}

		shouldSkip := make(map[uint64]bool, len(lastSavedState))

		for _, item := range lastSavedState {
			// for the last saved item with fingerprint, check if there is a corresponding entry in the current state
			current, ok := currentByFingerprint[item.Fingerprint]
			switch {
			case !ok:
				// there was a state change in the past, but not in the current state
				// if the state was firing, then we should add a resolved state change
				// there is nothing to do if the prev state was normal
				if item.State == model.StateFiring || item.State == model.StateNoData {
					item.State = model.StateInactive
					item.StateChanged = true
					item.UnixMilli = time.Now().UnixMilli()
					revisedItemsToAdd[item.Fingerprint] = item
				}
			case item.State != current.State:
				item.State = current.State
				item.StateChanged = true
				item.UnixMilli = time.Now().UnixMilli()
				revisedItemsToAdd[item.Fingerprint] = item
			}
			// do not add this item to revisedItemsToAdd as it is already processed
			shouldSkip[item.Fingerprint] = true
		}
		zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

		// if there are any new state changes that were not saved, add them to the revised items
		for _, item := range itemsToAdd {
			if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] {
				revisedItemsToAdd[item.Fingerprint] = item
			}
		}
		zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

		// The overall state is firing as soon as any reconciled item is
		// firing or reports no data.
		newState := model.StateInactive
		for _, item := range revisedItemsToAdd {
			if item.State == model.StateFiring || item.State == model.StateNoData {
				newState = model.StateFiring
				break
			}
		}
		zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState))

		// if there is a change in the overall state, update the overall state
		// (indexing lastSavedState[0] is safe: len(lastSavedState) > 0 here)
		if lastSavedState[0].OverallState != newState {
			for fingerprint, item := range revisedItemsToAdd {
				item.OverallState = newState
				item.OverallStateChanged = true
				revisedItemsToAdd[fingerprint] = item
			}
		}
		zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
	} else {
		for _, item := range itemsToAdd {
			revisedItemsToAdd[item.Fingerprint] = item
		}
	}

	if len(revisedItemsToAdd) > 0 {
		zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))

		entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd))
		for _, item := range revisedItemsToAdd {
			entries = append(entries, item)
		}
		// Insert failures are logged only; they do not fail the evaluation.
		if err := r.reader.AddRuleStateHistory(ctx, entries); err != nil {
			zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
		}
	}
	r.handledRestart = true

	return nil
}
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
prevState := r.State() prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit()) valueFormatter := formatter.FromUnit(r.Unit())
res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch) res, err := r.buildAndRunQuery(ctx, ts)
if err != nil { if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
zap.L().Error("failure in buildAndRunQuery", zap.String("ruleid", r.ID()), zap.Error(err))
return nil, err return nil, err
} }
@ -1054,17 +629,17 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel) lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel)
resultLabels := labels.NewBuilder(smpl.MetricOrig).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels() resultLabels := labels.NewBuilder(smpl.MetricOrig).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels()
for _, l := range r.labels { for name, value := range r.labels.Map() {
lb.Set(l.Name, expand(l.Value)) lb.Set(name, expand(value))
} }
lb.Set(labels.AlertNameLabel, r.Name()) lb.Set(labels.AlertNameLabel, r.Name())
lb.Set(labels.AlertRuleIdLabel, r.ID()) lb.Set(labels.AlertRuleIdLabel, r.ID())
lb.Set(labels.RuleSourceLabel, r.GeneratorURL()) lb.Set(labels.RuleSourceLabel, r.GeneratorURL())
annotations := make(labels.Labels, 0, len(r.annotations)) annotations := make(labels.Labels, 0, len(r.annotations.Map()))
for _, a := range r.annotations { for name, value := range r.annotations.Map() {
annotations = append(annotations, labels.Label{Name: normalizeLabelName(a.Name), Value: expand(a.Value)}) annotations = append(annotations, labels.Label{Name: normalizeLabelName(name), Value: expand(value)})
} }
if smpl.IsMissing { if smpl.IsMissing {
lb.Set(labels.AlertNameLabel, "[No data] "+r.Name()) lb.Set(labels.AlertNameLabel, "[No data] "+r.Name())
@ -1092,10 +667,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
if _, ok := alerts[h]; ok { if _, ok := alerts[h]; ok {
zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h])) zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h]))
err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels") err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = HealthBad
r.lastError = err
return nil, err return nil, err
} }
@ -1112,7 +683,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
} }
} }
zap.L().Info("alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts))) zap.L().Info("number of alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts)))
// alerts[h] is ready, add or update active list now // alerts[h] is ready, add or update active list now
for h, a := range alerts { for h, a := range alerts {
@ -1127,7 +698,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
} }
r.active[h] = a r.active[h] = a
} }
itemsToAdd := []v3.RuleStateHistory{} itemsToAdd := []v3.RuleStateHistory{}
@ -1190,7 +760,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
itemsToAdd[idx] = item itemsToAdd[idx] = item
} }
r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd)
r.health = HealthGood r.health = HealthGood
r.lastError = err r.lastError = err
@ -1226,179 +796,3 @@ func removeGroupinSetPoints(series v3.Series) []v3.Point {
} }
return result return result
} }
// shouldAlert evaluates one result series against the rule's condition and
// reports whether it should raise an alert, together with the sample to attach
// to that alert.
//
// The series labels are copied twice: verbatim (Sample.MetricOrig) and with
// names passed through normalizeLabelName (Sample.Metric). Points are first
// filtered by removeGroupinSetPoints; if none remain, the zero Sample and
// false are returned.
//
// Per match type:
//   - AtleastOnce: alerts on the first point that satisfies the comparison and
//     reports that point's value.
//   - AllTheTimes: alerts only if no point violates the comparison. The
//     reported value is the series minimum for "above", the maximum for
//     "below", the first finite value for "not equal", and the target
//     otherwise. When a point violates the comparison the returned sample
//     still carries the target value.
//   - OnAverage / InTotal: compares the mean / sum of the finite points with
//     the target and reports that aggregate.
//
// NOTE(review): the comparison operator and target are read once per call
// here; this assumes r.compareOp() and r.targetVal() are side-effect free —
// confirm against their definitions.
func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
	var (
		origLabels       labels.Labels
		normalizedLabels labels.Labels
	)
	for key, val := range series.Labels {
		origLabels = append(origLabels, labels.Label{Name: key, Value: val})
		normalizedLabels = append(normalizedLabels, labels.Label{Name: normalizeLabelName(key), Value: val})
	}

	// sampleOf wraps a value with both label sets of this series.
	sampleOf := func(v float64) Sample {
		return Sample{Point: Point{V: v}, Metric: normalizedLabels, MetricOrig: origLabels}
	}

	series.Points = removeGroupinSetPoints(series)

	// nothing to evaluate
	if len(series.Points) == 0 {
		return Sample{}, false
	}

	matchType := r.matchType()
	op := r.compareOp()
	target := r.targetVal()

	// satisfies reports whether v meets the condition; an unrecognised
	// operator never matches.
	satisfies := func(v float64) bool {
		switch op {
		case ValueIsAbove:
			return v > target
		case ValueIsBelow:
			return v < target
		case ValueIsEq:
			return v == target
		case ValueIsNotEq:
			return v != target
		}
		return false
	}

	isFinite := func(v float64) bool {
		return !math.IsNaN(v) && !math.IsInf(v, 0)
	}

	switch matchType {
	case AtleastOnce:
		// If any sample matches the condition, the rule is firing.
		for _, pt := range series.Points {
			if satisfies(pt.Value) {
				return sampleOf(pt.Value), true
			}
		}
		return Sample{}, false

	case AllTheTimes:
		// If all samples match the condition, the rule is firing.
		// The violation checks are spelled out rather than negating
		// satisfies, because the two forms disagree on NaN values.
		var violates func(v float64) bool
		switch op {
		case ValueIsAbove:
			violates = func(v float64) bool { return v <= target }
		case ValueIsBelow:
			violates = func(v float64) bool { return v >= target }
		case ValueIsEq:
			violates = func(v float64) bool { return v != target }
		case ValueIsNotEq:
			violates = func(v float64) bool { return v == target }
		default:
			// No recognised operator: nothing can violate the condition.
			return sampleOf(target), true
		}
		for _, pt := range series.Points {
			if violates(pt.Value) {
				return sampleOf(target), false
			}
		}

		switch op {
		case ValueIsAbove:
			// use min value from the series
			lowest := math.Inf(1)
			for _, pt := range series.Points {
				if pt.Value < lowest {
					lowest = pt.Value
				}
			}
			return sampleOf(lowest), true
		case ValueIsBelow:
			// use max value from the series
			highest := math.Inf(-1)
			for _, pt := range series.Points {
				if pt.Value > highest {
					highest = pt.Value
				}
			}
			return sampleOf(highest), true
		case ValueIsNotEq:
			// use any non-inf or nan value from the series
			for _, pt := range series.Points {
				if isFinite(pt.Value) {
					return sampleOf(pt.Value), true
				}
			}
		}
		return sampleOf(target), true

	case OnAverage:
		// If the average of all samples matches the condition, the rule is firing.
		var total, n float64
		for _, pt := range series.Points {
			if isFinite(pt.Value) {
				total += pt.Value
				n++
			}
		}
		mean := total / n
		return sampleOf(mean), satisfies(mean)

	case InTotal:
		// If the sum of all samples matches the condition, the rule is firing.
		var total float64
		for _, pt := range series.Points {
			if isFinite(pt.Value) {
				total += pt.Value
			}
		}
		return sampleOf(total), satisfies(total)
	}

	return Sample{}, false
}

View File

@ -685,7 +685,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -774,7 +774,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -816,7 +816,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -892,7 +892,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -945,17 +945,17 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
fm := featureManager.StartManager() fm := featureManager.StartManager()
for idx, c := range cases { for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) // no eval delay rule, err := NewThresholdRule("69", &postableRule, fm, nil) // no eval delay
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
params := rule.prepareQueryRange(ts) params, err := rule.prepareQueryRange(ts)
assert.NoError(t, err)
assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx)
secondTimeParams := rule.prepareQueryRange(ts) secondTimeParams, err := rule.prepareQueryRange(ts)
assert.NoError(t, err)
assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx)
} }
} }
@ -994,17 +994,17 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
fm := featureManager.StartManager() fm := featureManager.StartManager()
for idx, c := range cases { for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
params := rule.prepareQueryRange(ts) params, err := rule.prepareQueryRange(ts)
assert.NoError(t, err)
assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx)
secondTimeParams := rule.prepareQueryRange(ts) secondTimeParams, err := rule.prepareQueryRange(ts)
assert.NoError(t, err)
assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx)
} }
} }
@ -1137,7 +1137,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "")
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, reader) rule, err := NewThresholdRule("69", &postableRule, fm, reader)
rule.temporalityMap = map[string]map[v3.Temporality]bool{ rule.temporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
v3.Delta: true, v3.Delta: true,
@ -1147,11 +1147,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
queriers := Queriers{ retVal, err := rule.Eval(context.Background(), time.Now())
Ch: mock,
}
retVal, err := rule.Eval(context.Background(), time.Now(), &queriers)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -1240,7 +1236,7 @@ func TestThresholdRuleNoData(t *testing.T) {
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "")
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, reader) rule, err := NewThresholdRule("69", &postableRule, fm, reader)
rule.temporalityMap = map[string]map[v3.Temporality]bool{ rule.temporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": { "signoz_calls_total": {
v3.Delta: true, v3.Delta: true,
@ -1250,11 +1246,7 @@ func TestThresholdRuleNoData(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
queriers := Queriers{ retVal, err := rule.Eval(context.Background(), time.Now())
Ch: mock,
}
retVal, err := rule.Eval(context.Background(), time.Now(), &queriers)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }