chore: scheduled maintenance query-service impl (#4834)

This commit is contained in:
Srikanth Chekuri 2024-05-31 17:43:13 +05:30 committed by GitHub
parent f927969c7d
commit 67779d6c2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 927 additions and 11 deletions

View File

@ -83,6 +83,22 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
return nil, fmt.Errorf("error in creating notification_channles table: %s", err.Error())
}
tableSchema := `CREATE TABLE IF NOT EXISTS planned_maintenance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
alert_ids TEXT,
schedule TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT NOT NULL,
updated_at datetime NOT NULL,
updated_by TEXT NOT NULL
);`
_, err = db.Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating planned_maintenance table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS ttl_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT NOT NULL,

View File

@ -375,6 +375,12 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/rules/{id}", am.EditAccess(aH.patchRule)).Methods(http.MethodPatch)
router.HandleFunc("/api/v1/testRule", am.EditAccess(aH.testRule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/downtime_schedules", am.OpenAccess(aH.listDowntimeSchedules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.getDowntimeSchedule)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/downtime_schedules", am.OpenAccess(aH.createDowntimeSchedule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.editDowntimeSchedule)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.deleteDowntimeSchedule)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/dashboards", am.ViewAccess(aH.getDashboards)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dashboards", am.EditAccess(aH.createDashboards)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/dashboards/grafana", am.EditAccess(aH.createDashboardsTransform)).Methods(http.MethodPost)
@ -535,6 +541,102 @@ func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRange
return nil
}
func (aH *APIHandler) listDowntimeSchedules(w http.ResponseWriter, r *http.Request) {
schedules, err := aH.ruleManager.RuleDB().GetAllPlannedMaintenance(r.Context())
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
// The schedules are stored as JSON in the database, so we need to filter them here
// Since the number of schedules is expected to be small, this should be fine
if r.URL.Query().Get("active") != "" {
activeSchedules := make([]rules.PlannedMaintenance, 0)
active, _ := strconv.ParseBool(r.URL.Query().Get("active"))
for _, schedule := range schedules {
now := time.Now().In(time.FixedZone(schedule.Schedule.Timezone, 0))
if schedule.IsActive(now) == active {
activeSchedules = append(activeSchedules, schedule)
}
}
schedules = activeSchedules
}
if r.URL.Query().Get("recurring") != "" {
recurringSchedules := make([]rules.PlannedMaintenance, 0)
recurring, _ := strconv.ParseBool(r.URL.Query().Get("recurring"))
for _, schedule := range schedules {
if schedule.IsRecurring() == recurring {
recurringSchedules = append(recurringSchedules, schedule)
}
}
schedules = recurringSchedules
}
aH.Respond(w, schedules)
}
func (aH *APIHandler) getDowntimeSchedule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
schedule, err := aH.ruleManager.RuleDB().GetPlannedMaintenanceByID(r.Context(), id)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, schedule)
}
func (aH *APIHandler) createDowntimeSchedule(w http.ResponseWriter, r *http.Request) {
var schedule rules.PlannedMaintenance
err := json.NewDecoder(r.Body).Decode(&schedule)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
if err := schedule.Validate(); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, err = aH.ruleManager.RuleDB().CreatePlannedMaintenance(r.Context(), schedule)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) editDowntimeSchedule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var schedule rules.PlannedMaintenance
err := json.NewDecoder(r.Body).Decode(&schedule)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
if err := schedule.Validate(); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
_, err = aH.ruleManager.RuleDB().EditPlannedMaintenance(r.Context(), schedule, id)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) deleteDowntimeSchedule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
_, err := aH.ruleManager.RuleDB().DeletePlannedMaintenance(r.Context(), id)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, nil)
}
func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) {
rules, err := aH.ruleManager.ListRuleStates(r.Context())

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/common"
"go.uber.org/zap"
)
@ -27,6 +28,21 @@ type RuleDB interface {
// GetStoredRule for a given ID from DB
GetStoredRule(ctx context.Context, id string) (*StoredRule, error)
// CreatePlannedMaintenance stores a given maintenance in db
CreatePlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance) (int64, error)
// DeletePlannedMaintenance deletes the given maintenance in the db
DeletePlannedMaintenance(ctx context.Context, id string) (string, error)
// GetPlannedMaintenanceByID fetches the maintenance definition from db by id
GetPlannedMaintenanceByID(ctx context.Context, id string) (*PlannedMaintenance, error)
// EditPlannedMaintenance updates the given maintenance in the db
EditPlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance, id string) (string, error)
// GetAllPlannedMaintenance fetches the maintenance definitions from db
GetAllPlannedMaintenance(ctx context.Context) ([]PlannedMaintenance, error)
}
type StoredRule struct {
@ -202,3 +218,80 @@ func (r *ruleDB) GetStoredRule(ctx context.Context, id string) (*StoredRule, err
return rule, nil
}
func (r *ruleDB) GetAllPlannedMaintenance(ctx context.Context) ([]PlannedMaintenance, error) {
maintenances := []PlannedMaintenance{}
query := "SELECT id, name, description, schedule, alert_ids, created_at, created_by, updated_at, updated_by FROM planned_maintenance"
err := r.Select(&maintenances, query)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, err
}
return maintenances, nil
}
func (r *ruleDB) GetPlannedMaintenanceByID(ctx context.Context, id string) (*PlannedMaintenance, error) {
maintenance := &PlannedMaintenance{}
query := "SELECT id, name, description, schedule, alert_ids, created_at, created_by, updated_at, updated_by FROM planned_maintenance WHERE id=$1"
err := r.Get(maintenance, query, id)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, err
}
return maintenance, nil
}
func (r *ruleDB) CreatePlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance) (int64, error) {
email, _ := auth.GetEmailFromJwt(ctx)
maintenance.CreatedBy = email
maintenance.CreatedAt = time.Now()
maintenance.UpdatedBy = email
maintenance.UpdatedAt = time.Now()
query := "INSERT INTO planned_maintenance (name, description, schedule, alert_ids, created_at, created_by, updated_at, updated_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
result, err := r.Exec(query, maintenance.Name, maintenance.Description, maintenance.Schedule, maintenance.AlertIds, maintenance.CreatedAt, maintenance.CreatedBy, maintenance.UpdatedAt, maintenance.UpdatedBy)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return 0, err
}
return result.LastInsertId()
}
func (r *ruleDB) DeletePlannedMaintenance(ctx context.Context, id string) (string, error) {
query := "DELETE FROM planned_maintenance WHERE id=$1"
_, err := r.Exec(query, id)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return "", err
}
return "", nil
}
func (r *ruleDB) EditPlannedMaintenance(ctx context.Context, maintenance PlannedMaintenance, id string) (string, error) {
email, _ := auth.GetEmailFromJwt(ctx)
maintenance.UpdatedBy = email
maintenance.UpdatedAt = time.Now()
query := "UPDATE planned_maintenance SET name=$1, description=$2, schedule=$3, alert_ids=$4, updated_at=$5, updated_by=$6 WHERE id=$7"
_, err := r.Exec(query, maintenance.Name, maintenance.Description, maintenance.Schedule, maintenance.AlertIds, maintenance.UpdatedAt, maintenance.UpdatedBy, id)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return "", err
}
return "", nil
}

