From bb849604427c94686156e3426c8d6f85e60fe5da Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 9 Aug 2024 12:11:05 +0530 Subject: [PATCH] chore: add alerts state history query service impl (#5255) --- ee/query-service/app/server.go | 8 + .../app/clickhouseReader/reader.go | 315 ++++++++++++++++++ pkg/query-service/app/http_handler.go | 158 +++++++++ pkg/query-service/app/server.go | 8 + pkg/query-service/interfaces/interface.go | 8 + pkg/query-service/migrate/migate.go | 90 +++++ pkg/query-service/model/v3/v3.go | 93 +++++- pkg/query-service/rules/alerting.go | 2 + pkg/query-service/rules/manager.go | 2 + pkg/query-service/rules/promRule.go | 59 +++- pkg/query-service/rules/promrule_test.go | 2 +- pkg/query-service/rules/thresholdRule.go | 60 +++- 12 files changed, 798 insertions(+), 7 deletions(-) diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index a016d18d7e..464517ef1a 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -28,6 +28,7 @@ import ( "go.signoz.io/signoz/ee/query-service/integrations/gateway" "go.signoz.io/signoz/ee/query-service/interfaces" baseauth "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/migrate" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -179,6 +180,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + go func() { + err = migrate.ClickHouseMigrate(reader.GetConn(), serverOptions.Cluster) + if err != nil { + zap.L().Error("error while running clickhouse migrations", zap.Error(err)) + } + }() + // initiate opamp _, err = opAmpModel.InitDB(localDB) if err != nil { diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 0c8ba834fe..cc27b4f2eb 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -62,6 +62,8 @@ const ( primaryNamespace = "clickhouse" archiveNamespace = "clickhouse-archive" signozTraceDBName = "signoz_traces" + signozHistoryDBName = "signoz_analytics" + ruleStateHistoryTableName = "distributed_rule_state_history" signozDurationMVTable = "distributed_durationSort" signozUsageExplorerTable = "distributed_usage_explorer" signozSpansTable = "distributed_signoz_spans" @@ -5125,6 +5127,319 @@ func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, tim } } +func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error { + var statement driver.Batch + var err error + + defer func() { + if statement != nil { + statement.Abort() + } + }() + + statement, err = r.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (rule_id, rule_name, overall_state, overall_state_changed, state, state_changed, unix_milli, labels, fingerprint, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + signozHistoryDBName, ruleStateHistoryTableName)) + + if err != nil { + return err + } + + for _, history := range ruleStateHistory { + err = statement.Append(history.RuleID, history.RuleName, history.OverallState, history.OverallStateChanged, history.State, history.StateChanged, history.UnixMilli, history.Labels, history.Fingerprint, history.Value) + if err != nil { + return err + } + } + + err = statement.Send() + if err != nil { + return err + } + return nil +} + +func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID( + ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistory, error) { + + var conditions []string + + conditions = append(conditions, fmt.Sprintf("rule_id = '%s'", ruleID)) + + conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", params.Start, params.End)) + + if params.Filters != nil && len(params.Filters.Items) != 0 { + for _, item := range params.Filters.Items { + toFormat := item.Value + op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) + if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains { + toFormat = fmt.Sprintf("%%%s%%", toFormat) + } + fmtVal := utils.ClickHouseFormattedValue(toFormat) + switch op { + case v3.FilterOperatorEqual: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotEqual: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorIn: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotIn: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLike: + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotLike: + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorRegex: + conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotRegex: + conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorGreaterThan: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorGreaterThanOrEq: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLessThan: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLessThanOrEq: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorContains: + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotContains: + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorExists: + conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key)) + case v3.FilterOperatorNotExists: + conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key)) + default: + return nil, fmt.Errorf("unsupported filter operator") + } + } + } + whereClause := strings.Join(conditions, " AND ") + + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s ORDER BY unix_milli %s LIMIT %d OFFSET %d", + signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset) + + history := []v3.RuleStateHistory{} + err := r.db.Select(ctx, &history, query) + if err != nil { + zap.L().Error("Error while reading rule state history", zap.Error(err)) + return nil, err + } + + return history, nil +} + +func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID( + ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error) { + query := fmt.Sprintf(`SELECT + fingerprint, + any(labels) as labels, + count(*) as count + FROM %s.%s + WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d + GROUP BY fingerprint + ORDER BY count DESC`, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + + contributors := []v3.RuleStateHistoryContributor{} + err := r.db.Select(ctx, &contributors, query) + if err != nil { + zap.L().Error("Error while reading rule state history", zap.Error(err)) + return nil, err + } + + return contributors, nil +} + +func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) { + + tmpl := `WITH firing_events AS ( + SELECT + rule_id, + state, + unix_milli AS firing_time + FROM %s.%s + WHERE overall_state = 'firing' + AND overall_state_changed = true + AND rule_id IN ('%s') + AND unix_milli >= %d AND unix_milli <= %d +), +resolution_events AS ( + SELECT + rule_id, + state, + unix_milli AS resolution_time + FROM %s.%s + WHERE overall_state = 'normal' + AND overall_state_changed = true + AND rule_id IN ('%s') + AND unix_milli >= %d AND unix_milli <= %d +), +matched_events AS ( + SELECT + f.rule_id, + f.state, + f.firing_time, + MIN(r.resolution_time) AS resolution_time + FROM firing_events f + LEFT JOIN resolution_events r + ON f.rule_id = r.rule_id + WHERE r.resolution_time > f.firing_time + GROUP BY f.rule_id, f.state, f.firing_time +) +SELECT * +FROM matched_events +ORDER BY firing_time ASC;` + + query := fmt.Sprintf(tmpl, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + + transitions := []v3.RuleStateTransition{} + err := r.db.Select(ctx, &transitions, query) + if err != nil { + return nil, err + } + + return transitions, nil +} + +func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) { + + tmpl := ` +WITH firing_events AS ( + SELECT + rule_id, + state, + unix_milli AS firing_time + FROM %s.%s + WHERE overall_state = 'firing' + AND overall_state_changed = true + AND rule_id IN ('%s') + AND unix_milli >= %d AND unix_milli <= %d +), +resolution_events AS ( + SELECT + rule_id, + state, + unix_milli AS resolution_time + FROM %s.%s + WHERE overall_state = 'normal' + AND overall_state_changed = true + AND rule_id IN ('%s') + AND unix_milli >= %d AND unix_milli <= %d +), +matched_events AS ( + SELECT + f.rule_id, + f.state, + f.firing_time, + MIN(r.resolution_time) AS resolution_time + FROM firing_events f + LEFT JOIN resolution_events r + ON f.rule_id = r.rule_id + WHERE r.resolution_time > f.firing_time + GROUP BY f.rule_id, f.state, f.firing_time +) +SELECT AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time +FROM matched_events; +` + + query := fmt.Sprintf(tmpl, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + + var avgResolutionTime float64 + err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime) + if err != nil { + return 0, err + } + + return avgResolutionTime, nil +} + +func (r *ClickHouseReader) GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) { + + step := common.MinAllowedStepInterval(params.Start, params.End) + + tmpl := ` +WITH firing_events AS ( + SELECT + rule_id, + state, + unix_milli AS firing_time + FROM %s.%s + WHERE overall_state = 'firing' + AND overall_state_changed = true + AND rule_id IN ('%s') + AND unix_milli >= %d AND unix_milli <= %d +), +resolution_events AS ( + SELECT + rule_id, + state, + unix_milli AS resolution_time + FROM %s.%s + WHERE overall_state = 'normal' + AND overall_state_changed = true + AND rule_id IN ('%s') + AND unix_milli >= %d AND unix_milli <= %d +), +matched_events AS ( + SELECT + f.rule_id, + f.state, + f.firing_time, + MIN(r.resolution_time) AS resolution_time + FROM firing_events f + LEFT JOIN resolution_events r + ON f.rule_id = r.rule_id + WHERE r.resolution_time > f.firing_time + GROUP BY f.rule_id, f.state, f.firing_time +) +SELECT toStartOfInterval(toDateTime(firing_time / 1000), INTERVAL %d SECOND) AS ts, AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time +FROM matched_events +GROUP BY ts +ORDER BY ts ASC;` + + query := fmt.Sprintf(tmpl, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step) + + result, err := r.GetTimeSeriesResultV3(ctx, query) + if err != nil || len(result) == 0 { + return nil, err + } + + return result[0], nil +} + +func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) { + query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d", + signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + + var totalTriggers uint64 + err := r.db.QueryRow(ctx, query).Scan(&totalTriggers) + if err != nil { + return 0, err + } + + return totalTriggers, nil +} + +func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) { + step := common.MinAllowedStepInterval(params.Start, params.End) + + query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC", + step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End) + + result, err := r.GetTimeSeriesResultV3(ctx, query) + if err != nil || len(result) == 0 { + return nil, err + } + + return result[0], nil +} + func (r *ClickHouseReader) GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) { var minTime, maxTime time.Time diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 136dbe6f14..07a1814ac7 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "regexp" "slices" @@ -343,6 +344,10 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/rules/{id}", am.EditAccess(aH.deleteRule)).Methods(http.MethodDelete) router.HandleFunc("/api/v1/rules/{id}", am.EditAccess(aH.patchRule)).Methods(http.MethodPatch) router.HandleFunc("/api/v1/testRule", am.EditAccess(aH.testRule)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/rules/{id}/history/stats", am.ViewAccess(aH.getRuleStats)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/rules/{id}/history/timeline", am.ViewAccess(aH.getRuleStateHistory)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/rules/{id}/history/top_contributors", am.ViewAccess(aH.getRuleStateHistoryTopContributors)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/rules/{id}/history/overall_status", am.ViewAccess(aH.getOverallStateTransitions)).Methods(http.MethodPost) router.HandleFunc("/api/v1/downtime_schedules", am.OpenAccess(aH.listDowntimeSchedules)).Methods(http.MethodGet) router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.getDowntimeSchedule)).Methods(http.MethodGet) @@ -625,6 +630,159 @@ func (aH *APIHandler) deleteDowntimeSchedule(w http.ResponseWriter, r *http.Requ aH.Respond(w, nil) } +func (aH *APIHandler) getRuleStats(w http.ResponseWriter, r *http.Request) { + ruleID := mux.Vars(r)["id"] + params := v3.QueryRuleStateHistory{} + err := json.NewDecoder(r.Body).Decode(¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + totalCurrentTriggers, err := aH.reader.GetTotalTriggers(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + currentTriggersSeries, err := aH.reader.GetTriggersByInterval(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + currentAvgResolutionTime, err := aH.reader.GetAvgResolutionTime(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + currentAvgResolutionTimeSeries, err := aH.reader.GetAvgResolutionTimeByInterval(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + if params.End-params.Start >= 86400000 { + days := int64(math.Ceil(float64(params.End-params.Start) / 86400000)) + params.Start -= days * 86400000 + params.End -= days * 86400000 + } else { + params.Start -= 86400000 + params.End -= 86400000 + } + + totalPastTriggers, err := aH.reader.GetTotalTriggers(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + pastTriggersSeries, err := aH.reader.GetTriggersByInterval(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + pastAvgResolutionTime, err := aH.reader.GetAvgResolutionTime(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + pastAvgResolutionTimeSeries, err := aH.reader.GetAvgResolutionTimeByInterval(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + stats := v3.Stats{ + TotalCurrentTriggers: totalCurrentTriggers, + TotalPastTriggers: totalPastTriggers, + CurrentTriggersSeries: currentTriggersSeries, + PastTriggersSeries: pastTriggersSeries, + CurrentAvgResolutionTime: strconv.FormatFloat(currentAvgResolutionTime, 'f', -1, 64), + PastAvgResolutionTime: strconv.FormatFloat(pastAvgResolutionTime, 'f', -1, 64), + CurrentAvgResolutionTimeSeries: currentAvgResolutionTimeSeries, + PastAvgResolutionTimeSeries: pastAvgResolutionTimeSeries, + } + + aH.Respond(w, stats) +} + +func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.Request) { + ruleID := mux.Vars(r)["id"] + params := v3.QueryRuleStateHistory{} + err := json.NewDecoder(r.Body).Decode(¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + res, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + stateItems := []v3.ReleStateItem{} + + for idx, item := range res { + start := item.FiringTime + end := item.ResolutionTime + stateItems = append(stateItems, v3.ReleStateItem{ + State: item.State, + Start: start, + End: end, + }) + if idx < len(res)-1 { + nextStart := res[idx+1].FiringTime + if nextStart > end { + stateItems = append(stateItems, v3.ReleStateItem{ + State: "normal", + Start: end, + End: nextStart, + }) + } + } + } + + aH.Respond(w, stateItems) +} + +func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request) { + ruleID := mux.Vars(r)["id"] + params := v3.QueryRuleStateHistory{} + err := json.NewDecoder(r.Body).Decode(¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + if err := params.Validate(); err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + res, err := aH.reader.ReadRuleStateHistoryByRuleID(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, res) +} + +func (aH *APIHandler) getRuleStateHistoryTopContributors(w http.ResponseWriter, r *http.Request) { + ruleID := mux.Vars(r)["id"] + params := v3.QueryRuleStateHistory{} + err := json.NewDecoder(r.Body).Decode(¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + res, err := aH.reader.ReadRuleStateHistoryTopContributorsByRuleID(r.Context(), ruleID, ¶ms) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, res) +} + func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) { rules, err := aH.ruleManager.ListRuleStates(r.Context()) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 0e6ca010ab..254b6f6f78 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -30,6 +30,7 @@ import ( opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/app/preferences" "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/migrate" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/app/explorer" @@ -147,6 +148,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + go func() { + err = migrate.ClickHouseMigrate(reader.GetConn(), serverOptions.Cluster) + if err != nil { + zap.L().Error("error while running clickhouse migrations", zap.Error(err)) + } + }() + var c cache.Cache if serverOptions.CacheConfigPath != "" { cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 53c5ae8d68..819f452f3b 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -108,6 +108,14 @@ type Reader interface { GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error) + AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error + GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) + ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistory, error) + GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) + GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) + GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) + GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) + ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error) GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) } diff --git a/pkg/query-service/migrate/migate.go b/pkg/query-service/migrate/migate.go index 4fe61764bc..bf5f3a7738 100644 --- a/pkg/query-service/migrate/migate.go +++ b/pkg/query-service/migrate/migate.go @@ -1,8 +1,11 @@ package migrate import ( + "context" "database/sql" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/jmoiron/sqlx" alertstov4 "go.signoz.io/signoz/pkg/query-service/migrate/0_45_alerts_to_v4" alertscustomstep "go.signoz.io/signoz/pkg/query-service/migrate/0_47_alerts_custom_step" @@ -77,3 +80,90 @@ func Migrate(dsn string) error { return nil } + +func ClickHouseMigrate(conn driver.Conn, cluster string) error { + + database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s" + + localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history ON CLUSTER %s +( + _retention_days UInt32 DEFAULT 180, + rule_id LowCardinality(String), + rule_name LowCardinality(String), + overall_state LowCardinality(String), + overall_state_changed Bool, + state LowCardinality(String), + state_changed Bool, + unix_milli Int64 CODEC(Delta(8), ZSTD(1)), + fingerprint UInt64 CODEC(ZSTD(1)), + value Float64 CODEC(Gorilla, ZSTD(1)), + labels String CODEC(ZSTD(5)), +) +ENGINE = MergeTree +PARTITION BY toDate(unix_milli / 1000) +ORDER BY (rule_id, unix_milli) +TTL toDateTime(unix_milli / 1000) + toIntervalDay(_retention_days) +SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192` + + distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history ON CLUSTER %s +( + rule_id LowCardinality(String), + rule_name LowCardinality(String), + overall_state LowCardinality(String), + overall_state_changed Bool, + state LowCardinality(String), + state_changed Bool, + unix_milli Int64 CODEC(Delta(8), ZSTD(1)), + fingerprint UInt64 CODEC(ZSTD(1)), + value Float64 CODEC(Gorilla, ZSTD(1)), + labels String CODEC(ZSTD(5)), +) +ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_id, rule_name, fingerprint))` + + // check if db exists + dbExists := `SELECT count(*) FROM system.databases WHERE name = 'signoz_analytics'` + var count uint64 + err := conn.QueryRow(context.Background(), dbExists).Scan(&count) + if err != nil { + return err + } + + if count == 0 { + err = conn.Exec(context.Background(), fmt.Sprintf(database, cluster)) + if err != nil { + return err + } + } + + // check if table exists + tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history' AND database = 'signoz_analytics'` + var tableCount uint64 + err = conn.QueryRow(context.Background(), tableExists).Scan(&tableCount) + if err != nil { + return err + } + + if tableCount == 0 { + err = conn.Exec(context.Background(), fmt.Sprintf(localTable, cluster)) + if err != nil { + return err + } + } + + // check if distributed table exists + distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history' AND database = 'signoz_analytics'` + var distributedTableCount uint64 + err = conn.QueryRow(context.Background(), distributedTableExists).Scan(&distributedTableCount) + if err != nil { + return err + } + + if distributedTableCount == 0 { + err = conn.Exec(context.Background(), fmt.Sprintf(distributedTable, cluster, cluster)) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 76e781ffc1..9d388b32e2 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -1038,8 +1038,8 @@ type LogsLiveTailClient struct { } type Series struct { - Labels map[string]string `json:"labels"` - LabelsArray []map[string]string `json:"labelsArray"` + Labels map[string]string `json:"labels,omitempty"` + LabelsArray []map[string]string `json:"labelsArray,omitempty"` Points []Point `json:"values"` } @@ -1154,3 +1154,92 @@ type MetricMetadataResponse struct { IsMonotonic bool `json:"isMonotonic"` Temporality string `json:"temporality"` } + +type LabelsString string + +func (l *LabelsString) MarshalJSON() ([]byte, error) { + lbls := make(map[string]string) + err := json.Unmarshal([]byte(*l), &lbls) + if err != nil { + return nil, err + } + return json.Marshal(lbls) +} + +func (l *LabelsString) Scan(src interface{}) error { + if data, ok := src.(string); ok { + *l = LabelsString(data) + } + return nil +} + +func (l LabelsString) String() string { + return string(l) +} + +type RuleStateHistory struct { + RuleID string `json:"ruleID" ch:"rule_id"` + RuleName string `json:"ruleName" ch:"rule_name"` + // One of ["normal", "firing"] + OverallState string `json:"overallState" ch:"overall_state"` + OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"` + // One of ["normal", "firing", "no_data", "muted"] + State string `json:"state" ch:"state"` + StateChanged bool `json:"stateChanged" ch:"state_changed"` + UnixMilli int64 `json:"unixMilli" ch:"unix_milli"` + Labels LabelsString `json:"labels" ch:"labels"` + Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"` + Value float64 `json:"value" ch:"value"` +} + +type QueryRuleStateHistory struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Filters *FilterSet `json:"filters"` + Offset int64 `json:"offset"` + Limit int64 `json:"limit"` + Order string `json:"order"` +} + +func (r *QueryRuleStateHistory) Validate() error { + if r.Start == 0 || r.End == 0 { + return fmt.Errorf("start and end are required") + } + if r.Offset < 0 || r.Limit < 0 { + return fmt.Errorf("offset and limit must be greater than 0") + } + if r.Order != "asc" && r.Order != "desc" { + return fmt.Errorf("order must be asc or desc") + } + return nil +} + +type RuleStateHistoryContributor struct { + Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"` + Labels LabelsString `json:"labels" ch:"labels"` + Count uint64 `json:"count" ch:"count"` +} + +type RuleStateTransition struct { + RuleID string `json:"ruleID" ch:"rule_id"` + State string `json:"state" ch:"state"` + FiringTime int64 `json:"firingTime" ch:"firing_time"` + ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"` +} + +type ReleStateItem struct { + State string `json:"state"` + Start int64 `json:"start"` + End int64 `json:"end"` +} + +type Stats struct { + TotalCurrentTriggers uint64 `json:"totalCurrentTriggers"` + TotalPastTriggers uint64 `json:"totalPastTriggers"` + CurrentTriggersSeries *Series `json:"currentTriggersSeries"` + PastTriggersSeries *Series `json:"pastTriggersSeries"` + CurrentAvgResolutionTime string `json:"currentAvgResolutionTime"` + PastAvgResolutionTime string `json:"pastAvgResolutionTime"` + CurrentAvgResolutionTimeSeries *Series `json:"currentAvgResolutionTimeSeries"` + PastAvgResolutionTimeSeries *Series `json:"pastAvgResolutionTimeSeries"` +} diff --git a/pkg/query-service/rules/alerting.go b/pkg/query-service/rules/alerting.go index b2f511c6c0..e5660e0dfe 100644 --- a/pkg/query-service/rules/alerting.go +++ b/pkg/query-service/rules/alerting.go @@ -78,6 +78,8 @@ type Alert struct { ResolvedAt time.Time LastSentAt time.Time ValidUntil time.Time + + Missing bool } func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool { diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index d660d863f1..40764f9fb0 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -553,6 +553,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string r, log.With(m.logger, "alert", r.AlertName), PromRuleOpts{}, + m.reader, ) if err != nil { @@ -912,6 +913,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m PromRuleOpts{ SendAlways: true, }, + m.reader, ) if err != nil { diff --git a/pkg/query-service/rules/promRule.go b/pkg/query-service/rules/promRule.go index fd00363abf..235a5cf825 100644 --- a/pkg/query-service/rules/promRule.go +++ b/pkg/query-service/rules/promRule.go @@ -2,6 +2,7 @@ package rules import ( "context" + "encoding/json" "fmt" "math" "sync" @@ -15,6 +16,7 @@ import ( pql "github.com/prometheus/prometheus/promql" "go.signoz.io/signoz/pkg/query-service/converter" "go.signoz.io/signoz/pkg/query-service/formatter" + "go.signoz.io/signoz/pkg/query-service/interfaces" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" "go.signoz.io/signoz/pkg/query-service/utils/times" @@ -54,6 +56,8 @@ type PromRule struct { logger log.Logger opts PromRuleOpts + + reader interfaces.Reader } func NewPromRule( @@ -61,6 +65,7 @@ func NewPromRule( postableRule *PostableRule, logger log.Logger, opts PromRuleOpts, + reader interfaces.Reader, ) (*PromRule, error) { if postableRule.RuleCondition == nil { @@ -83,6 +88,7 @@ func NewPromRule( logger: logger, opts: opts, } + p.reader = reader if int64(p.evalWindow) == 0 { p.evalWindow = 5 * time.Minute @@ -215,8 +221,6 @@ func (r *PromRule) GetEvaluationTimestamp() time.Time { // State returns the maximum state of alert instances for this rule. // StateFiring > StatePending > StateInactive func (r *PromRule) State() AlertState { - r.mtx.Lock() - defer r.mtx.Unlock() maxState := StateInactive for _, a := range r.active { @@ -338,6 +342,8 @@ func (r *PromRule) compareOp() CompareOp { func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { + prevState := r.State() + start := ts.Add(-r.evalWindow) end := ts interval := 60 * time.Second // TODO(srikanthccv): this should be configurable @@ -459,8 +465,14 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( } + itemsToAdd := []v3.RuleStateHistory{} + // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, a := range r.active { + labelsJSON, err := json.Marshal(a.Labels) + if err != nil { + zap.L().Error("error marshaling labels", zap.Error(err), zap.String("name", r.Name())) + } if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. @@ -470,6 +482,15 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( if a.State != StateInactive { a.State = StateInactive a.ResolvedAt = ts + itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ + RuleID: r.ID(), + RuleName: r.Name(), + State: "normal", + StateChanged: true, + UnixMilli: ts.UnixMilli(), + Labels: v3.LabelsString(labelsJSON), + Fingerprint: a.Labels.Hash(), + }) } continue } @@ -477,12 +498,46 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { a.State = StateFiring a.FiredAt = ts + state := "firing" + if a.Missing { + state = "no_data" + } + itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ + RuleID: r.ID(), + RuleName: r.Name(), + State: state, + StateChanged: true, + UnixMilli: ts.UnixMilli(), + Labels: v3.LabelsString(labelsJSON), + Fingerprint: a.Labels.Hash(), + Value: a.Value, + }) } } r.health = HealthGood r.lastError = err + currentState := r.State() + + if currentState != prevState { + for idx := range itemsToAdd { + if currentState == StateInactive { + itemsToAdd[idx].OverallState = "normal" + } else { + itemsToAdd[idx].OverallState = currentState.String() + } + itemsToAdd[idx].OverallStateChanged = true + } + } + + if len(itemsToAdd) > 0 && r.reader != nil { + err := r.reader.AddRuleStateHistory(ctx, itemsToAdd) + if err != nil { + zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) + } + } + return len(r.active), nil } diff --git a/pkg/query-service/rules/promrule_test.go b/pkg/query-service/rules/promrule_test.go index 0707933b89..a06b510f2e 100644 --- a/pkg/query-service/rules/promrule_test.go +++ b/pkg/query-service/rules/promrule_test.go @@ -611,7 +611,7 @@ func TestPromRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewPromRule("69", &postableRule, testLogger{t}, PromRuleOpts{}) + rule, err := NewPromRule("69", &postableRule, testLogger{t}, PromRuleOpts{}, nil) if err != nil { assert.NoError(t, err) } diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index 5426c04449..ef65f7966f 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -76,6 +76,7 @@ type ThresholdRule struct { querier interfaces.Querier querierV2 interfaces.Querier + reader interfaces.Reader evalDelay time.Duration } @@ -150,6 +151,7 @@ func NewThresholdRule( t.querier = querier.NewQuerier(querierOption) t.querierV2 = querierV2.NewQuerier(querierOptsV2) + t.reader = reader zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id)) @@ -287,8 +289,6 @@ func (r *ThresholdRule) GetEvaluationTimestamp() time.Time { // StateFiring > StatePending > StateInactive func (r *ThresholdRule) State() AlertState { - r.mtx.Lock() - defer r.mtx.Unlock() maxState := StateInactive for _, a := range r.active { if a.State > maxState { @@ -873,6 +873,8 @@ func normalizeLabelName(name string) string { func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { + prevState := r.State() + valueFormatter := formatter.FromUnit(r.Unit()) res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch) @@ -980,6 +982,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie Value: smpl.V, GeneratorURL: r.GeneratorURL(), Receivers: r.preferredChannels, + Missing: smpl.IsMissing, } } @@ -1001,8 +1004,14 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie } + itemsToAdd := []v3.RuleStateHistory{} + // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, a := range r.active { + labelsJSON, err := json.Marshal(a.Labels) + if err != nil { + zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels)) + } if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. @@ -1012,6 +1021,15 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie if a.State != StateInactive { a.State = StateInactive a.ResolvedAt = ts + itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ + RuleID: r.ID(), + RuleName: r.Name(), + State: "normal", + StateChanged: true, + UnixMilli: ts.UnixMilli(), + Labels: v3.LabelsString(labelsJSON), + Fingerprint: a.Labels.Hash(), + }) } continue } @@ -1019,8 +1037,46 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { a.State = StateFiring a.FiredAt = ts + state := "firing" + if a.Missing { + state = "no_data" + } + itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{ + RuleID: r.ID(), + RuleName: r.Name(), + State: state, + StateChanged: true, + UnixMilli: ts.UnixMilli(), + Labels: v3.LabelsString(labelsJSON), + Fingerprint: a.Labels.Hash(), + Value: a.Value, + }) } + } + currentState := r.State() + + if currentState != prevState { + for idx := range itemsToAdd { + if currentState == StateInactive { + itemsToAdd[idx].OverallState = "normal" + } else { + itemsToAdd[idx].OverallState = currentState.String() + } + itemsToAdd[idx].OverallStateChanged = true + } + } else { + for idx := range itemsToAdd { + itemsToAdd[idx].OverallState = currentState.String() + itemsToAdd[idx].OverallStateChanged = false + } + } + + if len(itemsToAdd) > 0 && r.reader != nil { + err := r.reader.AddRuleStateHistory(ctx, itemsToAdd) + if err != nil { + zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) + } } r.health = HealthGood r.lastError = err