chore: alert state change and overall status (#5845)

This commit is contained in:
Srikanth Chekuri 2024-09-09 13:06:09 +05:30 committed by GitHub
parent 74c994fbab
commit 3e32dabf46
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 506 additions and 187 deletions

View File

@ -64,7 +64,7 @@ const (
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
signozHistoryDBName = "signoz_analytics"
ruleStateHistoryTableName = "distributed_rule_state_history"
ruleStateHistoryTableName = "distributed_rule_state_history_v0"
signozDurationMVTable = "distributed_durationSort"
signozUsageExplorerTable = "distributed_usage_explorer"
signozSpansTable = "distributed_signoz_spans"
@ -5332,6 +5332,18 @@ func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHis
return nil
}
func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) {
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint",
signozHistoryDBName, ruleStateHistoryTableName, ruleID)
history := []v3.RuleStateHistory{}
err := r.db.Select(ctx, &history, query)
if err != nil {
return nil, err
}
return history, nil
}
func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) {
@ -5397,6 +5409,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset)
history := []v3.RuleStateHistory{}
zap.L().Debug("rule state history query", zap.String("query", query))
err := r.db.Select(ctx, &history, query)
if err != nil {
zap.L().Error("Error while reading rule state history", zap.Error(err))
@ -5404,15 +5417,43 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
}
var total uint64
zap.L().Debug("rule state history total query", zap.String("query", fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
signozHistoryDBName, ruleStateHistoryTableName, whereClause)))
err = r.db.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
signozHistoryDBName, ruleStateHistoryTableName, whereClause)).Scan(&total)
if err != nil {
return nil, err
}
labelsQuery := fmt.Sprintf("SELECT DISTINCT labels FROM %s.%s WHERE rule_id = $1",
signozHistoryDBName, ruleStateHistoryTableName)
rows, err := r.db.Query(ctx, labelsQuery, ruleID)
if err != nil {
return nil, err
}
defer rows.Close()
labelsMap := make(map[string][]string)
for rows.Next() {
var rawLabel string
err = rows.Scan(&rawLabel)
if err != nil {
return nil, err
}
label := map[string]string{}
err = json.Unmarshal([]byte(rawLabel), &label)
if err != nil {
return nil, err
}
for k, v := range label {
labelsMap[k] = append(labelsMap[k], v)
}
}
timeline := &v3.RuleStateTimeline{
Items: history,
Total: total,
Items: history,
Total: total,
Labels: labelsMap,
}
return timeline, nil
@ -5425,11 +5466,13 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
any(labels) as labels,
count(*) as count
FROM %s.%s
WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d
WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d
GROUP BY fingerprint
HAVING labels != '{}'
ORDER BY count DESC`,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)
zap.L().Debug("rule state history top contributors query", zap.String("query", query))
contributors := []v3.RuleStateHistoryContributor{}
err := r.db.Select(ctx, &contributors, query)
if err != nil {
@ -5440,7 +5483,7 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
return contributors, nil
}
func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) {
func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) {
tmpl := `WITH firing_events AS (
SELECT
@ -5448,7 +5491,7 @@ func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleI
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
@ -5459,7 +5502,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
@ -5484,13 +5527,87 @@ ORDER BY firing_time ASC;`
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
zap.L().Debug("overall state transitions query", zap.String("query", query))
transitions := []v3.RuleStateTransition{}
err := r.db.Select(ctx, &transitions, query)
if err != nil {
return nil, err
}
return transitions, nil
stateItems := []v3.ReleStateItem{}
for idx, item := range transitions {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(transitions)-1 {
nextStart := transitions[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: end,
End: nextStart,
})
}
}
}
// fetch the most recent overall_state from the table
var state model.AlertState
stateQuery := fmt.Sprintf("SELECT state FROM %s.%s WHERE rule_id = '%s' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.End)
if err := r.db.QueryRow(ctx, stateQuery).Scan(&state); err != nil {
if err != sql.ErrNoRows {
return nil, err
}
state = model.StateInactive
}
if len(transitions) == 0 {
// no transitions found, it is either firing or inactive for whole time range
stateItems = append(stateItems, v3.ReleStateItem{
State: state,
Start: params.Start,
End: params.End,
})
} else {
// there were some transitions, we need to add the last state at the end
if state == model.StateInactive {
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: transitions[len(transitions)-1].ResolutionTime,
End: params.End,
})
} else {
// fetch the most recent firing event from the table in the given time range
var firingTime int64
firingQuery := fmt.Sprintf(`
SELECT
unix_milli
FROM %s.%s
WHERE rule_id = '%s' AND overall_state_changed = true AND overall_state = '%s' AND unix_milli <= %d
ORDER BY unix_milli DESC LIMIT 1`, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.End)
if err := r.db.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil {
return nil, err
}
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: transitions[len(transitions)-1].ResolutionTime,
End: firingTime,
})
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateFiring,
Start: firingTime,
End: params.End,
})
}
}
return stateItems, nil
}
func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) {
@ -5502,7 +5619,7 @@ WITH firing_events AS (
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
@ -5513,7 +5630,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
@ -5538,6 +5655,7 @@ FROM matched_events;
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
zap.L().Debug("avg resolution time query", zap.String("query", query))
var avgResolutionTime float64
err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime)
if err != nil {
@ -5558,7 +5676,7 @@ WITH firing_events AS (
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
@ -5569,7 +5687,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
@ -5595,6 +5713,7 @@ ORDER BY ts ASC;`
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step)
zap.L().Debug("avg resolution time by interval query", zap.String("query", query))
result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
return nil, err
@ -5604,10 +5723,11 @@ ORDER BY ts ASC;`
}
func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)
var totalTriggers uint64
err := r.db.QueryRow(ctx, query).Scan(&totalTriggers)
if err != nil {
return 0, err
@ -5619,8 +5739,8 @@ func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string,
func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) {
step := common.MinAllowedStepInterval(params.Start, params.End)
query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)
result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {

View File

@ -746,34 +746,12 @@ func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.
return
}
res, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
stateItems, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
stateItems := []v3.ReleStateItem{}
for idx, item := range res {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(res)-1 {
nextStart := res[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: "normal",
Start: end,
End: nextStart,
})
}
}
}
aH.Respond(w, stateItems)
}

View File

@ -109,13 +109,15 @@ type Reader interface {
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)
AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error)
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error)
ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error)
GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error)
GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error)
GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error)
GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error)
GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error)
// Query Progress tracking helpers.

View File

@ -60,7 +60,7 @@ func ClickHouseMigrate(conn driver.Conn, cluster string) error {
database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s"
localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history ON CLUSTER %s
localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history_v0 ON CLUSTER %s
(
_retention_days UInt32 DEFAULT 180,
rule_id LowCardinality(String),
@ -80,7 +80,7 @@ ORDER BY (rule_id, unix_milli)
TTL toDateTime(unix_milli / 1000) + toIntervalDay(_retention_days)
SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history ON CLUSTER %s
distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history_v0 ON CLUSTER %s
(
rule_id LowCardinality(String),
rule_name LowCardinality(String),
@ -93,7 +93,7 @@ SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
value Float64 CODEC(Gorilla, ZSTD(1)),
labels String CODEC(ZSTD(5)),
)
ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_id, rule_name, fingerprint))`
ENGINE = Distributed(%s, signoz_analytics, rule_state_history_v0, cityHash64(rule_id, rule_name, fingerprint))`
// check if db exists
dbExists := `SELECT count(*) FROM system.databases WHERE name = 'signoz_analytics'`
@ -111,7 +111,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i
}
// check if table exists
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history' AND database = 'signoz_analytics'`
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history_v0' AND database = 'signoz_analytics'`
var tableCount uint64
err = conn.QueryRow(context.Background(), tableExists).Scan(&tableCount)
if err != nil {
@ -126,7 +126,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i
}
// check if distributed table exists
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history' AND database = 'signoz_analytics'`
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history_v0' AND database = 'signoz_analytics'`
var distributedTableCount uint64
err = conn.QueryRow(context.Background(), distributedTableExists).Scan(&distributedTableCount)
if err != nil {

View File

@ -0,0 +1,90 @@
package model
import (
"database/sql/driver"
"encoding/json"
"github.com/pkg/errors"
)
// AlertState denotes the state of an active alert.
type AlertState int
const (
StateInactive AlertState = iota
StatePending
StateFiring
StateNoData
StateDisabled
)
func (s AlertState) String() string {
switch s {
case StateInactive:
return "inactive"
case StatePending:
return "pending"
case StateFiring:
return "firing"
case StateNoData:
return "nodata"
case StateDisabled:
return "disabled"
}
panic(errors.Errorf("unknown alert state: %d", s))
}
func (s AlertState) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
func (s *AlertState) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case string:
switch value {
case "inactive":
*s = StateInactive
case "pending":
*s = StatePending
case "firing":
*s = StateFiring
case "nodata":
*s = StateNoData
case "disabled":
*s = StateDisabled
default:
return errors.New("invalid alert state")
}
return nil
default:
return errors.New("invalid alert state")
}
}
func (s *AlertState) Scan(value interface{}) error {
v, ok := value.(string)
if !ok {
return errors.New("invalid alert state")
}
switch v {
case "inactive":
*s = StateInactive
case "pending":
*s = StatePending
case "firing":
*s = StateFiring
case "nodata":
*s = StateNoData
case "disabled":
*s = StateDisabled
}
return nil
}
func (s *AlertState) Value() (driver.Value, error) {
return s.String(), nil
}

