diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 343b5beec7..a7a0a40c70 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -304,12 +304,14 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/channels/{id}", AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete) router.HandleFunc("/api/v1/channels", EditAccess(aH.createChannel)).Methods(http.MethodPost) router.HandleFunc("/api/v1/testChannel", EditAccess(aH.testChannel)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/rules", ViewAccess(aH.listRules)).Methods(http.MethodGet) router.HandleFunc("/api/v1/rules/{id}", ViewAccess(aH.getRule)).Methods(http.MethodGet) router.HandleFunc("/api/v1/rules", EditAccess(aH.createRule)).Methods(http.MethodPost) router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.editRule)).Methods(http.MethodPut) router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.deleteRule)).Methods(http.MethodDelete) router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.patchRule)).Methods(http.MethodPatch) + router.HandleFunc("/api/v1/testRule", EditAccess(aH.testRule)).Methods(http.MethodPost) router.HandleFunc("/api/v1/dashboards", ViewAccess(aH.getDashboards)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dashboards", EditAccess(aH.createDashboards)).Methods(http.MethodPost) @@ -771,6 +773,36 @@ func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) { } +// testRule handles POST /api/v1/testRule: it reads the rule definition from +// the request body, asks the rule manager to evaluate it and send a test +// notification, and responds with the number of alerts found. The call to +// the rule manager is bounded by a one minute timeout. +func (aH *APIHandler) testRule(w http.ResponseWriter, r *http.Request) { + + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + zap.S().Errorf("Error in getting req body in test rule API: %v", err) + respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + alertCount, apiErr := aH.ruleManager.TestNotification(ctx, string(body)) 
+ if apiErr != nil { + respondError(w, apiErr, nil) + return + } + + response := map[string]interface{}{ + "alertCount": alertCount, + "message": "notification sent", + } + aH.respond(w, response) +} + func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] @@ -963,6 +995,7 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) { aH.respond(w, "rule successfully added") } + func (aH *APIHandler) queryRangeMetricsFromClickhouse(w http.ResponseWriter, r *http.Request) { } diff --git a/pkg/query-service/rules/alerting.go b/pkg/query-service/rules/alerting.go index ecd7205557..b7655733d0 100644 --- a/pkg/query-service/rules/alerting.go +++ b/pkg/query-service/rules/alerting.go @@ -23,6 +23,8 @@ const ( // AlertForStateMetricName is the metric name for 'for' state of alert. alertForStateMetricName = "ALERTS_FOR_STATE" + + TestAlertPostFix = "_TEST_ALERT" ) type RuleType string diff --git a/pkg/query-service/rules/apiParams.go b/pkg/query-service/rules/apiParams.go index b88fa98fb6..1d488c026d 100644 --- a/pkg/query-service/rules/apiParams.go +++ b/pkg/query-service/rules/apiParams.go @@ -18,6 +18,16 @@ import ( // this file contains api request and responses to be // served over http +// newApiErrorInternal returns a new api error object of type internal +func newApiErrorInternal(err error) *model.ApiError { + return &model.ApiError{Typ: model.ErrorInternal, Err: err} +} + +// newApiErrorBadData returns a new api error object of bad request type +func newApiErrorBadData(err error) *model.ApiError { + return &model.ApiError{Typ: model.ErrorBadData, Err: err} +} + // PostableRule is used to create alerting rule from HTTP api type PostableRule struct { Alert string `yaml:"alert,omitempty" json:"alert,omitempty"` diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 6494320043..bf1e70b956 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ 
-4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/google/uuid" "sort" "strconv" "strings" @@ -19,6 +20,8 @@ import ( // opentracing "github.com/opentracing/opentracing-go" am "go.signoz.io/query-service/integrations/alertManager" + "go.signoz.io/query-service/model" + "go.signoz.io/query-service/utils/labels" ) // namespace for prom metrics @@ -38,7 +41,6 @@ func prepareTaskName(ruleId interface{}) string { default: return fmt.Sprintf("%v-groupname", ruleId) } - } // ManagerOptions bundles options for the Manager. @@ -382,6 +384,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string tr, err := NewThresholdRule( ruleId, r, + ThresholdRuleOpts{}, ) if err != nil { @@ -403,6 +406,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string ruleId, r, log.With(m.logger, "alert", r.Alert), + PromRuleOpts{}, ) if err != nil { @@ -683,3 +687,104 @@ func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error return &response, nil } + +// TestNotification builds a throwaway rule from ruleStr, evaluates it once 
+// at the current time and sends the resulting alerts through the notify +// function prepared by the manager. It returns the number of alerts found. +// Malformed or incomplete rule definitions yield a bad-data error; a failed +// evaluation yields an internal error. +func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *model.ApiError) { + + parsedRule, errs := ParsePostableRule([]byte(ruleStr)) + + if len(errs) > 0 { + zap.S().Errorf("msg: failed to parse rule from request: \t error: %v", errs) + return 0, newApiErrorBadData(errs[0]) + } + + var alertname = parsedRule.Alert + if alertname == "" { + // alertname is not mandatory for testing, so picking + // a random string here + alertname = uuid.New().String() + } + + // append name to indicate this is test alert + parsedRule.Alert = fmt.Sprintf("%s%s", alertname, TestAlertPostFix) + + var rule Rule + var err error + + switch parsedRule.RuleType { + case RuleTypeThreshold: + + // the target is dereferenced below to build the info text, so reject + // rules without one instead of panicking on a nil pointer + if parsedRule.RuleCondition == nil || parsedRule.RuleCondition.Target == nil { + return 0, newApiErrorBadData(fmt.Errorf("a rule condition with a target is required to test a threshold rule")) + } + + // add special labels for test alerts + // NOTE(review): assumes ParsePostableRule leaves Labels and Annotations + // non-nil; writing to a nil map panics - confirm against the parser + info := fmt.Sprintf("The rule threshold is set to %.4f, and the observed metric value is {{$value}}.", *parsedRule.RuleCondition.Target) + parsedRule.Labels[labels.AlertAdditionalInfoLabel] = info + parsedRule.Annotations[labels.AlertSummaryLabel] = info + parsedRule.Labels[labels.RuleSourceLabel] = "" + parsedRule.Labels[labels.AlertRuleIdLabel] = "" + + // create a threshold rule + rule, err = NewThresholdRule( + alertname, + parsedRule, + ThresholdRuleOpts{ + SendUnmatched: true, + SendAlways: true, + }, + ) + + if err != nil { + zap.S().Errorf("msg: failed to prepare a new threshold rule for test: \t error: %v", err) + return 0, newApiErrorBadData(err) + } + + case RuleTypeProm: + + // create promql rule + rule, err = NewPromRule( + alertname, + parsedRule, + log.With(m.logger, "alert", alertname), + PromRuleOpts{ + SendAlways: true, + }, + ) + + if err != nil { + zap.S().Errorf("msg: failed to prepare a new promql rule for test: \t error: %v", err) + return 0, newApiErrorBadData(err) + } + 
+ default: + return 0, newApiErrorBadData(fmt.Errorf("failed to derive ruletype with given information")) + } + + // set timestamp to current utc time + ts := time.Now().UTC() + + count, err := rule.Eval(ctx, ts, m.opts.Queriers) + if err != nil { + zap.S().Warn("msg:", "Evaluating rule failed", "\t rule:", rule, "\t err: ", err) + return 0, newApiErrorInternal(fmt.Errorf("rule evaluation failed")) + } + + // Eval returns an interface value; guard the assertion so an unexpected + // type becomes an error instead of a panic + alertsFound, ok := count.(int) + if !ok { + return 0, newApiErrorInternal(fmt.Errorf("rule evaluation returned unexpected result type %T", count)) + } + rule.SendAlerts(ctx, ts, 0, time.Minute, m.prepareNotifyFunc()) + + return alertsFound, nil +} diff --git a/pkg/query-service/rules/promRule.go b/pkg/query-service/rules/promRule.go index bb995de73a..761ca8ddee 100644 --- a/pkg/query-service/rules/promRule.go +++ b/pkg/query-service/rules/promRule.go @@ -18,6 +18,12 @@ import ( yaml "gopkg.in/yaml.v2" ) +type PromRuleOpts struct { + // SendAlways will send alert irrespective of resendDelay + // or other params + SendAlways bool +} + type PromRule struct { id string name string @@ -43,12 +49,14 @@ type PromRule struct { active map[uint64]*Alert logger log.Logger + opts PromRuleOpts } func NewPromRule( id string, postableRule *PostableRule, logger log.Logger, + opts PromRuleOpts, ) (*PromRule, error) { if postableRule.RuleCondition == nil { @@ -69,13 +77,20 @@ func NewPromRule( health: HealthUnknown, active: map[uint64]*Alert{}, logger: logger, + opts: opts, } if int64(p.evalWindow) == 0 { p.evalWindow = 5 * time.Minute } + query, err := p.getPqlQuery() - zap.S().Info("msg:", "creating new alerting rule", "\t name:", p.name, "\t condition:", p.ruleCondition.String()) + if err != nil { + // can not generate a valid prom QL query + return nil, err + } + + zap.S().Info("msg:", "creating new alerting rule", "\t name:", p.name, "\t condition:", p.ruleCondition.String(), "\t query:", query) return &p, nil } @@ -172,24 +187,6 @@ func (r *PromRule) sample(alert *Alert, ts time.Time) pql.Sample { return s } -// forStateSample returns the sample for ALERTS_FOR_STATE. 
-func (r *PromRule) forStateSample(alert *Alert, ts time.Time, v float64) pql.Sample { - lb := plabels.NewBuilder(r.labels) - alertLabels := alert.Labels.(plabels.Labels) - for _, l := range alertLabels { - lb.Set(l.Name, l.Value) - } - - lb.Set(plabels.MetricName, alertForStateMetricName) - lb.Set(plabels.AlertName, r.name) - - s := pql.Sample{ - Metric: lb.Labels(), - Point: pql.Point{T: timestamp.FromTime(ts), V: v}, - } - return s -} - // GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule. func (r *PromRule) GetEvaluationDuration() time.Duration { r.mtx.Lock() @@ -265,7 +262,7 @@ func (r *PromRule) ForEachActiveAlert(f func(*Alert)) { func (r *PromRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { alerts := []*Alert{} r.ForEachActiveAlert(func(alert *Alert) { - if alert.needsSending(ts, resendDelay) { + if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { alert.LastSentAt = ts // Allow for two Eval or Alertmanager send failures. 
delta := resendDelay @@ -289,7 +286,6 @@ func (r *PromRule) getPqlQuery() (string, error) { if query == "" { return query, fmt.Errorf("a promquery needs to be set for this rule to function") } - if r.ruleCondition.Target != nil && r.ruleCondition.CompareOp != CompareOpNone { query = fmt.Sprintf("%s %s %f", query, ResolveCompareOp(r.ruleCondition.CompareOp), *r.ruleCondition.Target) return query, nil @@ -321,7 +317,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( defer r.mtx.Unlock() resultFPs := map[uint64]struct{}{} - var vec pql.Vector + var alerts = make(map[uint64]*Alert, len(res)) for _, smpl := range res { @@ -358,6 +354,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( for _, l := range r.labels { lb.Set(l.Name, expand(l.Value)) } + lb.Set(qslabels.AlertNameLabel, r.Name()) lb.Set(qslabels.AlertRuleIdLabel, r.ID()) lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL()) @@ -429,8 +426,8 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( } r.health = HealthGood r.lastError = err - return vec, nil + return len(r.active), nil } func (r *PromRule) String() string { diff --git a/pkg/query-service/rules/promRuleTask.go b/pkg/query-service/rules/promRuleTask.go index c06d3e1135..fa84d80d57 100644 --- a/pkg/query-service/rules/promRuleTask.go +++ b/pkg/query-service/rules/promRuleTask.go @@ -6,7 +6,6 @@ import ( "github.com/go-kit/log" opentracing "github.com/opentracing/opentracing-go" plabels "github.com/prometheus/prometheus/pkg/labels" - pql "github.com/prometheus/prometheus/promql" "go.uber.org/zap" "sort" "sync" @@ -313,7 +312,6 @@ func (g *PromRuleTask) CopyState(fromTask Task) error { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. 
func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) { zap.S().Info("promql rule task:", g.name, "\t eval started at:", ts) - var samplesTotal float64 for i, rule := range g.rules { if rule == nil { continue @@ -336,7 +334,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) { rule.SetEvaluationTimestamp(t) }(time.Now()) - data, err := rule.Eval(ctx, ts, g.opts.Queriers) + _, err := rule.Eval(ctx, ts, g.opts.Queriers) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) @@ -350,21 +348,8 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) { //} return } - vector := data.(pql.Vector) - samplesTotal += float64(len(vector)) - rule.SendAlerts(ctx, ts, g.opts.ResendDelay, g.frequency, g.notify) - seriesReturned := make(map[string]plabels.Labels, len(g.seriesInPreviousEval[i])) - - defer func() { - g.seriesInPreviousEval[i] = seriesReturned - }() - - for _, s := range vector { - seriesReturned[s.Metric.String()] = s.Metric - } - }(i, rule) } } diff --git a/pkg/query-service/rules/ruleTask.go b/pkg/query-service/rules/ruleTask.go index 59b25f05e0..4075d9888e 100644 --- a/pkg/query-service/rules/ruleTask.go +++ b/pkg/query-service/rules/ruleTask.go @@ -14,17 +14,15 @@ import ( // RuleTask holds a rule (with composite queries) // and evaluates the rule at a given frequency type RuleTask struct { - name string - file string - frequency time.Duration - rules []Rule - seriesInPreviousEval []map[string]labels.Labels // One per Rule. 
- staleSeries []labels.Labels - opts *ManagerOptions - mtx sync.Mutex - evaluationDuration time.Duration - evaluationTime time.Duration - lastEvaluation time.Time + name string + file string + frequency time.Duration + rules []Rule + opts *ManagerOptions + mtx sync.Mutex + evaluationDuration time.Duration + evaluationTime time.Duration + lastEvaluation time.Time markStale bool done chan struct{} @@ -46,16 +44,15 @@ func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts zap.S().Info("msg:", "initiating a new rule task", "\t name:", name, "\t frequency:", frequency) return &RuleTask{ - name: name, - file: file, - pause: false, - frequency: frequency, - rules: rules, - opts: opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)), - done: make(chan struct{}), - terminated: make(chan struct{}), - notify: notify, + name: name, + file: file, + pause: false, + frequency: frequency, + rules: rules, + opts: opts, + done: make(chan struct{}), + terminated: make(chan struct{}), + notify: notify, } } @@ -126,24 +123,6 @@ func (g *RuleTask) Run(ctx context.Context) { tick := time.NewTicker(g.frequency) defer tick.Stop() - // defer cleanup - defer func() { - if !g.markStale { - return - } - go func(now time.Time) { - for _, rule := range g.seriesInPreviousEval { - for _, r := range rule { - g.staleSeries = append(g.staleSeries, r) - } - } - // That can be garbage collected at this point. 
- g.seriesInPreviousEval = nil - - }(time.Now()) - - }() - iter() // let the group iterate and run @@ -285,17 +264,15 @@ func (g *RuleTask) CopyState(fromTask Task) error { ruleMap[nameAndLabels] = append(l, fi) } - for i, rule := range g.rules { + for _, rule := range g.rules { nameAndLabels := nameAndLabels(rule) indexes := ruleMap[nameAndLabels] if len(indexes) == 0 { continue } fi := indexes[0] - g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi] ruleMap[nameAndLabels] = indexes[1:] - // todo(amol): support other rules too here ar, ok := rule.(*ThresholdRule) if !ok { continue @@ -310,18 +287,6 @@ func (g *RuleTask) CopyState(fromTask Task) error { } } - // Handle deleted and unmatched duplicate rules. - // todo(amol): possibly not needed any more - g.staleSeries = from.staleSeries - for fi, fromRule := range from.rules { - nameAndLabels := nameAndLabels(fromRule) - l := ruleMap[nameAndLabels] - if len(l) != 0 { - for _, series := range from.seriesInPreviousEval[fi] { - g.staleSeries = append(g.staleSeries, series) - } - } - } return nil } @@ -330,7 +295,6 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { zap.S().Debugf("msg:", "rule task eval started", "\t name:", g.name, "\t start time:", ts) - var samplesTotal float64 for i, rule := range g.rules { if rule == nil { continue @@ -353,7 +317,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { rule.SetEvaluationTimestamp(t) }(time.Now()) - data, err := rule.Eval(ctx, ts, g.opts.Queriers) + _, err := rule.Eval(ctx, ts, g.opts.Queriers) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) @@ -368,18 +332,8 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { return } - vector := data.(Vector) - samplesTotal += float64(len(vector)) - rule.SendAlerts(ctx, ts, g.opts.ResendDelay, g.frequency, g.notify) - seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) - - for _, s := range vector { - seriesReturned[s.Metric.String()] = s.Metric 
- } - - g.seriesInPreviousEval[i] = seriesReturned }(i, rule) } } diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index dc0c29be97..5234e88a72 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -43,11 +43,25 @@ type ThresholdRule struct { // map of active alerts active map[uint64]*Alert + + opts ThresholdRuleOpts +} + +type ThresholdRuleOpts struct { + // SendUnmatched sends observed metric values + // even if they don't match the rule condition. this is + // useful in testing the rule + SendUnmatched bool + + // SendAlways will send alert irrespective of resendDelay + // or other params + SendAlways bool } func NewThresholdRule( id string, p *PostableRule, + opts ThresholdRuleOpts, ) (*ThresholdRule, error) { if p.RuleCondition == nil { @@ -67,6 +81,7 @@ func NewThresholdRule( preferredChannels: p.PreferredChannels, health: HealthUnknown, active: map[uint64]*Alert{}, + opts: opts, } if int64(t.evalWindow) == 0 { @@ -105,6 +120,14 @@ func (r *ThresholdRule) target() *float64 { return r.ruleCondition.Target } +func (r *ThresholdRule) targetVal() float64 { + if r.ruleCondition == nil || r.ruleCondition.Target == nil { + return 0 + } + + return *r.ruleCondition.Target +} + func (r *ThresholdRule) matchType() MatchType { if r.ruleCondition == nil { return AtleastOnce @@ -188,25 +211,7 @@ func (r *ThresholdRule) sample(alert *Alert, ts time.Time) Sample { Metric: lb.Labels(), Point: Point{T: timestamp.FromTime(ts), V: 1}, } - return s -} -// forStateSample returns the sample for ALERTS_FOR_STATE. 
-func (r *ThresholdRule) forStateSample(alert *Alert, ts time.Time, v float64) Sample { - lb := labels.NewBuilder(r.labels) - - alertLabels := alert.Labels.(labels.Labels) - for _, l := range alertLabels { - lb.Set(l.Name, l.Value) - } - - lb.Set(labels.MetricNameLabel, alertForStateMetricName) - lb.Set(labels.AlertNameLabel, r.name) - - s := Sample{ - Metric: lb.Labels(), - Point: Point{T: timestamp.FromTime(ts), V: v}, - } return s } @@ -283,10 +288,10 @@ func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) { } func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { - zap.S().Info("msg:", "initiating send alerts (if any)", "\t rule:", r.Name()) + zap.S().Info("msg:", "sending alerts", "\t rule:", r.Name()) alerts := []*Alert{} r.ForEachActiveAlert(func(alert *Alert) { - if alert.needsSending(ts, resendDelay) { + if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { alert.LastSentAt = ts // Allow for two Eval or Alertmanager send failures. 
delta := resendDelay @@ -483,8 +488,9 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer zap.S().Debugf("ruleid:", r.ID(), "\t resultmap(potential alerts):", len(resultMap)) for _, sample := range resultMap { - // check alert rule condition before dumping results - if r.CheckCondition(sample.Point.V) { + // check alert rule condition before dumping results, if SendUnmatched + // is set then add results irrespective of condition + if r.opts.SendUnmatched || r.CheckCondition(sample.Point.V) { result = append(result, sample) } } @@ -549,7 +555,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie defer r.mtx.Unlock() resultFPs := map[uint64]struct{}{} - var vec Vector var alerts = make(map[uint64]*Alert, len(res)) for _, smpl := range res { @@ -563,6 +568,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie // who are not used to Go's templating system. defs := "{{$labels := .Labels}}{{$value := .Value}}" + // utility function to apply go template on labels and annots expand := func(text string) string { tmpl := NewTemplateExpander( @@ -662,8 +668,8 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie } r.health = HealthGood r.lastError = err - return vec, nil + return len(r.active), nil } func (r *ThresholdRule) String() string { diff --git a/pkg/query-service/utils/labels/labels.go b/pkg/query-service/utils/labels/labels.go index 52ef1867f5..b7f6fdc09d 100644 --- a/pkg/query-service/utils/labels/labels.go +++ b/pkg/query-service/utils/labels/labels.go @@ -24,6 +24,10 @@ const ( AlertRuleIdLabel = "ruleId" RuleSourceLabel = "ruleSource" + + RuleThresholdLabel = "threshold" + AlertAdditionalInfoLabel = "additionalInfo" + AlertSummaryLabel = "summary" ) // Label is a key/value pair of strings.