feat: Disable Alerts Feature (Backend) (#1443)

* feat: added patch rule api

* feat: added backend api for patching rule status

* fix: improved patchRule and also editRule


Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Amol Umbark 2022-08-04 11:55:54 +05:30 committed by GitHub
parent 5dc6d28f2e
commit 8146da52af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 172 additions and 47 deletions

View File

@ -309,6 +309,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/rules", EditAccess(aH.createRule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.editRule)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.deleteRule)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.patchRule)).Methods(http.MethodPatch)
router.HandleFunc("/api/v1/dashboards", ViewAccess(aH.getDashboards)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dashboards", EditAccess(aH.createDashboards)).Methods(http.MethodPost)
@ -784,6 +785,28 @@ func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) {
}
// patchRule updates only requested changes in the rule
func (aH *APIHandler) patchRule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
zap.S().Errorf("msg: error in getting req body of patch rule API\n", "\t error:", err)
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
gettableRule, err := aH.ruleManager.PatchRule(string(body), id)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.respond(w, gettableRule)
}
func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]

View File

@ -140,7 +140,7 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
//todo(amol): find out a way to add exact domain or
// ip here for alert manager
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"},
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
})
@ -165,7 +165,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"},
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
})

View File

@ -47,6 +47,7 @@ const (
StateInactive AlertState = iota
StatePending
StateFiring
StateDisabled
)
func (s AlertState) String() string {
@ -57,6 +58,8 @@ func (s AlertState) String() string {
return "pending"
case StateFiring:
return "firing"
case StateDisabled:
return "disabled"
}
panic(errors.Errorf("unknown alert state: %d", s))
}

View File

