chore: add alerts state history query service impl (#5255)

Srikanth Chekuri 2024-08-09 12:11:05 +05:30 committed by GitHub
parent 52199361d5
commit bb84960442
12 changed files with 798 additions and 7 deletions

View File

@@ -28,6 +28,7 @@ import (
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
"go.signoz.io/signoz/ee/query-service/interfaces"
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/migrate"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -179,6 +180,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
go func() {
err = migrate.ClickHouseMigrate(reader.GetConn(), serverOptions.Cluster)
if err != nil {
zap.L().Error("error while running clickhouse migrations", zap.Error(err))
}
}()
// initiate opamp
_, err = opAmpModel.InitDB(localDB)
if err != nil {

View File

@@ -62,6 +62,8 @@ const (
primaryNamespace = "clickhouse"
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
signozHistoryDBName = "signoz_analytics"
ruleStateHistoryTableName = "distributed_rule_state_history"
signozDurationMVTable = "distributed_durationSort"
signozUsageExplorerTable = "distributed_usage_explorer"
signozSpansTable = "distributed_signoz_spans"
@@ -5125,6 +5127,319 @@ func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, tim
}
}
func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error {
var statement driver.Batch
var err error
defer func() {
if statement != nil {
statement.Abort()
}
}()
statement, err = r.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (rule_id, rule_name, overall_state, overall_state_changed, state, state_changed, unix_milli, labels, fingerprint, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
signozHistoryDBName, ruleStateHistoryTableName))
if err != nil {
return err
}
for _, history := range ruleStateHistory {
err = statement.Append(history.RuleID, history.RuleName, history.OverallState, history.OverallStateChanged, history.State, history.StateChanged, history.UnixMilli, history.Labels, history.Fingerprint, history.Value)
if err != nil {
return err
}
}
err = statement.Send()
if err != nil {
return err
}
return nil
}
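For context, a minimal sketch of how a caller might record one transition through this method; the rule id, labels, and the `reader`/`ctx` variables are illustrative, not part of this change:

entry := v3.RuleStateHistory{
	RuleID:       "hypothetical-rule-id", // placeholder, not a real rule id
	RuleName:     "High error rate",
	OverallState: "firing",
	State:        "firing",
	StateChanged: true,
	UnixMilli:    time.Now().UnixMilli(),
	Labels:       v3.LabelsString(`{"service.name":"checkout"}`),
	Fingerprint:  1, // normally the hash of the alert labels
	Value:        0.42,
}
if err := reader.AddRuleStateHistory(ctx, []v3.RuleStateHistory{entry}); err != nil {
	zap.L().Error("failed to write rule state history", zap.Error(err))
}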
func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistory, error) {
var conditions []string
conditions = append(conditions, fmt.Sprintf("rule_id = '%s'", ruleID))
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", params.Start, params.End))
if params.Filters != nil && len(params.Filters.Items) != 0 {
for _, item := range params.Filters.Items {
toFormat := item.Value
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
switch op {
case v3.FilterOperatorEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
case v3.FilterOperatorIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLike:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotLike:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorRegex:
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorContains:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotContains:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorExists:
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
case v3.FilterOperatorNotExists:
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
default:
return nil, fmt.Errorf("unsupported filter operator")
}
}
}
whereClause := strings.Join(conditions, " AND ")
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s ORDER BY unix_milli %s LIMIT %d OFFSET %d",
signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset)
history := []v3.RuleStateHistory{}
err := r.db.Select(ctx, &history, query)
if err != nil {
zap.L().Error("Error while reading rule state history", zap.Error(err))
return nil, err
}
return history, nil
}
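Filters are applied with JSONExtractString over the labels column, so callers can narrow the timeline by label values. A minimal sketch of such a query, assuming the v3.FilterSet / v3.FilterItem / v3.AttributeKey shapes used elsewhere in this repo; the rule id and label value are hypothetical:

params := &v3.QueryRuleStateHistory{
	Start: time.Now().Add(-24 * time.Hour).UnixMilli(),
	End:   time.Now().UnixMilli(),
	Limit: 20,
	Order: "desc",
	Filters: &v3.FilterSet{
		Operator: "AND",
		Items: []v3.FilterItem{
			{Key: v3.AttributeKey{Key: "service.name"}, Operator: v3.FilterOperatorEqual, Value: "checkout"},
		},
	},
}
history, err := reader.ReadRuleStateHistoryByRuleID(ctx, "hypothetical-rule-id", params)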
func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error) {
query := fmt.Sprintf(`SELECT
fingerprint,
any(labels) as labels,
count(*) as count
FROM %s.%s
WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d
GROUP BY fingerprint
ORDER BY count DESC`,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
contributors := []v3.RuleStateHistoryContributor{}
err := r.db.Select(ctx, &contributors, query)
if err != nil {
zap.L().Error("Error while reading rule state history", zap.Error(err))
return nil, err
}
return contributors, nil
}
func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) {
tmpl := `WITH firing_events AS (
SELECT
rule_id,
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
),
resolution_events AS (
SELECT
rule_id,
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
),
matched_events AS (
SELECT
f.rule_id,
f.state,
f.firing_time,
MIN(r.resolution_time) AS resolution_time
FROM firing_events f
LEFT JOIN resolution_events r
ON f.rule_id = r.rule_id
WHERE r.resolution_time > f.firing_time
GROUP BY f.rule_id, f.state, f.firing_time
)
SELECT *
FROM matched_events
ORDER BY firing_time ASC;`
query := fmt.Sprintf(tmpl,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
transitions := []v3.RuleStateTransition{}
err := r.db.Select(ctx, &transitions, query)
if err != nil {
return nil, err
}
return transitions, nil
}
func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) {
tmpl := `
WITH firing_events AS (
SELECT
rule_id,
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
),
resolution_events AS (
SELECT
rule_id,
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
),
matched_events AS (
SELECT
f.rule_id,
f.state,
f.firing_time,
MIN(r.resolution_time) AS resolution_time
FROM firing_events f
LEFT JOIN resolution_events r
ON f.rule_id = r.rule_id
WHERE r.resolution_time > f.firing_time
GROUP BY f.rule_id, f.state, f.firing_time
)
SELECT AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time
FROM matched_events;
`
query := fmt.Sprintf(tmpl,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
var avgResolutionTime float64
err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime)
if err != nil {
return 0, err
}
return avgResolutionTime, nil
}
func (r *ClickHouseReader) GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) {
step := common.MinAllowedStepInterval(params.Start, params.End)
tmpl := `
WITH firing_events AS (
SELECT
rule_id,
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
),
resolution_events AS (
SELECT
rule_id,
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
),
matched_events AS (
SELECT
f.rule_id,
f.state,
f.firing_time,
MIN(r.resolution_time) AS resolution_time
FROM firing_events f
LEFT JOIN resolution_events r
ON f.rule_id = r.rule_id
WHERE r.resolution_time > f.firing_time
GROUP BY f.rule_id, f.state, f.firing_time
)
SELECT toStartOfInterval(toDateTime(firing_time / 1000), INTERVAL %d SECOND) AS ts, AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time
FROM matched_events
GROUP BY ts
ORDER BY ts ASC;`
query := fmt.Sprintf(tmpl,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step)
result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
return nil, err
}
return result[0], nil
}
func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
var totalTriggers uint64
err := r.db.QueryRow(ctx, query).Scan(&totalTriggers)
if err != nil {
return 0, err
}
return totalTriggers, nil
}
func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) {
step := common.MinAllowedStepInterval(params.Start, params.End)
query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
return nil, err
}
return result[0], nil
}
func (r *ClickHouseReader) GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) {
var minTime, maxTime time.Time

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"regexp"
"slices"
@@ -343,6 +344,10 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/rules/{id}", am.EditAccess(aH.deleteRule)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/rules/{id}", am.EditAccess(aH.patchRule)).Methods(http.MethodPatch)
router.HandleFunc("/api/v1/testRule", am.EditAccess(aH.testRule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}/history/stats", am.ViewAccess(aH.getRuleStats)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}/history/timeline", am.ViewAccess(aH.getRuleStateHistory)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}/history/top_contributors", am.ViewAccess(aH.getRuleStateHistoryTopContributors)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}/history/overall_status", am.ViewAccess(aH.getOverallStateTransitions)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/downtime_schedules", am.OpenAccess(aH.listDowntimeSchedules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.OpenAccess(aH.getDowntimeSchedule)).Methods(http.MethodGet)
@@ -625,6 +630,159 @@ func (aH *APIHandler) deleteDowntimeSchedule(w http.ResponseWriter, r *http.Requ
aH.Respond(w, nil)
}
func (aH *APIHandler) getRuleStats(w http.ResponseWriter, r *http.Request) {
ruleID := mux.Vars(r)["id"]
params := v3.QueryRuleStateHistory{}
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
totalCurrentTriggers, err := aH.reader.GetTotalTriggers(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
currentTriggersSeries, err := aH.reader.GetTriggersByInterval(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
currentAvgResolutionTime, err := aH.reader.GetAvgResolutionTime(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
currentAvgResolutionTimeSeries, err := aH.reader.GetAvgResolutionTimeByInterval(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
if params.End-params.Start >= 86400000 {
days := int64(math.Ceil(float64(params.End-params.Start) / 86400000))
params.Start -= days * 86400000
params.End -= days * 86400000
} else {
params.Start -= 86400000
params.End -= 86400000
}
totalPastTriggers, err := aH.reader.GetTotalTriggers(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
pastTriggersSeries, err := aH.reader.GetTriggersByInterval(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
pastAvgResolutionTime, err := aH.reader.GetAvgResolutionTime(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
pastAvgResolutionTimeSeries, err := aH.reader.GetAvgResolutionTimeByInterval(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
stats := v3.Stats{
TotalCurrentTriggers: totalCurrentTriggers,
TotalPastTriggers: totalPastTriggers,
CurrentTriggersSeries: currentTriggersSeries,
PastTriggersSeries: pastTriggersSeries,
CurrentAvgResolutionTime: strconv.FormatFloat(currentAvgResolutionTime, 'f', -1, 64),
PastAvgResolutionTime: strconv.FormatFloat(pastAvgResolutionTime, 'f', -1, 64),
CurrentAvgResolutionTimeSeries: currentAvgResolutionTimeSeries,
PastAvgResolutionTimeSeries: pastAvgResolutionTimeSeries,
}
aH.Respond(w, stats)
}
func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.Request) {
ruleID := mux.Vars(r)["id"]
params := v3.QueryRuleStateHistory{}
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
res, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
stateItems := []v3.ReleStateItem{}
for idx, item := range res {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(res)-1 {
nextStart := res[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: "normal",
Start: end,
End: nextStart,
})
}
}
}
aH.Respond(w, stateItems)
}
func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request) {
ruleID := mux.Vars(r)["id"]
params := v3.QueryRuleStateHistory{}
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
if err := params.Validate(); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
res, err := aH.reader.ReadRuleStateHistoryByRuleID(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, res)
}
func (aH *APIHandler) getRuleStateHistoryTopContributors(w http.ResponseWriter, r *http.Request) {
ruleID := mux.Vars(r)["id"]
params := v3.QueryRuleStateHistory{}
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
res, err := aH.reader.ReadRuleStateHistoryTopContributorsByRuleID(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, res)
}
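A sketch of exercising one of the new endpoints over HTTP; the host, port, and rule id are placeholders, auth headers are omitted, and the body follows the QueryRuleStateHistory JSON shape (start/end in Unix milliseconds):

body := []byte(`{"start":1723161600000,"end":1723248000000,"offset":0,"limit":20,"order":"desc"}`)
req, err := http.NewRequest(http.MethodPost,
	"http://localhost:8080/api/v1/rules/1/history/timeline", bytes.NewReader(body))
if err != nil {
	log.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
// ViewAccess requires a valid session; an Authorization header would go here.
resp, err := http.DefaultClient.Do(req)
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()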
func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) {
rules, err := aH.ruleManager.ListRuleStates(r.Context())

View File

@@ -30,6 +30,7 @@ import (
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/preferences"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/migrate"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/app/explorer"
@@ -147,6 +148,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
go func() {
err = migrate.ClickHouseMigrate(reader.GetConn(), serverOptions.Cluster)
if err != nil {
zap.L().Error("error while running clickhouse migrations", zap.Error(err))
}
}()
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)

View File

@@ -108,6 +108,14 @@ type Reader interface {
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)
AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error)
ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistory, error)
GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error)
GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error)
GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error)
GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error)
}

View File

@@ -1,8 +1,11 @@
package migrate
import (
"context"
"database/sql"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/jmoiron/sqlx"
alertstov4 "go.signoz.io/signoz/pkg/query-service/migrate/0_45_alerts_to_v4"
alertscustomstep "go.signoz.io/signoz/pkg/query-service/migrate/0_47_alerts_custom_step"
@@ -77,3 +80,90 @@ func Migrate(dsn string) error {
return nil
}
func ClickHouseMigrate(conn driver.Conn, cluster string) error {
database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s"
localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history ON CLUSTER %s
(
_retention_days UInt32 DEFAULT 180,
rule_id LowCardinality(String),
rule_name LowCardinality(String),
overall_state LowCardinality(String),
overall_state_changed Bool,
state LowCardinality(String),
state_changed Bool,
unix_milli Int64 CODEC(Delta(8), ZSTD(1)),
fingerprint UInt64 CODEC(ZSTD(1)),
value Float64 CODEC(Gorilla, ZSTD(1)),
labels String CODEC(ZSTD(5)),
)
ENGINE = MergeTree
PARTITION BY toDate(unix_milli / 1000)
ORDER BY (rule_id, unix_milli)
TTL toDateTime(unix_milli / 1000) + toIntervalDay(_retention_days)
SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history ON CLUSTER %s
(
rule_id LowCardinality(String),
rule_name LowCardinality(String),
overall_state LowCardinality(String),
overall_state_changed Bool,
state LowCardinality(String),
state_changed Bool,
unix_milli Int64 CODEC(Delta(8), ZSTD(1)),
fingerprint UInt64 CODEC(ZSTD(1)),
value Float64 CODEC(Gorilla, ZSTD(1)),
labels String CODEC(ZSTD(5)),
)
ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_id, rule_name, fingerprint))`
// check if db exists
dbExists := `SELECT count(*) FROM system.databases WHERE name = 'signoz_analytics'`
var count uint64
err := conn.QueryRow(context.Background(), dbExists).Scan(&count)
if err != nil {
return err
}
if count == 0 {
err = conn.Exec(context.Background(), fmt.Sprintf(database, cluster))
if err != nil {
return err
}
}
// check if table exists
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history' AND database = 'signoz_analytics'`
var tableCount uint64
err = conn.QueryRow(context.Background(), tableExists).Scan(&tableCount)
if err != nil {
return err
}
if tableCount == 0 {
err = conn.Exec(context.Background(), fmt.Sprintf(localTable, cluster))
if err != nil {
return err
}
}
// check if distributed table exists
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history' AND database = 'signoz_analytics'`
var distributedTableCount uint64
err = conn.QueryRow(context.Background(), distributedTableExists).Scan(&distributedTableCount)
if err != nil {
return err
}
if distributedTableCount == 0 {
err = conn.Exec(context.Background(), fmt.Sprintf(distributedTable, cluster, cluster))
if err != nil {
return err
}
}
return nil
}
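The migration is invoked from both query-service servers above; for local testing it can also be run standalone. A minimal sketch, assuming a ClickHouse instance on localhost:9000 and a cluster named "cluster":

package main

import (
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
	"go.signoz.io/signoz/pkg/query-service/migrate"
)

func main() {
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"localhost:9000"}})
	if err != nil {
		log.Fatal(err)
	}
	// Creates signoz_analytics, rule_state_history and its distributed table if missing.
	if err := migrate.ClickHouseMigrate(conn, "cluster"); err != nil {
		log.Fatal(err)
	}
}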

View File

@@ -1038,8 +1038,8 @@ type LogsLiveTailClient struct {
}
type Series struct {
Labels map[string]string `json:"labels"`
LabelsArray []map[string]string `json:"labelsArray"`
Labels map[string]string `json:"labels,omitempty"`
LabelsArray []map[string]string `json:"labelsArray,omitempty"`
Points []Point `json:"values"`
}
@@ -1154,3 +1154,92 @@ type MetricMetadataResponse struct {
IsMonotonic bool `json:"isMonotonic"`
Temporality string `json:"temporality"`
}
type LabelsString string
func (l *LabelsString) MarshalJSON() ([]byte, error) {
lbls := make(map[string]string)
err := json.Unmarshal([]byte(*l), &lbls)
if err != nil {
return nil, err
}
return json.Marshal(lbls)
}
func (l *LabelsString) Scan(src interface{}) error {
if data, ok := src.(string); ok {
*l = LabelsString(data)
}
return nil
}
func (l LabelsString) String() string {
return string(l)
}
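A small usage sketch (not part of the change): LabelsString keeps the raw JSON string read from the ClickHouse labels column and only parses it when the API response is marshaled; the label values below are hypothetical.

func ExampleLabelsString() {
	var labels LabelsString
	// The ClickHouse driver hands Scan the column value as a string.
	_ = labels.Scan(`{"service.name":"checkout","severity":"warning"}`)
	// MarshalJSON re-parses the stored string so clients receive a JSON object,
	// not a double-encoded string.
	out, _ := json.Marshal(&labels)
	fmt.Println(string(out)) // {"service.name":"checkout","severity":"warning"}
}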
type RuleStateHistory struct {
RuleID string `json:"ruleID" ch:"rule_id"`
RuleName string `json:"ruleName" ch:"rule_name"`
// One of ["normal", "firing"]
OverallState string `json:"overallState" ch:"overall_state"`
OverallStateChanged bool `json:"overallStateChanged" ch:"overall_state_changed"`
// One of ["normal", "firing", "no_data", "muted"]
State string `json:"state" ch:"state"`
StateChanged bool `json:"stateChanged" ch:"state_changed"`
UnixMilli int64 `json:"unixMilli" ch:"unix_milli"`
Labels LabelsString `json:"labels" ch:"labels"`
Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"`
Value float64 `json:"value" ch:"value"`
}
type QueryRuleStateHistory struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Filters *FilterSet `json:"filters"`
Offset int64 `json:"offset"`
Limit int64 `json:"limit"`
Order string `json:"order"`
}
func (r *QueryRuleStateHistory) Validate() error {
if r.Start == 0 || r.End == 0 {
return fmt.Errorf("start and end are required")
}
if r.Offset < 0 || r.Limit < 0 {
return fmt.Errorf("offset and limit must be greater than 0")
}
if r.Order != "asc" && r.Order != "desc" {
return fmt.Errorf("order must be asc or desc")
}
return nil
}
type RuleStateHistoryContributor struct {
Fingerprint uint64 `json:"fingerprint" ch:"fingerprint"`
Labels LabelsString `json:"labels" ch:"labels"`
Count uint64 `json:"count" ch:"count"`
}
type RuleStateTransition struct {
RuleID string `json:"ruleID" ch:"rule_id"`
State string `json:"state" ch:"state"`
FiringTime int64 `json:"firingTime" ch:"firing_time"`
ResolutionTime int64 `json:"resolutionTime" ch:"resolution_time"`
}
type ReleStateItem struct {
State string `json:"state"`
Start int64 `json:"start"`
End int64 `json:"end"`
}
type Stats struct {
TotalCurrentTriggers uint64 `json:"totalCurrentTriggers"`
TotalPastTriggers uint64 `json:"totalPastTriggers"`
CurrentTriggersSeries *Series `json:"currentTriggersSeries"`
PastTriggersSeries *Series `json:"pastTriggersSeries"`
CurrentAvgResolutionTime string `json:"currentAvgResolutionTime"`
PastAvgResolutionTime string `json:"pastAvgResolutionTime"`
CurrentAvgResolutionTimeSeries *Series `json:"currentAvgResolutionTimeSeries"`
PastAvgResolutionTimeSeries *Series `json:"pastAvgResolutionTimeSeries"`
}

View File

@@ -78,6 +78,8 @@ type Alert struct {
ResolvedAt time.Time
LastSentAt time.Time
ValidUntil time.Time
Missing bool
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {

View File

@@ -553,6 +553,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
r,
log.With(m.logger, "alert", r.AlertName),
PromRuleOpts{},
m.reader,
)
if err != nil {
@@ -912,6 +913,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
PromRuleOpts{
SendAlways: true,
},
m.reader,
)
if err != nil {

View File

@@ -2,6 +2,7 @@ package rules
import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
@@ -15,6 +16,7 @@ import (
pql "github.com/prometheus/prometheus/promql"
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/formatter"
"go.signoz.io/signoz/pkg/query-service/interfaces"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.signoz.io/signoz/pkg/query-service/utils/times"
@@ -54,6 +56,8 @@ type PromRule struct {
logger log.Logger
opts PromRuleOpts
reader interfaces.Reader
}
func NewPromRule(
@@ -61,6 +65,7 @@ func NewPromRule(
postableRule *PostableRule,
logger log.Logger,
opts PromRuleOpts,
reader interfaces.Reader,
) (*PromRule, error) {
if postableRule.RuleCondition == nil {
@@ -83,6 +88,7 @@
logger: logger,
opts: opts,
}
p.reader = reader
if int64(p.evalWindow) == 0 {
p.evalWindow = 5 * time.Minute
@@ -215,8 +221,6 @@ func (r *PromRule) GetEvaluationTimestamp() time.Time {
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive
func (r *PromRule) State() AlertState {
r.mtx.Lock()
defer r.mtx.Unlock()
maxState := StateInactive
for _, a := range r.active {
@@ -338,6 +342,8 @@ func (r *PromRule) compareOp() CompareOp {
func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
prevState := r.State()
start := ts.Add(-r.evalWindow)
end := ts
interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
@@ -459,8 +465,14 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
}
itemsToAdd := []v3.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
labelsJSON, err := json.Marshal(a.Labels)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.String("name", r.Name()))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
@@ -470,6 +482,15 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: "normal",
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: v3.LabelsString(labelsJSON),
Fingerprint: a.Labels.Hash(),
})
}
continue
}
@@ -477,12 +498,46 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
a.FiredAt = ts
state := "firing"
if a.Missing {
state = "no_data"
}
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: v3.LabelsString(labelsJSON),
Fingerprint: a.Labels.Hash(),
Value: a.Value,
})
}
}
r.health = HealthGood
r.lastError = err
currentState := r.State()
if currentState != prevState {
for idx := range itemsToAdd {
if currentState == StateInactive {
itemsToAdd[idx].OverallState = "normal"
} else {
itemsToAdd[idx].OverallState = currentState.String()
}
itemsToAdd[idx].OverallStateChanged = true
}
}
if len(itemsToAdd) > 0 && r.reader != nil {
err := r.reader.AddRuleStateHistory(ctx, itemsToAdd)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
return len(r.active), nil
}

View File

@@ -611,7 +611,7 @@ func TestPromRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewPromRule("69", &postableRule, testLogger{t}, PromRuleOpts{})
rule, err := NewPromRule("69", &postableRule, testLogger{t}, PromRuleOpts{}, nil)
if err != nil {
assert.NoError(t, err)
}

View File

@@ -76,6 +76,7 @@ type ThresholdRule struct {
querier interfaces.Querier
querierV2 interfaces.Querier
reader interfaces.Reader
evalDelay time.Duration
}
@@ -150,6 +151,7 @@ func NewThresholdRule(
t.querier = querier.NewQuerier(querierOption)
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.reader = reader
zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id))
@@ -287,8 +289,6 @@ func (r *ThresholdRule) GetEvaluationTimestamp() time.Time {
// StateFiring > StatePending > StateInactive
func (r *ThresholdRule) State() AlertState {
r.mtx.Lock()
defer r.mtx.Unlock()
maxState := StateInactive
for _, a := range r.active {
if a.State > maxState {
@@ -873,6 +873,8 @@ func normalizeLabelName(name string) string {
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch)
@@ -980,6 +982,7 @@
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: r.preferredChannels,
Missing: smpl.IsMissing,
}
}
@@ -1001,8 +1004,14 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
}
itemsToAdd := []v3.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
labelsJSON, err := json.Marshal(a.Labels)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
@@ -1012,6 +1021,15 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: "normal",
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: v3.LabelsString(labelsJSON),
Fingerprint: a.Labels.Hash(),
})
}
continue
}
@@ -1019,8 +1037,46 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
a.FiredAt = ts
state := "firing"
if a.Missing {
state = "no_data"
}
itemsToAdd = append(itemsToAdd, v3.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: v3.LabelsString(labelsJSON),
Fingerprint: a.Labels.Hash(),
Value: a.Value,
})
}
}
currentState := r.State()
if currentState != prevState {
for idx := range itemsToAdd {
if currentState == StateInactive {
itemsToAdd[idx].OverallState = "normal"
} else {
itemsToAdd[idx].OverallState = currentState.String()
}
itemsToAdd[idx].OverallStateChanged = true
}
} else {
for idx := range itemsToAdd {
itemsToAdd[idx].OverallState = currentState.String()
itemsToAdd[idx].OverallStateChanged = false
}
}
if len(itemsToAdd) > 0 && r.reader != nil {
err := r.reader.AddRuleStateHistory(ctx, itemsToAdd)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
}
}
r.health = HealthGood
r.lastError = err