From 8146da52afb15ac400705650ae291ae593b9249e Mon Sep 17 00:00:00 2001 From: Amol Umbark Date: Thu, 4 Aug 2022 11:55:54 +0530 Subject: [PATCH] feat: Disable Alerts Feature (Backend) (#1443) * feat: added patch rule api * feat: added backend api for patching rule status * fix: improved patchRule and also editRule Co-authored-by: Srikanth Chekuri --- pkg/query-service/app/http_handler.go | 23 ++++ pkg/query-service/app/server.go | 4 +- pkg/query-service/rules/alerting.go | 3 + pkg/query-service/rules/apiParams.go | 35 +++--- pkg/query-service/rules/manager.go | 154 +++++++++++++++++++++----- 5 files changed, 172 insertions(+), 47 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5d7b8cce5c..c01d504e4a 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -309,6 +309,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/rules", EditAccess(aH.createRule)).Methods(http.MethodPost) router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.editRule)).Methods(http.MethodPut) router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.deleteRule)).Methods(http.MethodDelete) + router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.patchRule)).Methods(http.MethodPatch) router.HandleFunc("/api/v1/dashboards", ViewAccess(aH.getDashboards)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dashboards", EditAccess(aH.createDashboards)).Methods(http.MethodPost) @@ -784,6 +785,28 @@ func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) { } +// patchRule updates only requested changes in the rule +func (aH *APIHandler) patchRule(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + zap.S().Errorf("msg: error in getting req body of patch rule API\n", "\t error:", err) + respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + gettableRule, err := aH.ruleManager.PatchRule(string(body), id) + + if err != nil { + respondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.respond(w, gettableRule) +} + func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 1815e5c7f0..845b75e9c4 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -140,7 +140,7 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) { //todo(amol): find out a way to add exact domain or // ip here for alert manager AllowedOrigins: []string{"*"}, - AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"}, + AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"}, AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"}, }) @@ -165,7 +165,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, - AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"}, + AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"}, AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"}, }) diff --git a/pkg/query-service/rules/alerting.go b/pkg/query-service/rules/alerting.go index 66435baf36..65ee1f60a9 100644 --- a/pkg/query-service/rules/alerting.go +++ b/pkg/query-service/rules/alerting.go @@ -47,6 +47,7 @@ const ( StateInactive AlertState = iota StatePending StateFiring + StateDisabled ) func (s AlertState) String() string { @@ -57,6 +58,8 @@ func (s AlertState) String() string { return "pending" case StateFiring: return "firing" + case StateDisabled: + return "disabled" } panic(errors.Errorf("unknown alert state: %d", s)) } diff --git a/pkg/query-service/rules/apiParams.go b/pkg/query-service/rules/apiParams.go index 6f3b466d11..346b515824 100644 --- a/pkg/query-service/rules/apiParams.go +++ b/pkg/query-service/rules/apiParams.go @@ -30,6 +30,8 @@ type PostableRule struct { Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"` Annotations map[string]string `yaml:"annotations,omitempty" json:"annotations,omitempty"` + Disabled bool `json:"disabled"` + // Source captures the source url where rule has been created Source string `json:"source,omitempty"` @@ -43,16 +45,23 @@ func ParsePostableRule(content []byte) (*PostableRule, []error) { } func parsePostableRule(content []byte, kind string) (*PostableRule, []error) { - rule := PostableRule{} + return parseIntoRule(PostableRule{}, content, kind) +} + +// parseIntoRule loads the content (data) into PostableRule and also +// validates the end result +func parseIntoRule(initRule PostableRule, content []byte, kind string) (*PostableRule, []error) { + + rule := &initRule var err error if kind == "json" { - if err = json.Unmarshal(content, &rule); err != nil { + if err = json.Unmarshal(content, rule); err != nil { zap.S().Debugf("postable rule content", string(content), "\t kind:", kind) return nil, []error{fmt.Errorf("failed to load json")} } } else if kind == "yaml" { - if err = yaml.Unmarshal(content, &rule); err != nil { + if err = yaml.Unmarshal(content, rule); err != nil { zap.S().Debugf("postable rule content", string(content), "\t kind:", kind) return nil, []error{fmt.Errorf("failed to load yaml")} } @@ -105,7 +114,8 @@ func parsePostableRule(content []byte, kind string) (*PostableRule, []error) { if errs := rule.Validate(); len(errs) > 0 { return nil, errs } - return &rule, []error{} + + return rule, []error{} } func isValidLabelName(ln string) bool { @@ -213,18 +223,7 @@ type GettableRules struct { // GettableRule has info for an alerting rules. type GettableRule struct { - Labels map[string]string `json:"labels"` - Annotations map[string]string `json:"annotations"` - State string `json:"state"` - Alert string `json:"alert"` - // Description string `yaml:"description,omitempty" json:"description,omitempty"` - - Id string `json:"id"` - RuleType RuleType `yaml:"ruleType,omitempty" json:"ruleType,omitempty"` - EvalWindow Duration `yaml:"evalWindow,omitempty" json:"evalWindow,omitempty"` - Frequency Duration `yaml:"frequency,omitempty" json:"frequency,omitempty"` - RuleCondition RuleCondition `yaml:"condition,omitempty" json:"condition,omitempty"` - - // ActiveAt *time.Time `json:"activeAt,omitempty"` - // Value float64 `json:"value"` + Id string `json:"id"` + State string `json:"state"` + PostableRule } diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 9a040fdf74..a20ec05941 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -29,8 +29,16 @@ func ruleIdFromTaskName(n string) string { return strings.Split(n, "-groupname")[0] } -func prepareTaskName(ruleId int64) string { - return fmt.Sprintf("%d-groupname", ruleId) +func prepareTaskName(ruleId interface{}) string { + switch ruleId.(type) { + case int, int64: + return fmt.Sprintf("%d-groupname", ruleId) + case string: + return fmt.Sprintf("%s-groupname", ruleId) + default: + return fmt.Sprintf("%v-groupname", ruleId) + } + } // ManagerOptions bundles options for the Manager. @@ -170,10 +178,11 @@ func (m *Manager) initiate() error { continue } } - - err := m.addTask(parsedRule, taskName) - if err != nil { - zap.S().Errorf("failed to load the rule definition (%s): %v", taskName, err) + if !parsedRule.Disabled { + err := m.addTask(parsedRule, taskName) + if err != nil { + zap.S().Errorf("failed to load the rule definition (%s): %v", taskName, err) + } } } @@ -206,7 +215,7 @@ func (m *Manager) Stop() { // EditRuleDefinition writes the rule definition to the // datastore and also updates the rule executor func (m *Manager) EditRule(ruleStr string, id string) error { - // todo(amol): fetch recent rule from db first + parsedRule, errs := ParsePostableRule([]byte(ruleStr)) if len(errs) > 0 { @@ -221,16 +230,9 @@ func (m *Manager) EditRule(ruleStr string, id string) error { } if !m.opts.DisableRules { - err = m.editTask(parsedRule, taskName) - if err != nil { - // todo(amol): using tx with sqllite3 is gets - // database locked. need to research and resolve this - //tx.Rollback() - return err - } + return m.syncRuleStateWithTask(taskName, parsedRule) } - // return tx.Commit() return nil } @@ -249,8 +251,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { // it to finish the current iteration. Then copy it into the new group. oldTask, ok := m.tasks[taskName] if !ok { - zap.S().Errorf("msg:", "rule task not found, edit task failed", "\t task name:", taskName) - return errors.New("rule task not found, edit task failed") + zap.S().Warnf("msg:", "rule task not found, a new task will be created ", "\t task name:", taskName) } delete(m.tasks, taskName) @@ -281,10 +282,7 @@ func (m *Manager) DeleteRule(id string) error { taskName := prepareTaskName(int64(idInt)) if !m.opts.DisableRules { - if err := m.deleteTask(taskName); err != nil { - zap.S().Errorf("msg: ", "failed to unload the rule task from memory, please retry", "\t ruleid: ", id) - return err - } + m.deleteTask(taskName) } if _, _, err := m.ruleDB.DeleteRuleTx(id); err != nil { @@ -295,7 +293,7 @@ func (m *Manager) DeleteRule(id string) error { return nil } -func (m *Manager) deleteTask(taskName string) error { +func (m *Manager) deleteTask(taskName string) { m.mtx.Lock() defer m.mtx.Unlock() @@ -305,11 +303,8 @@ func (m *Manager) deleteTask(taskName string) error { delete(m.tasks, taskName) delete(m.rules, ruleIdFromTaskName(taskName)) } else { - zap.S().Errorf("msg:", "rule not found for deletion", "\t name:", taskName) - return fmt.Errorf("rule not found") + zap.S().Info("msg: ", "rule not found for deletion", "\t name:", taskName) } - - return nil } // CreateRule stores rule def into db and also @@ -555,6 +550,9 @@ func (m *Manager) ListRuleStates() (*GettableRules, error) { // fetch rules from DB storedRules, err := m.ruleDB.GetStoredRules() + if err != nil { + return nil, err + } // initiate response object resp := make([]*GettableRule, 0) @@ -571,7 +569,8 @@ func (m *Manager) ListRuleStates() (*GettableRules, error) { // fetch state of rule from memory if rm, ok := m.rules[ruleResponse.Id]; !ok { - zap.S().Warnf("msg:", "invalid rule id found while fetching list of rules", "\t err:", err, "\t rule_id:", ruleResponse.Id) + ruleResponse.State = StateDisabled.String() + ruleResponse.Disabled = true } else { ruleResponse.State = rm.State().String() } @@ -593,3 +592,104 @@ func (m *Manager) GetRule(id string) (*GettableRule, error) { r.Id = fmt.Sprintf("%d", s.Id) return r, nil } + +// syncRuleStateWithTask ensures that the state of a stored rule matches +// the task state. For example - if a stored rule is disabled, then +// there is no task running against it. +func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) error { + + if rule.Disabled { + // check if rule has any task running + if _, ok := m.tasks[taskName]; ok { + // delete task from memory + m.deleteTask(taskName) + } + } else { + // check if rule has a task running + if _, ok := m.tasks[taskName]; !ok { + // rule has not task, start one + if err := m.addTask(rule, taskName); err != nil { + return err + } + } + } + return nil +} + +// PatchRule supports attribute level changes to the rule definition unlike +// EditRule, which updates entire rule definition in the DB. +// the process: +// - get the latest rule from db +// - over write the patch attributes received in input (ruleStr) +// - re-deploy or undeploy task as necessary +// - update the patched rule in the DB +func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error) { + + if ruleId == "" { + return nil, fmt.Errorf("id is mandatory for patching rule") + } + + taskName := prepareTaskName(ruleId) + + // retrieve rule from DB + storedJSON, err := m.ruleDB.GetStoredRule(ruleId) + if err != nil { + zap.S().Errorf("msg:", "failed to get stored rule with given id", "\t error:", err) + return nil, err + } + + // storedRule holds the current stored rule from DB + storedRule := PostableRule{} + if err := json.Unmarshal([]byte(storedJSON.Data), &storedRule); err != nil { + zap.S().Errorf("msg:", "failed to get unmarshal stored rule with given id", "\t error:", err) + return nil, err + } + + // patchedRule is combo of stored rule and patch received in the request + patchedRule, errs := parseIntoRule(storedRule, []byte(ruleStr), "json") + if len(errs) > 0 { + zap.S().Errorf("failed to parse rules:", errs) + // just one rule is being parsed so expect just one error + return nil, errs[0] + } + + // deploy or un-deploy task according to patched (new) rule state + if err := m.syncRuleStateWithTask(taskName, patchedRule); err != nil { + zap.S().Errorf("failed to sync stored rule state with the task") + return nil, err + } + + // prepare rule json to write to update db + patchedRuleBytes, err := json.Marshal(patchedRule) + if err != nil { + return nil, err + } + + // write updated rule to db + if _, _, err = m.ruleDB.EditRuleTx(string(patchedRuleBytes), ruleId); err != nil { + // write failed, rollback task state + + // restore task state from the stored rule + if err := m.syncRuleStateWithTask(taskName, &storedRule); err != nil { + zap.S().Errorf("msg: ", "failed to restore rule after patch failure", "\t error:", err) + } + + return nil, err + } + + // prepare http response + response := GettableRule{ + Id: ruleId, + PostableRule: *patchedRule, + } + + // fetch state of rule from memory + if rm, ok := m.rules[ruleId]; !ok { + response.State = StateDisabled.String() + response.Disabled = true + } else { + response.State = rm.State().String() + } + + return &response, nil +}