@ -30,6 +30,8 @@ type PostableRule struct {
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"`
Annotations map[string]string `yaml:"annotations,omitempty" json:"annotations,omitempty"`
Disabled bool `json:"disabled"`
// Source captures the source url where rule has been created
Source string `json:"source,omitempty"`
@ -43,16 +45,23 @@ func ParsePostableRule(content []byte) (*PostableRule, []error) {
}
func parsePostableRule(content []byte, kind string) (*PostableRule, []error) {
rule := PostableRule{}
return parseIntoRule(PostableRule{}, content, kind)
}
// parseIntoRule loads the content (data) into PostableRule and also
// validates the end result
func parseIntoRule(initRule PostableRule, content []byte, kind string) (*PostableRule, []error) {
rule := &initRule
var err error
if kind == "json" {
if err = json.Unmarshal(content, &rule); err != nil {
if err = json.Unmarshal(content, rule); err != nil {
zap.S().Debugf("postable rule content", string(content), "\t kind:", kind)
return nil, []error{fmt.Errorf("failed to load json")}
}
} else if kind == "yaml" {
if err = yaml.Unmarshal(content, &rule); err != nil {
if err = yaml.Unmarshal(content, rule); err != nil {
zap.S().Debugf("postable rule content", string(content), "\t kind:", kind)
return nil, []error{fmt.Errorf("failed to load yaml")}
}
@ -105,7 +114,8 @@ func parsePostableRule(content []byte, kind string) (*PostableRule, []error) {
if errs := rule.Validate(); len(errs) > 0 {
return nil, errs
}
return &rule, []error{}
return rule, []error{}
}
func isValidLabelName(ln string) bool {
@ -213,18 +223,7 @@ type GettableRules struct {
// GettableRule has info for an alerting rules.
type GettableRule struct {
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
State string `json:"state"`
Alert string `json:"alert"`
// Description string `yaml:"description,omitempty" json:"description,omitempty"`
Id string `json:"id"`
RuleType RuleType `yaml:"ruleType,omitempty" json:"ruleType,omitempty"`
EvalWindow Duration `yaml:"evalWindow,omitempty" json:"evalWindow,omitempty"`
Frequency Duration `yaml:"frequency,omitempty" json:"frequency,omitempty"`
RuleCondition RuleCondition `yaml:"condition,omitempty" json:"condition,omitempty"`
// ActiveAt *time.Time `json:"activeAt,omitempty"`
// Value float64 `json:"value"`
State string `json:"state"`
PostableRule
}

View File

@ -29,8 +29,16 @@ func ruleIdFromTaskName(n string) string {
return strings.Split(n, "-groupname")[0]
}
func prepareTaskName(ruleId int64) string {
func prepareTaskName(ruleId interface{}) string {
switch ruleId.(type) {
case int, int64:
return fmt.Sprintf("%d-groupname", ruleId)
case string:
return fmt.Sprintf("%s-groupname", ruleId)
default:
return fmt.Sprintf("%v-groupname", ruleId)
}
}
// ManagerOptions bundles options for the Manager.
@ -170,12 +178,13 @@ func (m *Manager) initiate() error {
continue
}
}
if !parsedRule.Disabled {
err := m.addTask(parsedRule, taskName)
if err != nil {
zap.S().Errorf("failed to load the rule definition (%s): %v", taskName, err)
}
}
}
return nil
}
@ -206,7 +215,7 @@ func (m *Manager) Stop() {
// EditRuleDefinition writes the rule definition to the
// datastore and also updates the rule executor
func (m *Manager) EditRule(ruleStr string, id string) error {
// todo(amol): fetch recent rule from db first
parsedRule, errs := ParsePostableRule([]byte(ruleStr))
if len(errs) > 0 {
@ -221,16 +230,9 @@ func (m *Manager) EditRule(ruleStr string, id string) error {
}
if !m.opts.DisableRules {
err = m.editTask(parsedRule, taskName)
if err != nil {
// todo(amol): using tx with sqllite3 is gets
// database locked. need to research and resolve this
//tx.Rollback()
return err
}
return m.syncRuleStateWithTask(taskName, parsedRule)
}
// return tx.Commit()
return nil
}
@ -249,8 +251,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
// it to finish the current iteration. Then copy it into the new group.
oldTask, ok := m.tasks[taskName]
if !ok {
zap.S().Errorf("msg:", "rule task not found, edit task failed", "\t task name:", taskName)
return errors.New("rule task not found, edit task failed")
zap.S().Warnf("msg:", "rule task not found, a new task will be created ", "\t task name:", taskName)
}
delete(m.tasks, taskName)
@ -281,10 +282,7 @@ func (m *Manager) DeleteRule(id string) error {
taskName := prepareTaskName(int64(idInt))
if !m.opts.DisableRules {
if err := m.deleteTask(taskName); err != nil {
zap.S().Errorf("msg: ", "failed to unload the rule task from memory, please retry", "\t ruleid: ", id)
return err
}
m.deleteTask(taskName)
}
if _, _, err := m.ruleDB.DeleteRuleTx(id); err != nil {
@ -295,7 +293,7 @@ func (m *Manager) DeleteRule(id string) error {
return nil
}
func (m *Manager) deleteTask(taskName string) error {
func (m *Manager) deleteTask(taskName string) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -305,11 +303,8 @@ func (m *Manager) deleteTask(taskName string) error {
delete(m.tasks, taskName)
delete(m.rules, ruleIdFromTaskName(taskName))
} else {
zap.S().Errorf("msg:", "rule not found for deletion", "\t name:", taskName)
return fmt.Errorf("rule not found")
zap.S().Info("msg: ", "rule not found for deletion", "\t name:", taskName)
}
return nil
}
// CreateRule stores rule def into db and also
@ -555,6 +550,9 @@ func (m *Manager) ListRuleStates() (*GettableRules, error) {
// fetch rules from DB
storedRules, err := m.ruleDB.GetStoredRules()
if err != nil {
return nil, err
}
// initiate response object
resp := make([]*GettableRule, 0)
@ -571,7 +569,8 @@ func (m *Manager) ListRuleStates() (*GettableRules, error) {
// fetch state of rule from memory
if rm, ok := m.rules[ruleResponse.Id]; !ok {
zap.S().Warnf("msg:", "invalid rule id found while fetching list of rules", "\t err:", err, "\t rule_id:", ruleResponse.Id)
ruleResponse.State = StateDisabled.String()
ruleResponse.Disabled = true
} else {
ruleResponse.State = rm.State().String()
}
@ -593,3 +592,104 @@ func (m *Manager) GetRule(id string) (*GettableRule, error) {
r.Id = fmt.Sprintf("%d", s.Id)
return r, nil
}
// syncRuleStateWithTask ensures that the state of a stored rule matches
// the task state. For example - if a stored rule is disabled, then
// there is no task running against it.
func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) error {
if rule.Disabled {
// check if rule has any task running
if _, ok := m.tasks[taskName]; ok {
// delete task from memory
m.deleteTask(taskName)
}
} else {
// check if rule has a task running
if _, ok := m.tasks[taskName]; !ok {
// rule has not task, start one
if err := m.addTask(rule, taskName); err != nil {
return err
}
}
}
return nil
}
// PatchRule supports attribute level changes to the rule definition unlike
// EditRule, which updates entire rule definition in the DB.
// the process:
// - get the latest rule from db
// - over write the patch attributes received in input (ruleStr)
// - re-deploy or undeploy task as necessary
// - update the patched rule in the DB
func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error) {
if ruleId == "" {
return nil, fmt.Errorf("id is mandatory for patching rule")
}
taskName := prepareTaskName(ruleId)
// retrieve rule from DB
storedJSON, err := m.ruleDB.GetStoredRule(ruleId)
if err != nil {
zap.S().Errorf("msg:", "failed to get stored rule with given id", "\t error:", err)
return nil, err
}
// storedRule holds the current stored rule from DB
storedRule := PostableRule{}
if err := json.Unmarshal([]byte(storedJSON.Data), &storedRule); err != nil {
zap.S().Errorf("msg:", "failed to get unmarshal stored rule with given id", "\t error:", err)
return nil, err
}
// patchedRule is combo of stored rule and patch received in the request
patchedRule, errs := parseIntoRule(storedRule, []byte(ruleStr), "json")
if len(errs) > 0 {
zap.S().Errorf("failed to parse rules:", errs)
// just one rule is being parsed so expect just one error
return nil, errs[0]
}
// deploy or un-deploy task according to patched (new) rule state
if err := m.syncRuleStateWithTask(taskName, patchedRule); err != nil {
zap.S().Errorf("failed to sync stored rule state with the task")
return nil, err
}
// prepare rule json to write to update db
patchedRuleBytes, err := json.Marshal(patchedRule)
if err != nil {
return nil, err
}
// write updated rule to db
if _, _, err = m.ruleDB.EditRuleTx(string(patchedRuleBytes), ruleId); err != nil {
// write failed, rollback task state
// restore task state from the stored rule
if err := m.syncRuleStateWithTask(taskName, &storedRule); err != nil {
zap.S().Errorf("msg: ", "failed to restore rule after patch failure", "\t error:", err)
}
return nil, err
}
// prepare http response
response := GettableRule{
Id: ruleId,
PostableRule: *patchedRule,
}
// fetch state of rule from memory
if rm, ok := m.rules[ruleId]; !ok {
response.State = StateDisabled.String()
response.Disabled = true
} else {
response.State = rm.State().String()
}
return &response, nil
}