View File

@ -1183,23 +1183,24 @@ func (l LabelsString) String() string {
}
type RuleStateTimeline struct {
Items []RuleStateHistory `json:"items"`
Total uint64 `json:"total"`
Items []RuleStateHistory `json:"items"`
Total uint64 `json:"total"`
Labels map[string][]string `json:"labels"`
}
type RuleStateHistory struct {
RuleID string `json:"ruleID" ch:"rule_id"`
RuleName string `json:"ruleName" ch:"rule_name"`
// One of ["normal", "firing"]
OverallState string `json:"overallState" ch:"overall_state"`
OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"`
OverallState model.AlertState `json:"overallState" ch:"overall_state"`
OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"`
// One of ["normal", "firing", "no_data", "muted"]
State string `json:"state" ch:"state"`
StateChanged bool `json:"stateChanged" ch:"state_changed"`
UnixMilli int64 `json:"unixMilli" ch:"unix_milli"`
Labels LabelsString `json:"labels" ch:"labels"`
Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"`
Value float64 `json:"value" ch:"value"`
State model.AlertState `json:"state" ch:"state"`
StateChanged bool `json:"stateChanged" ch:"state_changed"`
UnixMilli int64 `json:"unixMilli" ch:"unix_milli"`
Labels LabelsString `json:"labels" ch:"labels"`
Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"`
Value float64 `json:"value" ch:"value"`
RelatedTracesLink string `json:"relatedTracesLink"`
RelatedLogsLink string `json:"relatedLogsLink"`
@ -1237,16 +1238,16 @@ type RuleStateHistoryContributor struct {
}
type RuleStateTransition struct {
RuleID string `json:"ruleID" ch:"rule_id"`
State string `json:"state" ch:"state"`
FiringTime int64 `json:"firingTime" ch:"firing_time"`
ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"`
RuleID string `json:"ruleID" ch:"rule_id"`
State model.AlertState `json:"state" ch:"state"`
FiringTime int64 `json:"firingTime" ch:"firing_time"`
ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"`
}
type ReleStateItem struct {
State string `json:"state"`
Start int64 `json:"start"`
End int64 `json:"end"`
State model.AlertState `json:"state"`
Start int64 `json:"start"`
End int64 `json:"end"`
}
type Stats struct {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
)
@ -37,61 +38,8 @@ const (
HealthBad RuleHealth = "err"
)
// AlertState denotes the state of an active alert.
type AlertState int
const (
StateInactive AlertState = iota
StatePending
StateFiring
StateDisabled
)
func (s AlertState) String() string {
switch s {
case StateInactive:
return "inactive"
case StatePending:
return "pending"
case StateFiring:
return "firing"
case StateDisabled:
return "disabled"
}
panic(errors.Errorf("unknown alert state: %d", s))
}
func (s AlertState) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
func (s *AlertState) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case string:
switch value {
case "inactive":
*s = StateInactive
case "pending":
*s = StatePending
case "firing":
*s = StateFiring
case "disabled":
*s = StateDisabled
default:
return errors.New("invalid alert state")
}
return nil
default:
return errors.New("invalid alert state")
}
}
type Alert struct {
State AlertState
State model.AlertState
Labels labels.BaseLabels
Annotations labels.BaseLabels
@ -114,7 +62,7 @@ type Alert struct {
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
if a.State == model.StatePending {
return false
}

View File

@ -259,8 +259,8 @@ type GettableRules struct {
// GettableRule has info for an alerting rules.
type GettableRule struct {
Id string `json:"id"`
State AlertState `json:"state"`
Id string `json:"id"`
State model.AlertState `json:"state"`
PostableRule
CreatedAt *time.Time `json:"createAt"`
CreatedBy *string `json:"createBy"`

View File

@ -617,7 +617,7 @@ func (m *Manager) ListRuleStates(ctx context.Context) (*GettableRules, error) {
// fetch state of rule from memory
if rm, ok := m.rules[ruleResponse.Id]; !ok {
ruleResponse.State = StateDisabled
ruleResponse.State = model.StateDisabled
ruleResponse.Disabled = true
} else {
ruleResponse.State = rm.State()
@ -644,7 +644,7 @@ func (m *Manager) GetRule(ctx context.Context, id string) (*GettableRule, error)
r.Id = fmt.Sprintf("%d", s.Id)
// fetch state of rule from memory
if rm, ok := m.rules[r.Id]; !ok {
r.State = StateDisabled
r.State = model.StateDisabled
r.Disabled = true
} else {
r.State = rm.State()
@ -751,7 +751,7 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string)
// fetch state of rule from memory
if rm, ok := m.rules[ruleId]; !ok {
response.State = StateDisabled
response.State = model.StateDisabled
response.Disabled = true
} else {
response.State = rm.State()

View File

@ -15,6 +15,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/formatter"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.signoz.io/signoz/pkg/query-service/utils/times"
@ -56,6 +57,8 @@ type PromRule struct {
opts PromRuleOpts
reader interfaces.Reader
handledRestart bool
}
func NewPromRule(
@ -218,9 +221,9 @@ func (r *PromRule) GetEvaluationTimestamp() time.Time {
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive
func (r *PromRule) State() AlertState {
func (r *PromRule) State() model.AlertState {
maxState := StateInactive
maxState := model.StateInactive
for _, a := range r.active {
if a.State > maxState {
maxState = a.State
@ -338,6 +341,102 @@ func (r *PromRule) compareOp() CompareOp {
return r.ruleCondition.CompareOp
}
// TODO(srikanthccv): implement base rule and use for all types of rules
func (r *PromRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error {
zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))
revisedItemsToAdd := map[uint64]v3.RuleStateHistory{}
lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
if err != nil {
return err
}
// if the query-service has been restarted, or the rule has been modified (which re-initializes the rule),
// the state would reset so we need to add the corresponding state changes to previously saved states
if !r.handledRestart && len(lastSavedState) > 0 {
zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))
l := map[uint64]v3.RuleStateHistory{}
for _, item := range itemsToAdd {
l[item.Fingerprint] = item
}
shouldSkip := map[uint64]bool{}
for _, item := range lastSavedState {
// for the last saved item with fingerprint, check if there is a corresponding entry in the current state
currentState, ok := l[item.Fingerprint]
if !ok {
// there was a state change in the past, but not in the current state
// if the state was firing, then we should add a resolved state change
if item.State == model.StateFiring || item.State == model.StateNoData {
item.State = model.StateInactive
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
}
// there is nothing to do if the prev state was normal
} else {
if item.State != currentState.State {
item.State = currentState.State
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
}
}
// do not add this item to revisedItemsToAdd as it is already processed
shouldSkip[item.Fingerprint] = true
}
zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
// if there are any new state changes that were not saved, add them to the revised items
for _, item := range itemsToAdd {
if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] {
revisedItemsToAdd[item.Fingerprint] = item
}
}
zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
newState := model.StateInactive
for _, item := range revisedItemsToAdd {
if item.State == model.StateFiring || item.State == model.StateNoData {
newState = model.StateFiring
break
}
}
zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState))
// if there is a change in the overall state, update the overall state
if lastSavedState[0].OverallState != newState {
for fingerprint, item := range revisedItemsToAdd {
item.OverallState = newState
item.OverallStateChanged = true
revisedItemsToAdd[fingerprint] = item
}
}
zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
} else {
for _, item := range itemsToAdd {
revisedItemsToAdd[item.Fingerprint] = item
}
}
if len(revisedItemsToAdd) > 0 && r.reader != nil {
zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd))
for _, item := range revisedItemsToAdd {
entries = append(entries, item)
}
err := r.reader.AddRuleStateHistory(ctx, entries)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
r.handledRestart = true
return nil
}
func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
prevState := r.State()
@ -442,7 +541,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
State: model.StatePending,
Value: alertSmpl.F,
GeneratorURL: r.GeneratorURL(),
Receivers: r.preferredChannels,
@ -454,7 +553,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
if alert, ok := r.active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
alert.Receivers = r.preferredChannels
@ -469,23 +568,23 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
labelsJSON, err := json.Marshal(a.Labels)
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.String("name", r.Name()))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
if a.State != StateInactive {
a.State = StateInactive
if a.State != model.StateInactive {
a.State = model.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: "normal",
State: model.StateInactive,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: v3.LabelsString(labelsJSON),
@ -495,12 +594,12 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
continue
}
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = model.StateFiring
a.FiredAt = ts
state := "firing"
state := model.StateFiring
if a.Missing {
state = "no_data"
state = model.StateNoData
}
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
@ -520,23 +619,14 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
currentState := r.State()
if currentState != prevState {
for idx := range itemsToAdd {
if currentState == StateInactive {
itemsToAdd[idx].OverallState = "normal"
} else {
itemsToAdd[idx].OverallState = currentState.String()
}
itemsToAdd[idx].OverallStateChanged = true
}
overallStateChanged := currentState != prevState
for idx, item := range itemsToAdd {
item.OverallStateChanged = overallStateChanged
item.OverallState = currentState
itemsToAdd[idx] = item
}
if len(itemsToAdd) > 0 && r.reader != nil {
err := r.reader.AddRuleStateHistory(ctx, itemsToAdd)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd)
return len(r.active), nil
}

View File

@ -192,7 +192,7 @@ func (g *PromRuleTask) HasAlertingRules() bool {
defer g.mtx.Unlock()
for _, rule := range g.rules {
if _, ok := rule.(*ThresholdRule); ok {
if _, ok := rule.(*PromRule); ok {
return true
}
}
@ -284,11 +284,11 @@ func (g *PromRuleTask) CopyState(fromTask Task) error {
g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi]
ruleMap[nameAndLabels] = indexes[1:]
ar, ok := rule.(*ThresholdRule)
ar, ok := rule.(*PromRule)
if !ok {
continue
}
far, ok := from.rules[fi].(*ThresholdRule)
far, ok := from.rules[fi].(*PromRule)
if !ok {
continue
}
@ -296,6 +296,7 @@ func (g *PromRuleTask) CopyState(fromTask Task) error {
for fp, a := range far.active {
ar.active[fp] = a
}
ar.handledRestart = far.handledRestart
}
// Handle deleted and unmatched duplicate rules.

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
)
@ -17,7 +18,7 @@ type Rule interface {
Labels() labels.BaseLabels
Annotations() labels.BaseLabels
Condition() *RuleCondition
State() AlertState
State() model.AlertState
ActiveAlerts() []*Alert
PreferredChannels() []string

View File

@ -288,6 +288,7 @@ func (g *RuleTask) CopyState(fromTask Task) error {
for fp, a := range far.active {
ar.active[fp] = a
}
ar.handledRestart = far.handledRestart
}
return nil

View File

@ -20,6 +20,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"go.signoz.io/signoz/pkg/query-service/app/querier"
@ -100,6 +101,8 @@ type ThresholdRule struct {
reader interfaces.Reader
evalDelay time.Duration
handledRestart bool
}
type ThresholdRuleOpts struct {
@ -309,9 +312,9 @@ func (r *ThresholdRule) GetEvaluationTimestamp() time.Time {
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive
func (r *ThresholdRule) State() AlertState {
func (r *ThresholdRule) State() model.AlertState {
maxState := StateInactive
maxState := model.StateInactive
for _, a := range r.active {
if a.State > maxState {
maxState = a.State
@ -898,6 +901,102 @@ func normalizeLabelName(name string) string {
return normalized
}
// TODO(srikanthccv): implement base rule and use for all types of rules
func (r *ThresholdRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error {
zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))
revisedItemsToAdd := map[uint64]v3.RuleStateHistory{}
lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
if err != nil {
return err
}
// if the query-service has been restarted, or the rule has been modified (which re-initializes the rule),
// the state would reset so we need to add the corresponding state changes to previously saved states
if !r.handledRestart && len(lastSavedState) > 0 {
zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))
l := map[uint64]v3.RuleStateHistory{}
for _, item := range itemsToAdd {
l[item.Fingerprint] = item
}
shouldSkip := map[uint64]bool{}
for _, item := range lastSavedState {
// for the last saved item with fingerprint, check if there is a corresponding entry in the current state
currentState, ok := l[item.Fingerprint]
if !ok {
// there was a state change in the past, but not in the current state
// if the state was firing, then we should add a resolved state change
if item.State == model.StateFiring || item.State == model.StateNoData {
item.State = model.StateInactive
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
}
// there is nothing to do if the prev state was normal
} else {
if item.State != currentState.State {
item.State = currentState.State
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
}
}
// do not add this item to revisedItemsToAdd as it is already processed
shouldSkip[item.Fingerprint] = true
}
zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
// if there are any new state changes that were not saved, add them to the revised items
for _, item := range itemsToAdd {
if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] {
revisedItemsToAdd[item.Fingerprint] = item
}
}
zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
newState := model.StateInactive
for _, item := range revisedItemsToAdd {
if item.State == model.StateFiring || item.State == model.StateNoData {
newState = model.StateFiring
break
}
}
zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState))
// if there is a change in the overall state, update the overall state
if lastSavedState[0].OverallState != newState {
for fingerprint, item := range revisedItemsToAdd {
item.OverallState = newState
item.OverallStateChanged = true
revisedItemsToAdd[fingerprint] = item
}
}
zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
} else {
for _, item := range itemsToAdd {
revisedItemsToAdd[item.Fingerprint] = item
}
}
if len(revisedItemsToAdd) > 0 && r.reader != nil {
zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd))
for _, item := range revisedItemsToAdd {
entries = append(entries, item)
}
err := r.reader.AddRuleStateHistory(ctx, entries)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
r.handledRestart = true
return nil
}
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
prevState := r.State()
@ -1005,7 +1104,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
State: model.StatePending,
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: r.preferredChannels,
@ -1019,7 +1118,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
if alert, ok := r.active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
@ -1042,31 +1141,32 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
if a.State != StateInactive {
a.State = StateInactive
if a.State != model.StateInactive {
a.State = model.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: "normal",
State: model.StateInactive,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: v3.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
continue
}
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = model.StateFiring
a.FiredAt = ts
state := "firing"
state := model.StateFiring
if a.Missing {
state = "no_data"
state = model.StateNoData
}
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
@ -1083,28 +1183,15 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
currentState := r.State()
if currentState != prevState {
for idx := range itemsToAdd {
if currentState == StateInactive {
itemsToAdd[idx].OverallState = "normal"
} else {
itemsToAdd[idx].OverallState = currentState.String()
}
itemsToAdd[idx].OverallStateChanged = true
}
} else {
for idx := range itemsToAdd {
itemsToAdd[idx].OverallState = currentState.String()
itemsToAdd[idx].OverallStateChanged = false
}
overallStateChanged := currentState != prevState
for idx, item := range itemsToAdd {
item.OverallStateChanged = overallStateChanged
item.OverallState = currentState
itemsToAdd[idx] = item
}
if len(itemsToAdd) > 0 && r.reader != nil {
err := r.reader.AddRuleStateHistory(ctx, itemsToAdd)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd)
r.health = HealthGood
r.lastError = err