View File

@ -0,0 +1,423 @@
package rules
import (
"database/sql/driver"
"encoding/json"
"slices"
"strings"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var (
ErrMissingName = errors.New("missing name")
ErrMissingSchedule = errors.New("missing schedule")
ErrMissingTimezone = errors.New("missing timezone")
ErrMissingRepeatType = errors.New("missing repeat type")
ErrMissingDuration = errors.New("missing duration")
)
type PlannedMaintenance struct {
Id int64 `json:"id" db:"id"`
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
Schedule *Schedule `json:"schedule" db:"schedule"`
AlertIds *AlertIds `json:"alertIds" db:"alert_ids"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
CreatedBy string `json:"createdBy" db:"created_by"`
UpdatedAt time.Time `json:"updatedAt" db:"updated_at"`
UpdatedBy string `json:"updatedBy" db:"updated_by"`
Status string `json:"status"`
Kind string `json:"kind"`
}
type AlertIds []string
func (a *AlertIds) Scan(src interface{}) error {
if data, ok := src.([]byte); ok {
return json.Unmarshal(data, a)
}
return nil
}
func (a *AlertIds) Value() (driver.Value, error) {
return json.Marshal(a)
}
type Schedule struct {
Timezone string `json:"timezone"`
StartTime time.Time `json:"startTime,omitempty"`
EndTime time.Time `json:"endTime,omitempty"`
Recurrence *Recurrence `json:"recurrence"`
}
func (s *Schedule) Scan(src interface{}) error {
if data, ok := src.([]byte); ok {
return json.Unmarshal(data, s)
}
return nil
}
func (s *Schedule) Value() (driver.Value, error) {
return json.Marshal(s)
}
type RepeatType string
const (
RepeatTypeDaily RepeatType = "daily"
RepeatTypeWeekly RepeatType = "weekly"
RepeatTypeMonthly RepeatType = "monthly"
)
type RepeatOn string
const (
RepeatOnSunday RepeatOn = "sunday"
RepeatOnMonday RepeatOn = "monday"
RepeatOnTuesday RepeatOn = "tuesday"
RepeatOnWednesday RepeatOn = "wednesday"
RepeatOnThursday RepeatOn = "thursday"
RepeatOnFriday RepeatOn = "friday"
RepeatOnSaturday RepeatOn = "saturday"
)
type Recurrence struct {
StartTime time.Time `json:"startTime"`
EndTime *time.Time `json:"endTime,omitempty"`
Duration Duration `json:"duration"`
RepeatType RepeatType `json:"repeatType"`
RepeatOn []RepeatOn `json:"repeatOn"`
}
func (r *Recurrence) Scan(src interface{}) error {
if data, ok := src.([]byte); ok {
return json.Unmarshal(data, r)
}
return nil
}
func (r *Recurrence) Value() (driver.Value, error) {
return json.Marshal(r)
}
func (s Schedule) MarshalJSON() ([]byte, error) {
loc, err := time.LoadLocation(s.Timezone)
if err != nil {
return nil, err
}
var startTime, endTime time.Time
if !s.StartTime.IsZero() {
startTime = time.Date(s.StartTime.Year(), s.StartTime.Month(), s.StartTime.Day(), s.StartTime.Hour(), s.StartTime.Minute(), s.StartTime.Second(), s.StartTime.Nanosecond(), loc)
}
if !s.EndTime.IsZero() {
endTime = time.Date(s.EndTime.Year(), s.EndTime.Month(), s.EndTime.Day(), s.EndTime.Hour(), s.EndTime.Minute(), s.EndTime.Second(), s.EndTime.Nanosecond(), loc)
}
var recurrence *Recurrence
if s.Recurrence != nil {
recStartTime := time.Date(s.Recurrence.StartTime.Year(), s.Recurrence.StartTime.Month(), s.Recurrence.StartTime.Day(), s.Recurrence.StartTime.Hour(), s.Recurrence.StartTime.Minute(), s.Recurrence.StartTime.Second(), s.Recurrence.StartTime.Nanosecond(), loc)
var recEndTime *time.Time
if s.Recurrence.EndTime != nil {
end := time.Date(s.Recurrence.EndTime.Year(), s.Recurrence.EndTime.Month(), s.Recurrence.EndTime.Day(), s.Recurrence.EndTime.Hour(), s.Recurrence.EndTime.Minute(), s.Recurrence.EndTime.Second(), s.Recurrence.EndTime.Nanosecond(), loc)
recEndTime = &end
}
recurrence = &Recurrence{
StartTime: recStartTime,
EndTime: recEndTime,
Duration: s.Recurrence.Duration,
RepeatType: s.Recurrence.RepeatType,
RepeatOn: s.Recurrence.RepeatOn,
}
}
return json.Marshal(&struct {
Timezone string `json:"timezone"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Recurrence *Recurrence `json:"recurrence,omitempty"`
}{
Timezone: s.Timezone,
StartTime: startTime.Format(time.RFC3339),
EndTime: endTime.Format(time.RFC3339),
Recurrence: recurrence,
})
}
func (s *Schedule) UnmarshalJSON(data []byte) error {
aux := &struct {
Timezone string `json:"timezone"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Recurrence *Recurrence `json:"recurrence,omitempty"`
}{}
if err := json.Unmarshal(data, aux); err != nil {
return err
}
loc, err := time.LoadLocation(aux.Timezone)
if err != nil {
return err
}
var startTime time.Time
if aux.StartTime != "" {
startTime, err = time.Parse(time.RFC3339, aux.StartTime)
if err != nil {
return err
}
s.StartTime = time.Date(startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), startTime.Second(), startTime.Nanosecond(), loc)
}
var endTime time.Time
if aux.EndTime != "" {
endTime, err = time.Parse(time.RFC3339, aux.EndTime)
if err != nil {
return err
}
s.EndTime = time.Date(endTime.Year(), endTime.Month(), endTime.Day(), endTime.Hour(), endTime.Minute(), endTime.Second(), endTime.Nanosecond(), loc)
}
s.Timezone = aux.Timezone
if aux.Recurrence != nil {
recStartTime, err := time.Parse(time.RFC3339, aux.Recurrence.StartTime.Format(time.RFC3339))
if err != nil {
return err
}
var recEndTime *time.Time
if aux.Recurrence.EndTime != nil {
end, err := time.Parse(time.RFC3339, aux.Recurrence.EndTime.Format(time.RFC3339))
if err != nil {
return err
}
endConverted := time.Date(end.Year(), end.Month(), end.Day(), end.Hour(), end.Minute(), end.Second(), end.Nanosecond(), loc)
recEndTime = &endConverted
}
s.Recurrence = &Recurrence{
StartTime: time.Date(recStartTime.Year(), recStartTime.Month(), recStartTime.Day(), recStartTime.Hour(), recStartTime.Minute(), recStartTime.Second(), recStartTime.Nanosecond(), loc),
EndTime: recEndTime,
Duration: aux.Recurrence.Duration,
RepeatType: aux.Recurrence.RepeatType,
RepeatOn: aux.Recurrence.RepeatOn,
}
}
return nil
}
func (m *PlannedMaintenance) shouldSkip(ruleID string, now time.Time) bool {
found := false
if m.AlertIds != nil {
for _, alertID := range *m.AlertIds {
if alertID == ruleID {
found = true
break
}
}
}
// If no alert ids, then skip all alerts
if m.AlertIds == nil || len(*m.AlertIds) == 0 {
found = true
}
if found {
zap.L().Info("alert found in maintenance", zap.String("alert", ruleID), zap.Any("maintenance", m.Name))
// If alert is found, we check if it should be skipped based on the schedule
// If it should be skipped, we return true
// If it should not be skipped, we return false
// fixed schedule
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
// if the current time in the timezone is between the start and end time
loc, err := time.LoadLocation(m.Schedule.Timezone)
if err != nil {
zap.L().Error("Error loading location", zap.String("timezone", m.Schedule.Timezone), zap.Error(err))
return false
}
currentTime := now.In(loc)
zap.L().Info("checking fixed schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", m.Schedule.StartTime), zap.Time("endTime", m.Schedule.EndTime))
if currentTime.After(m.Schedule.StartTime) && currentTime.Before(m.Schedule.EndTime) {
return true
}
}
// recurring schedule
if m.Schedule.Recurrence != nil {
zap.L().Info("evaluating recurrence schedule")
start := m.Schedule.Recurrence.StartTime
end := m.Schedule.Recurrence.StartTime.Add(time.Duration(m.Schedule.Recurrence.Duration))
// if the current time in the timezone is between the start and end time
loc, err := time.LoadLocation(m.Schedule.Timezone)
if err != nil {
zap.L().Error("Error loading location", zap.String("timezone", m.Schedule.Timezone), zap.Error(err))
return false
}
currentTime := now.In(loc)
zap.L().Info("checking recurring schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", start), zap.Time("endTime", end))
// make sure the start time is not after the current time
if currentTime.Before(start.In(loc)) {
zap.L().Info("current time is before start time", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", start.In(loc)))
return false
}
var endTime time.Time
if m.Schedule.Recurrence.EndTime != nil {
endTime = *m.Schedule.Recurrence.EndTime
}
if !endTime.IsZero() && currentTime.After(endTime.In(loc)) {
zap.L().Info("current time is after end time", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("endTime", end.In(loc)))
return false
}
switch m.Schedule.Recurrence.RepeatType {
case RepeatTypeDaily:
// take the hours and minutes from the start time and add them to the current time
startTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), start.Hour(), start.Minute(), 0, 0, loc)
endTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), end.Hour(), end.Minute(), 0, 0, loc)
zap.L().Info("checking daily schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", startTime), zap.Time("endTime", endTime))
if currentTime.After(startTime) && currentTime.Before(endTime) {
return true
}
case RepeatTypeWeekly:
// if the current time in the timezone is between the start and end time on the RepeatOn day
startTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), start.Hour(), start.Minute(), 0, 0, loc)
endTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), end.Hour(), end.Minute(), 0, 0, loc)
zap.L().Info("checking weekly schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", startTime), zap.Time("endTime", endTime))
if currentTime.After(startTime) && currentTime.Before(endTime) {
if len(m.Schedule.Recurrence.RepeatOn) == 0 {
return true
} else if slices.Contains(m.Schedule.Recurrence.RepeatOn, RepeatOn(strings.ToLower(currentTime.Weekday().String()))) {
return true
}
}
case RepeatTypeMonthly:
// if the current time in the timezone is between the start and end time on the day of the current month
startTime := time.Date(currentTime.Year(), currentTime.Month(), start.Day(), start.Hour(), start.Minute(), 0, 0, loc)
endTime := time.Date(currentTime.Year(), currentTime.Month(), end.Day(), end.Hour(), end.Minute(), 0, 0, loc)
zap.L().Info("checking monthly schedule", zap.Any("rule", ruleID), zap.String("maintenance", m.Name), zap.Time("currentTime", currentTime), zap.Time("startTime", startTime), zap.Time("endTime", endTime))
if currentTime.After(startTime) && currentTime.Before(endTime) && currentTime.Day() == start.Day() {
return true
}
}
}
}
// If alert is not found, we return false
return false
}
func (m *PlannedMaintenance) IsActive(now time.Time) bool {
ruleID := "maintenance"
if m.AlertIds != nil && len(*m.AlertIds) > 0 {
ruleID = (*m.AlertIds)[0]
}
return m.shouldSkip(ruleID, now)
}
func (m *PlannedMaintenance) IsUpcoming() bool {
now := time.Now().In(time.FixedZone(m.Schedule.Timezone, 0))
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
return now.Before(m.Schedule.StartTime)
}
if m.Schedule.Recurrence != nil {
return now.Before(m.Schedule.Recurrence.StartTime)
}
return false
}
func (m *PlannedMaintenance) IsRecurring() bool {
return m.Schedule.Recurrence != nil
}
func (m *PlannedMaintenance) Validate() error {
if m.Name == "" {
return ErrMissingName
}
if m.Schedule == nil {
return ErrMissingSchedule
}
if m.Schedule.Timezone == "" {
return ErrMissingTimezone
}
_, err := time.LoadLocation(m.Schedule.Timezone)
if err != nil {
return errors.New("invalid timezone")
}
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
if m.Schedule.StartTime.After(m.Schedule.EndTime) {
return errors.New("start time cannot be after end time")
}
}
if m.Schedule.Recurrence != nil {
if m.Schedule.Recurrence.RepeatType == "" {
return ErrMissingRepeatType
}
if m.Schedule.Recurrence.Duration == 0 {
return ErrMissingDuration
}
if m.Schedule.Recurrence.EndTime != nil && m.Schedule.Recurrence.EndTime.Before(m.Schedule.Recurrence.StartTime) {
return errors.New("end time cannot be before start time")
}
}
return nil
}
func (m PlannedMaintenance) MarshalJSON() ([]byte, error) {
now := time.Now().In(time.FixedZone(m.Schedule.Timezone, 0))
var status string
if m.IsActive(now) {
status = "active"
} else if m.IsUpcoming() {
status = "upcoming"
} else {
status = "expired"
}
var kind string
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() && m.Schedule.EndTime.After(m.Schedule.StartTime) {
kind = "fixed"
} else {
kind = "recurring"
}
return json.Marshal(struct {
Id int64 `json:"id" db:"id"`
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
Schedule *Schedule `json:"schedule" db:"schedule"`
AlertIds *AlertIds `json:"alertIds" db:"alert_ids"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
CreatedBy string `json:"createdBy" db:"created_by"`
UpdatedAt time.Time `json:"updatedAt" db:"updated_at"`
UpdatedBy string `json:"updatedBy" db:"updated_by"`
Status string `json:"status"`
Kind string `json:"kind"`
}{
Id: m.Id,
Name: m.Name,
Description: m.Description,
Schedule: m.Schedule,
AlertIds: m.AlertIds,
CreatedAt: m.CreatedAt,
CreatedBy: m.CreatedBy,
UpdatedAt: m.UpdatedAt,
UpdatedBy: m.UpdatedBy,
Status: status,
Kind: kind,
})
}

View File

@ -0,0 +1,230 @@
package rules
import (
"testing"
"time"
)
func TestShouldSkipMaintenance(t *testing.T) {
cases := []struct {
name string
maintenance *PlannedMaintenance
ts time.Time
expected bool
}{
{
name: "fixed planned maintenance start <= ts <= end",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
StartTime: time.Now().UTC().Add(-time.Hour),
EndTime: time.Now().UTC().Add(time.Hour * 2),
},
},
ts: time.Now().UTC(),
expected: true,
},
{
name: "fixed planned maintenance start >= ts",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
StartTime: time.Now().UTC().Add(time.Hour),
EndTime: time.Now().UTC().Add(time.Hour * 2),
},
},
ts: time.Now().UTC(),
expected: false,
},
{
name: "fixed planned maintenance ts < start",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
StartTime: time.Now().UTC().Add(time.Hour),
EndTime: time.Now().UTC().Add(time.Hour * 2),
},
},
ts: time.Now().UTC().Add(-time.Hour),
expected: false,
},
{
name: "recurring maintenance, repeat daily from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeDaily,
},
},
},
ts: time.Date(2024, 1, 1, 12, 10, 0, 0, time.UTC),
expected: true,
},
{
name: "recurring maintenance, repeat daily from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeDaily,
},
},
},
ts: time.Date(2024, 1, 1, 14, 0, 0, 0, time.UTC),
expected: false,
},
{
name: "recurring maintenance, repeat daily from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeDaily,
},
},
},
ts: time.Date(2024, 04, 1, 12, 10, 0, 0, time.UTC),
expected: true,
},
{
name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 15, 12, 10, 0, 0, time.UTC),
expected: true,
},
{
name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
expected: false,
},
{
name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 04, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
expected: false,
},
{
name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 05, 06, 12, 10, 0, 0, time.UTC),
expected: true,
},
{
name: "recurring maintenance, repeat weekly on monday from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeWeekly,
RepeatOn: []RepeatOn{RepeatOnMonday},
},
},
},
ts: time.Date(2024, 05, 06, 14, 00, 0, 0, time.UTC),
expected: false,
},
{
name: "recurring maintenance, repeat monthly on 4th from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 04, 04, 12, 10, 0, 0, time.UTC),
expected: true,
},
{
name: "recurring maintenance, repeat monthly on 4th from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 04, 04, 14, 10, 0, 0, time.UTC),
expected: false,
},
{
name: "recurring maintenance, repeat monthly on 4th from 12:00 to 14:00",
maintenance: &PlannedMaintenance{
Schedule: &Schedule{
Timezone: "UTC",
Recurrence: &Recurrence{
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
Duration: Duration(time.Hour * 2),
RepeatType: RepeatTypeMonthly,
},
},
},
ts: time.Date(2024, 05, 04, 12, 10, 0, 0, time.UTC),
expected: true,
},
}
for _, c := range cases {
result := c.maintenance.shouldSkip(c.name, c.ts)
if result != c.expected {
t.Errorf("expected %v, got %v", c.expected, result)
}
}
}

View File

@ -133,6 +133,10 @@ func (m *Manager) Start() {
m.run()
}
func (m *Manager) RuleDB() RuleDB {
return m.ruleDB
}
func (m *Manager) Pause(b bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -529,7 +533,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
rules = append(rules, tr)
// create ch rule task for evalution
task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc())
task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB)
// add rule to memory
m.rules[ruleId] = tr
@ -551,7 +555,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
rules = append(rules, pr)
// create promql rule task for evalution
task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc())
task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB)
// add rule to memory
m.rules[ruleId] = pr

View File

@ -30,16 +30,17 @@ type PromRuleTask struct {
markStale bool
done chan struct{}
terminated chan struct{}
managerDone chan struct{}
pause bool
logger log.Logger
notify NotifyFunc
ruleDB RuleDB
}
// newPromRuleTask holds rules that have promql condition
// and evalutes the rule at a given frequency
func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) *PromRuleTask {
func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *PromRuleTask {
zap.L().Info("Initiating a new rule group", zap.String("name", name), zap.Duration("frequency", frequency))
if time.Now() == time.Now().Add(frequency) {
@ -57,6 +58,7 @@ func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, o
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
ruleDB: ruleDB,
logger: log.With(opts.Logger, "group", name),
}
}
@ -313,10 +315,32 @@ func (g *PromRuleTask) CopyState(fromTask Task) error {
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
zap.L().Info("promql rule task", zap.String("name", g.name), zap.Time("eval started at", ts))
maintenance, err := g.ruleDB.GetAllPlannedMaintenance(ctx)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
}
for i, rule := range g.rules {
if rule == nil {
continue
}
shouldSkip := false
for _, m := range maintenance {
zap.L().Info("checking if rule should be skipped", zap.String("rule", rule.ID()), zap.Any("maintenance", m))
if m.shouldSkip(rule.ID(), ts) {
shouldSkip = true
break
}
}
if shouldSkip {
zap.L().Info("rule should be skipped", zap.String("rule", rule.ID()))
continue
}
select {
case <-g.done:
return

View File

@ -30,12 +30,14 @@ type RuleTask struct {
pause bool
notify NotifyFunc
ruleDB RuleDB
}
const DefaultFrequency = 1 * time.Minute
// newRuleTask makes a new RuleTask with the given name, options, and rules.
func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) *RuleTask {
func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *RuleTask {
if time.Now() == time.Now().Add(frequency) {
frequency = DefaultFrequency
@ -52,6 +54,7 @@ func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
ruleDB: ruleDB,
}
}
@ -294,10 +297,31 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
zap.L().Debug("rule task eval started", zap.String("name", g.name), zap.Time("start time", ts))
maintenance, err := g.ruleDB.GetAllPlannedMaintenance(ctx)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
}
for i, rule := range g.rules {
if rule == nil {
continue
}
shouldSkip := false
for _, m := range maintenance {
zap.L().Info("checking if rule should be skipped", zap.String("rule", rule.ID()), zap.Any("maintenance", m))
if m.shouldSkip(rule.ID(), ts) {
shouldSkip = true
break
}
}
if shouldSkip {
zap.L().Info("rule should be skipped", zap.String("rule", rule.ID()))
continue
}
select {
case <-g.done:
return

View File

@ -29,9 +29,9 @@ type Task interface {
// newTask returns an appropriate group for
// rule type
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) Task {
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) Task {
if taskType == TaskTypeCh {
return newRuleTask(name, file, frequency, rules, opts, notify)
return newRuleTask(name, file, frequency, rules, opts, notify, ruleDB)
}
return newPromRuleTask(name, file, frequency, rules, opts, notify)
return newPromRuleTask(name, file, frequency, rules, opts, notify, ruleDB)
}