diff --git a/ee/query-service/anomaly/params.go b/ee/query-service/anomaly/params.go index d39b2fa80f..8340a2673a 100644 --- a/ee/query-service/anomaly/params.go +++ b/ee/query-service/anomaly/params.go @@ -16,6 +16,10 @@ const ( SeasonalityWeekly Seasonality = "weekly" ) +func (s Seasonality) String() string { + return string(s) +} + var ( oneWeekOffset = 24 * 7 * time.Hour.Milliseconds() oneDayOffset = 24 * time.Hour.Milliseconds() diff --git a/ee/query-service/anomaly/seasonal.go b/ee/query-service/anomaly/seasonal.go index 485ab7f460..9b5f33d3df 100644 --- a/ee/query-service/anomaly/seasonal.go +++ b/ee/query-service/anomaly/seasonal.go @@ -67,6 +67,7 @@ func (p *BaseSeasonalProvider) getQueryParams(req *GetAnomaliesRequest) *anomaly } func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQueryParams) (*anomalyQueryResults, error) { + zap.L().Info("fetching results for current period", zap.Any("currentPeriodQuery", params.CurrentPeriodQuery)) currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentPeriodQuery) if err != nil { return nil, err @@ -77,6 +78,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu return nil, err } + zap.L().Info("fetching results for past period", zap.Any("pastPeriodQuery", params.PastPeriodQuery)) pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.PastPeriodQuery) if err != nil { return nil, err @@ -87,6 +89,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu return nil, err } + zap.L().Info("fetching results for current season", zap.Any("currentSeasonQuery", params.CurrentSeasonQuery)) currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentSeasonQuery) if err != nil { return nil, err @@ -97,6 +100,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu return nil, err } + zap.L().Info("fetching results for past season", zap.Any("pastSeasonQuery", params.PastSeasonQuery)) pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.PastSeasonQuery) if err != nil { return nil, err @@ -107,6 +111,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu return nil, err } + zap.L().Info("fetching results for past 2 season", zap.Any("past2SeasonQuery", params.Past2SeasonQuery)) past2SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past2SeasonQuery) if err != nil { return nil, err @@ -117,6 +122,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu return nil, err } + zap.L().Info("fetching results for past 3 season", zap.Any("past3SeasonQuery", params.Past3SeasonQuery)) past3SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past3SeasonQuery) if err != nil { return nil, err @@ -184,7 +190,7 @@ func (p *BaseSeasonalProvider) getMovingAvg(series *v3.Series, movingAvgWindowSi return 0 } if startIdx >= len(series.Points)-movingAvgWindowSize { - startIdx = len(series.Points) - movingAvgWindowSize + startIdx = int(math.Max(0, float64(len(series.Points)-movingAvgWindowSize))) } var sum float64 points := series.Points[startIdx:] @@ -250,7 +256,7 @@ func (p *BaseSeasonalProvider) getPredictedSeries( // moving avg of the previous period series + z score threshold * std dev of the series // moving avg of the previous period series - z score threshold * std dev of the series func (p *BaseSeasonalProvider) getBounds( - series, prevSeries, _, _, _, _ *v3.Series, + series, predictedSeries *v3.Series, zScoreThreshold float64, ) (*v3.Series, *v3.Series) { upperBoundSeries := &v3.Series{ @@ -266,8 +272,8 @@ func (p *BaseSeasonalProvider) getBounds( } for idx, curr := range series.Points { - upperBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series) - lowerBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series) + upperBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series) + lowerBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series) upperBoundSeries.Points = append(upperBoundSeries.Points, v3.Point{ Timestamp: curr.Timestamp, Value: upperBound, @@ -431,11 +437,7 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, req *GetAnomali upperBoundSeries, lowerBoundSeries := p.getBounds( series, - pastPeriodSeries, - currentSeasonSeries, - pastSeasonSeries, - past2SeasonSeries, - past3SeasonSeries, + predictedSeries, zScoreThreshold, ) result.UpperBoundSeries = append(result.UpperBoundSeries, upperBoundSeries) diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 2e2eb8ded5..82557705fd 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -177,6 +177,8 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew am.ViewAccess(ah.listLicensesV2)). Methods(http.MethodGet) + router.HandleFunc("/api/v4/query_range", am.ViewAccess(ah.queryRangeV4)).Methods(http.MethodPost) + // Gateway router.PathPrefix(gateway.RoutePrefix).HandlerFunc(am.AdminAccess(ah.ServeGatewayHTTP)) diff --git a/ee/query-service/app/api/queryrange.go b/ee/query-service/app/api/queryrange.go new file mode 100644 index 0000000000..d4f3eb975a --- /dev/null +++ b/ee/query-service/app/api/queryrange.go @@ -0,0 +1,119 @@ +package api + +import ( + "bytes" + "fmt" + "io" + "net/http" + + "go.signoz.io/signoz/ee/query-service/anomaly" + baseapp "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" +) + +func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) { + + bodyBytes, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + queryRangeParams, apiErrorObj := baseapp.ParseQueryRangeParams(r) + + if apiErrorObj != nil { + zap.L().Error("error parsing metric query range params", zap.Error(apiErrorObj.Err)) + RespondError(w, apiErrorObj, nil) + return + } + queryRangeParams.Version = "v4" + + // add temporality for each metric + temporalityErr := aH.PopulateTemporality(r.Context(), queryRangeParams) + if temporalityErr != nil { + zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr)) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) + return + } + + anomalyQueryExists := false + anomalyQuery := &v3.BuilderQuery{} + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { + for _, fn := range query.Functions { + if fn.Name == v3.FunctionNameAnomaly { + anomalyQueryExists = true + anomalyQuery = query + break + } + } + } + } + + if anomalyQueryExists { + // ensure all queries have metric data source, and there should be only one anomaly query + for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { + if query.DataSource != v3.DataSourceMetrics { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("all queries must have metric data source")}, nil) + return + } + } + + // get the threshold, and seasonality from the anomaly query + var seasonality anomaly.Seasonality + for _, fn := range anomalyQuery.Functions { + if fn.Name == v3.FunctionNameAnomaly { + seasonalityStr, ok := fn.NamedArgs["seasonality"].(string) + if !ok { + seasonalityStr = "daily" + } + if seasonalityStr == "weekly" { + seasonality = anomaly.SeasonalityWeekly + } else if seasonalityStr == "daily" { + seasonality = anomaly.SeasonalityDaily + } else { + seasonality = anomaly.SeasonalityHourly + } + break + } + } + var provider anomaly.Provider + switch seasonality { + case anomaly.SeasonalityWeekly: + provider = anomaly.NewWeeklyProvider( + anomaly.WithCache[*anomaly.WeeklyProvider](aH.opts.Cache), + anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()), + anomaly.WithReader[*anomaly.WeeklyProvider](aH.opts.DataConnector), + anomaly.WithFeatureLookup[*anomaly.WeeklyProvider](aH.opts.FeatureFlags), + ) + case anomaly.SeasonalityDaily: + provider = anomaly.NewDailyProvider( + anomaly.WithCache[*anomaly.DailyProvider](aH.opts.Cache), + anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()), + anomaly.WithReader[*anomaly.DailyProvider](aH.opts.DataConnector), + anomaly.WithFeatureLookup[*anomaly.DailyProvider](aH.opts.FeatureFlags), + ) + case anomaly.SeasonalityHourly: + provider = anomaly.NewHourlyProvider( + anomaly.WithCache[*anomaly.HourlyProvider](aH.opts.Cache), + anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()), + anomaly.WithReader[*anomaly.HourlyProvider](aH.opts.DataConnector), + anomaly.WithFeatureLookup[*anomaly.HourlyProvider](aH.opts.FeatureFlags), + ) + } + anomalies, err := provider.GetAnomalies(r.Context(), &anomaly.GetAnomaliesRequest{Params: queryRangeParams}) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + uniqueResults := make(map[string]*v3.Result) + for _, anomaly := range anomalies.Results { + uniqueResults[anomaly.QueryName] = anomaly + uniqueResults[anomaly.QueryName].IsAnomaly = true + } + aH.Respond(w, uniqueResults) + } else { + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + aH.QueryRangeV4(w, r) + } +} diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 9845ee670b..54eb7bd1e5 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -170,6 +170,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } } + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } <-readerReady rm, err := makeRulesManager(serverOptions.PromConfigPath, @@ -177,6 +185,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.RuleRepoURL, localDB, reader, + c, serverOptions.DisableRules, lm, serverOptions.UseLogsNewSchema, @@ -237,15 +246,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetReader(reader) telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey) - var c cache.Cache - if serverOptions.CacheConfigPath != "" { - cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) - if err != nil { - return nil, err - } - c = cache.NewCache(cacheOpts) - } - fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) if err != nil { @@ -732,6 +732,7 @@ func makeRulesManager( ruleRepoURL string, db *sqlx.DB, ch baseint.Reader, + cache cache.Cache, disableRules bool, fm baseint.FeatureLookup, useLogsNewSchema bool) (*baserules.Manager, error) { @@ -760,6 +761,7 @@ func makeRulesManager( DisableRules: disableRules, FeatureFlags: fm, Reader: ch, + Cache: cache, EvalDelay: baseconst.GetEvalDelay(), PrepareTaskFunc: rules.PrepareTaskFunc, diff --git a/ee/query-service/model/plans.go b/ee/query-service/model/plans.go index 9b696c013f..5b695143b7 100644 --- a/ee/query-service/model/plans.go +++ b/ee/query-service/model/plans.go @@ -127,6 +127,13 @@ var BasicPlan = basemodel.FeatureSet{ UsageLimit: -1, Route: "", }, + basemodel.Feature{ + Name: basemodel.AnomalyDetection, + Active: false, + Usage: 0, + UsageLimit: -1, + Route: "", + }, } var ProPlan = basemodel.FeatureSet{ @@ -235,6 +242,13 @@ var ProPlan = basemodel.FeatureSet{ UsageLimit: -1, Route: "", }, + basemodel.Feature{ + Name: basemodel.AnomalyDetection, + Active: true, + Usage: 0, + UsageLimit: -1, + Route: "", + }, } var EnterprisePlan = basemodel.FeatureSet{ @@ -357,4 +371,11 @@ var EnterprisePlan = basemodel.FeatureSet{ UsageLimit: -1, Route: "", }, + basemodel.Feature{ + Name: basemodel.AnomalyDetection, + Active: true, + Usage: 0, + UsageLimit: -1, + Route: "", + }, } diff --git a/ee/query-service/rules/anomaly.go b/ee/query-service/rules/anomaly.go new file mode 100644 index 0000000000..a04bfc2840 --- /dev/null +++ b/ee/query-service/rules/anomaly.go @@ -0,0 +1,393 @@ +package rules + +import ( + "context" + "encoding/json" + "fmt" + "math" + "strings" + "sync" + "time" + + "go.uber.org/zap" + + "go.signoz.io/signoz/ee/query-service/anomaly" + "go.signoz.io/signoz/pkg/query-service/cache" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/model" + + querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" + "go.signoz.io/signoz/pkg/query-service/interfaces" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils/labels" + "go.signoz.io/signoz/pkg/query-service/utils/times" + "go.signoz.io/signoz/pkg/query-service/utils/timestamp" + + "go.signoz.io/signoz/pkg/query-service/formatter" + + baserules "go.signoz.io/signoz/pkg/query-service/rules" + + yaml "gopkg.in/yaml.v2" +) + +const ( + RuleTypeAnomaly = "anomaly_rule" +) + +type AnomalyRule struct { + *baserules.BaseRule + + mtx sync.Mutex + + reader interfaces.Reader + + // querierV2 is used for alerts created after the introduction of new metrics query builder + querierV2 interfaces.Querier + + provider anomaly.Provider + + seasonality anomaly.Seasonality +} + +func NewAnomalyRule( + id string, + p *baserules.PostableRule, + featureFlags interfaces.FeatureLookup, + reader interfaces.Reader, + cache cache.Cache, + opts ...baserules.RuleOption, +) (*AnomalyRule, error) { + + zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts)) + + baseRule, err := baserules.NewBaseRule(id, p, reader, opts...) + if err != nil { + return nil, err + } + + t := AnomalyRule{ + BaseRule: baseRule, + } + + switch strings.ToLower(p.RuleCondition.Seasonality) { + case "hourly": + t.seasonality = anomaly.SeasonalityHourly + case "daily": + t.seasonality = anomaly.SeasonalityDaily + case "weekly": + t.seasonality = anomaly.SeasonalityWeekly + default: + t.seasonality = anomaly.SeasonalityDaily + } + + zap.L().Info("using seasonality", zap.String("seasonality", t.seasonality.String())) + + querierOptsV2 := querierV2.QuerierOptions{ + Reader: reader, + Cache: cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + } + + t.querierV2 = querierV2.NewQuerier(querierOptsV2) + t.reader = reader + if t.seasonality == anomaly.SeasonalityHourly { + t.provider = anomaly.NewHourlyProvider( + anomaly.WithCache[*anomaly.HourlyProvider](cache), + anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()), + anomaly.WithReader[*anomaly.HourlyProvider](reader), + anomaly.WithFeatureLookup[*anomaly.HourlyProvider](featureFlags), + ) + } else if t.seasonality == anomaly.SeasonalityDaily { + t.provider = anomaly.NewDailyProvider( + anomaly.WithCache[*anomaly.DailyProvider](cache), + anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()), + anomaly.WithReader[*anomaly.DailyProvider](reader), + anomaly.WithFeatureLookup[*anomaly.DailyProvider](featureFlags), + ) + } else if t.seasonality == anomaly.SeasonalityWeekly { + t.provider = anomaly.NewWeeklyProvider( + anomaly.WithCache[*anomaly.WeeklyProvider](cache), + anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()), + anomaly.WithReader[*anomaly.WeeklyProvider](reader), + anomaly.WithFeatureLookup[*anomaly.WeeklyProvider](featureFlags), + ) + } + return &t, nil +} + +func (r *AnomalyRule) Type() baserules.RuleType { + return RuleTypeAnomaly +} + +func (r *AnomalyRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) { + + zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.EvalWindow().Milliseconds()), zap.Int64("evalDelay", r.EvalDelay().Milliseconds())) + + start := ts.Add(-time.Duration(r.EvalWindow())).UnixMilli() + end := ts.UnixMilli() + + if r.EvalDelay() > 0 { + start = start - int64(r.EvalDelay().Milliseconds()) + end = end - int64(r.EvalDelay().Milliseconds()) + } + // round to minute otherwise we could potentially miss data + start = start - (start % (60 * 1000)) + end = end - (end % (60 * 1000)) + + compositeQuery := r.Condition().CompositeQuery + + if compositeQuery.PanelType != v3.PanelTypeGraph { + compositeQuery.PanelType = v3.PanelTypeGraph + } + + // default mode + return &v3.QueryRangeParamsV3{ + Start: start, + End: end, + Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)), + CompositeQuery: compositeQuery, + Variables: make(map[string]interface{}, 0), + NoCache: false, + }, nil +} + +func (r *AnomalyRule) GetSelectedQuery() string { + return r.Condition().GetSelectedQueryName() +} + +func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, ts time.Time) (baserules.Vector, error) { + + params, err := r.prepareQueryRange(ts) + if err != nil { + return nil, err + } + err = r.PopulateTemporality(ctx, params) + if err != nil { + return nil, fmt.Errorf("internal error while setting temporality") + } + + anomalies, err := r.provider.GetAnomalies(ctx, &anomaly.GetAnomaliesRequest{ + Params: params, + Seasonality: r.seasonality, + }) + if err != nil { + return nil, err + } + + var queryResult *v3.Result + for _, result := range anomalies.Results { + if result.QueryName == r.GetSelectedQuery() { + queryResult = result + break + } + } + + var resultVector baserules.Vector + + scoresJSON, _ := json.Marshal(queryResult.AnomalyScores) + zap.L().Info("anomaly scores", zap.String("scores", string(scoresJSON))) + + for _, series := range queryResult.AnomalyScores { + smpl, shouldAlert := r.ShouldAlert(*series) + if shouldAlert { + resultVector = append(resultVector, smpl) + } + } + return resultVector, nil +} + +func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { + + prevState := r.State() + + valueFormatter := formatter.FromUnit(r.Unit()) + res, err := r.buildAndRunQuery(ctx, ts) + + if err != nil { + return nil, err + } + + r.mtx.Lock() + defer r.mtx.Unlock() + + resultFPs := map[uint64]struct{}{} + var alerts = make(map[uint64]*baserules.Alert, len(res)) + + for _, smpl := range res { + l := make(map[string]string, len(smpl.Metric)) + for _, lbl := range smpl.Metric { + l[lbl.Name] = lbl.Value + } + + value := valueFormatter.Format(smpl.V, r.Unit()) + threshold := valueFormatter.Format(r.TargetVal(), r.Unit()) + zap.L().Debug("Alert template data for rule", zap.String("name", r.Name()), zap.String("formatter", valueFormatter.Name()), zap.String("value", value), zap.String("threshold", threshold)) + + tmplData := baserules.AlertTemplateData(l, value, threshold) + // Inject some convenience variables that are easier to remember for users + // who are not used to Go's templating system. + defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}" + + // utility function to apply go template on labels and annotations + expand := func(text string) string { + + tmpl := baserules.NewTemplateExpander( + ctx, + defs+text, + "__alert_"+r.Name(), + tmplData, + times.Time(timestamp.FromTime(ts)), + nil, + ) + result, err := tmpl.Expand() + if err != nil { + result = fmt.Sprintf("", err) + zap.L().Error("Expanding alert template failed", zap.Error(err), zap.Any("data", tmplData)) + } + return result + } + + lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel) + resultLabels := labels.NewBuilder(smpl.MetricOrig).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels() + + for name, value := range r.Labels().Map() { + lb.Set(name, expand(value)) + } + + lb.Set(labels.AlertNameLabel, r.Name()) + lb.Set(labels.AlertRuleIdLabel, r.ID()) + lb.Set(labels.RuleSourceLabel, r.GeneratorURL()) + + annotations := make(labels.Labels, 0, len(r.Annotations().Map())) + for name, value := range r.Annotations().Map() { + annotations = append(annotations, labels.Label{Name: common.NormalizeLabelName(name), Value: expand(value)}) + } + if smpl.IsMissing { + lb.Set(labels.AlertNameLabel, "[No data] "+r.Name()) + } + + lbs := lb.Labels() + h := lbs.Hash() + resultFPs[h] = struct{}{} + + if _, ok := alerts[h]; ok { + zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h])) + err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels") + return nil, err + } + + alerts[h] = &baserules.Alert{ + Labels: lbs, + QueryResultLables: resultLabels, + Annotations: annotations, + ActiveAt: ts, + State: model.StatePending, + Value: smpl.V, + GeneratorURL: r.GeneratorURL(), + Receivers: r.PreferredChannels(), + Missing: smpl.IsMissing, + } + } + + zap.L().Info("number of alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts))) + + // alerts[h] is ready, add or update active list now + for h, a := range alerts { + // Check whether we already have alerting state for the identifying label set. + // Update the last value and annotations if so, create a new alert entry otherwise. + if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive { + + alert.Value = a.Value + alert.Annotations = a.Annotations + alert.Receivers = r.PreferredChannels() + continue + } + + r.Active[h] = a + } + + itemsToAdd := []model.RuleStateHistory{} + + // Check if any pending alerts should be removed or fire now. Write out alert timeseries. + for fp, a := range r.Active { + labelsJSON, err := json.Marshal(a.QueryResultLables) + if err != nil { + zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels)) + } + if _, ok := resultFPs[fp]; !ok { + // If the alert was previously firing, keep it around for a given + // retention time so it is reported as resolved to the AlertManager. + if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > baserules.ResolvedRetention) { + delete(r.Active, fp) + } + if a.State != model.StateInactive { + a.State = model.StateInactive + a.ResolvedAt = ts + itemsToAdd = append(itemsToAdd, model.RuleStateHistory{ + RuleID: r.ID(), + RuleName: r.Name(), + State: model.StateInactive, + StateChanged: true, + UnixMilli: ts.UnixMilli(), + Labels: model.LabelsString(labelsJSON), + Fingerprint: a.QueryResultLables.Hash(), + Value: a.Value, + }) + } + continue + } + + if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.HoldDuration() { + a.State = model.StateFiring + a.FiredAt = ts + state := model.StateFiring + if a.Missing { + state = model.StateNoData + } + itemsToAdd = append(itemsToAdd, model.RuleStateHistory{ + RuleID: r.ID(), + RuleName: r.Name(), + State: state, + StateChanged: true, + UnixMilli: ts.UnixMilli(), + Labels: model.LabelsString(labelsJSON), + Fingerprint: a.QueryResultLables.Hash(), + Value: a.Value, + }) + } + } + + currentState := r.State() + + overallStateChanged := currentState != prevState + for idx, item := range itemsToAdd { + item.OverallStateChanged = overallStateChanged + item.OverallState = currentState + itemsToAdd[idx] = item + } + + r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) + + return len(r.Active), nil +} + +func (r *AnomalyRule) String() string { + + ar := baserules.PostableRule{ + AlertName: r.Name(), + RuleCondition: r.Condition(), + EvalWindow: baserules.Duration(r.EvalWindow()), + Labels: r.Labels().Map(), + Annotations: r.Annotations().Map(), + PreferredChannels: r.PreferredChannels(), + } + + byt, err := yaml.Marshal(ar) + if err != nil { + return fmt.Sprintf("error marshaling alerting rule: %s", err.Error()) + } + + return string(byt) +} diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index 2b80441f0c..5ed35d4d34 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -53,6 +53,25 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) // create promql rule task for evalution task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB) + } else if opts.Rule.RuleType == baserules.RuleTypeAnomaly { + // create anomaly rule + ar, err := NewAnomalyRule( + ruleId, + opts.Rule, + opts.FF, + opts.Reader, + opts.Cache, + baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), + ) + if err != nil { + return task, err + } + + rules = append(rules, ar) + + // create anomaly rule task for evalution + task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB) + } else { return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", baserules.RuleTypeProm, baserules.RuleTypeThreshold) } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 219181dc7f..5055913113 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -518,7 +518,7 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) { } // populateTemporality adds the temporality to the query if it is not present -func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { +func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { aH.temporalityMux.Lock() defer aH.temporalityMux.Unlock() @@ -3791,7 +3791,7 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) { } // add temporality for each metric - temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams) + temporalityErr := aH.PopulateTemporality(r.Context(), queryRangeParams) if temporalityErr != nil { zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr)) RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) @@ -4139,7 +4139,7 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) { queryRangeParams.Version = "v4" // add temporality for each metric - temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams) + temporalityErr := aH.PopulateTemporality(r.Context(), queryRangeParams) if temporalityErr != nil { zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr)) RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 22d52b9884..b71df63781 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -144,9 +144,20 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } } + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } <-readerReady - rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema) + rm, err := makeRulesManager( + serverOptions.PromConfigPath, + constants.GetAlertManagerApiPrefix(), + serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema) if err != nil { return nil, err } @@ -158,15 +169,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } }() - var c cache.Cache - if serverOptions.CacheConfigPath != "" { - cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) - if err != nil { - return nil, err - } - c = cache.NewCache(cacheOpts) - } - fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) if err != nil { return nil, err @@ -715,6 +717,7 @@ func makeRulesManager( ruleRepoURL string, db *sqlx.DB, ch interfaces.Reader, + cache cache.Cache, disableRules bool, fm interfaces.FeatureLookup, useLogsNewSchema bool) (*rules.Manager, error) { @@ -743,6 +746,7 @@ func makeRulesManager( DisableRules: disableRules, FeatureFlags: fm, Reader: ch, + Cache: cache, EvalDelay: constants.GetEvalDelay(), UseLogsNewSchema: useLogsNewSchema, } diff --git a/pkg/query-service/common/query_range.go b/pkg/query-service/common/query_range.go index d6b62baf27..598ac1a21c 100644 --- a/pkg/query-service/common/query_range.go +++ b/pkg/query-service/common/query_range.go @@ -2,7 +2,9 @@ package common import ( "math" + "regexp" "time" + "unicode" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -73,6 +75,23 @@ func LCMList(nums []int64) int64 { return result } +func NormalizeLabelName(name string) string { + // See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels + + // Regular expression to match non-alphanumeric characters except underscores + reg := regexp.MustCompile(`[^a-zA-Z0-9_]`) + + // Replace all non-alphanumeric characters except underscores with underscores + normalized := reg.ReplaceAllString(name, "_") + + // If the first character is not a letter or an underscore, prepend an underscore + if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' { + normalized = "_" + normalized + } + + return normalized +} + func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int64) []*v3.Series { series := make(map[uint64]*v3.Series) diff --git a/pkg/query-service/model/featureSet.go b/pkg/query-service/model/featureSet.go index 0e7a1c0278..de4a4ea879 100644 --- a/pkg/query-service/model/featureSet.go +++ b/pkg/query-service/model/featureSet.go @@ -22,6 +22,7 @@ const AlertChannelPagerduty = "ALERT_CHANNEL_PAGERDUTY" const AlertChannelMsTeams = "ALERT_CHANNEL_MSTEAMS" const AlertChannelOpsgenie = "ALERT_CHANNEL_OPSGENIE" const AlertChannelEmail = "ALERT_CHANNEL_EMAIL" +const AnomalyDetection = "ANOMALY_DETECTION" var BasicPlan = FeatureSet{ Feature{ @@ -115,4 +116,11 @@ var BasicPlan = FeatureSet{ UsageLimit: -1, Route: "", }, + Feature{ + Name: AnomalyDetection, + Active: false, + Usage: 0, + UsageLimit: -1, + Route: "", + }, } diff --git a/pkg/query-service/rules/alerting.go b/pkg/query-service/rules/alerting.go index 77c9fbe219..ec2b0c8016 100644 --- a/pkg/query-service/rules/alerting.go +++ b/pkg/query-service/rules/alerting.go @@ -19,7 +19,7 @@ import ( const ( // how long before re-sending the alert - resolvedRetention = 15 * time.Minute + ResolvedRetention = 15 * time.Minute TestAlertPostFix = "_TEST_ALERT" ) @@ -29,6 +29,7 @@ type RuleType string const ( RuleTypeThreshold = "threshold_rule" RuleTypeProm = "promql_rule" + RuleTypeAnomaly = "anomaly_rule" ) type RuleHealth string @@ -83,27 +84,16 @@ type NamedAlert struct { type CompareOp string const ( - CompareOpNone CompareOp = "0" - ValueIsAbove CompareOp = "1" - ValueIsBelow CompareOp = "2" - ValueIsEq CompareOp = "3" - ValueIsNotEq CompareOp = "4" + CompareOpNone CompareOp = "0" + ValueIsAbove CompareOp = "1" + ValueIsBelow CompareOp = "2" + ValueIsEq CompareOp = "3" + ValueIsNotEq CompareOp = "4" + ValueAboveOrEq CompareOp = "5" + ValueBelowOrEq CompareOp = "6" + ValueOutsideBounds CompareOp = "7" ) -func ResolveCompareOp(cop CompareOp) string { - switch cop { - case ValueIsAbove: - return ">" - case ValueIsBelow: - return "<" - case ValueIsEq: - return "==" - case ValueIsNotEq: - return "!=" - } - return "" -} - type MatchType string const ( @@ -123,6 +113,8 @@ type RuleCondition struct { AbsentFor uint64 `yaml:"absentFor,omitempty" json:"absentFor,omitempty"` MatchType MatchType `json:"matchType,omitempty"` TargetUnit string `json:"targetUnit,omitempty"` + Algorithm string `json:"algorithm,omitempty"` + Seasonality string `json:"seasonality,omitempty"` SelectedQuery string `json:"selectedQueryName,omitempty"` } diff --git a/pkg/query-service/rules/api_params.go b/pkg/query-service/rules/api_params.go index 6d3288ece1..77a1552946 100644 --- a/pkg/query-service/rules/api_params.go +++ b/pkg/query-service/rules/api_params.go @@ -133,7 +133,9 @@ func parseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P if rule.RuleCondition != nil { if rule.RuleCondition.CompositeQuery.QueryType == v3.QueryTypeBuilder { - rule.RuleType = RuleTypeThreshold + if rule.RuleType == "" { + rule.RuleType = RuleTypeThreshold + } } else if rule.RuleCondition.CompositeQuery.QueryType == v3.QueryTypePromQL { rule.RuleType = RuleTypeProm } diff --git a/pkg/query-service/rules/base_rule.go b/pkg/query-service/rules/base_rule.go index b82aab91b5..181eaa3a28 100644 --- a/pkg/query-service/rules/base_rule.go +++ b/pkg/query-service/rules/base_rule.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/converter" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" @@ -53,7 +54,7 @@ type BaseRule struct { health RuleHealth lastError error - active map[uint64]*Alert + Active map[uint64]*Alert // lastTimestampWithDatapoints is the timestamp of the last datapoint we observed // for this rule @@ -72,6 +73,12 @@ type BaseRule struct { // sendAlways will send alert irresepective of resendDelay // or other params sendAlways bool + + // TemporalityMap is a map of metric name to temporality + // to avoid fetching temporality for the same metric multiple times + // querying the v4 table on low cardinal temporality column + // should be fast but we can still avoid the query if we have the data in memory + TemporalityMap map[string]map[v3.Temporality]bool } type RuleOption func(*BaseRule) @@ -116,8 +123,9 @@ func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...R annotations: qslabels.FromMap(p.Annotations), preferredChannels: p.PreferredChannels, health: HealthUnknown, - active: map[uint64]*Alert{}, + Active: map[uint64]*Alert{}, reader: reader, + TemporalityMap: make(map[string]map[v3.Temporality]bool), } if baseRule.evalWindow == 0 { @@ -165,14 +173,30 @@ func (r *BaseRule) currentAlerts() []*Alert { r.mtx.Lock() defer r.mtx.Unlock() - alerts := make([]*Alert, 0, len(r.active)) - for _, a := range r.active { + alerts := make([]*Alert, 0, len(r.Active)) + for _, a := range r.Active { anew := *a alerts = append(alerts, &anew) } return alerts } +func (r *BaseRule) EvalDelay() time.Duration { + return r.evalDelay +} + +func (r *BaseRule) EvalWindow() time.Duration { + return r.evalWindow +} + +func (r *BaseRule) HoldDuration() time.Duration { + return r.holdDuration +} + +func (r *BaseRule) TargetVal() float64 { + return r.targetVal() +} + func (r *ThresholdRule) hostFromSource() string { parsedUrl, err := url.Parse(r.source) if err != nil { @@ -267,7 +291,7 @@ func (r *BaseRule) GetEvaluationTimestamp() time.Time { func (r *BaseRule) State() model.AlertState { maxState := model.StateInactive - for _, a := range r.active { + for _, a := range r.Active { if a.State > maxState { maxState = a.State } @@ -306,12 +330,12 @@ func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) { r.mtx.Lock() defer r.mtx.Unlock() - for _, a := range r.active { + for _, a := range r.Active { f(a) } } -func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { +func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) { var alertSmpl Sample var shouldAlert bool var lbls qslabels.Labels @@ -319,7 +343,7 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { for name, value := range series.Labels { lbls = append(lbls, qslabels.Label{Name: name, Value: value}) - lblsNormalized = append(lblsNormalized, qslabels.Label{Name: normalizeLabelName(name), Value: value}) + lblsNormalized = append(lblsNormalized, qslabels.Label{Name: common.NormalizeLabelName(name), Value: value}) } series.Points = removeGroupinSetPoints(series) @@ -364,6 +388,14 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { break } } + } else if r.compareOp() == ValueOutsideBounds { + for _, smpl := range series.Points { + if math.Abs(smpl.Value) >= r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } } case AllTheTimes: // If all samples match the condition, the rule is firing. @@ -425,6 +457,14 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { } } } + } else if r.compareOp() == ValueOutsideBounds { + for _, smpl := range series.Points { + if math.Abs(smpl.Value) >= r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } } case OnAverage: // If the average of all samples matches the condition, the rule is firing. @@ -454,6 +494,10 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { if avg != r.targetVal() { shouldAlert = true } + } else if r.compareOp() == ValueOutsideBounds { + if math.Abs(avg) >= r.targetVal() { + shouldAlert = true + } } case InTotal: // If the sum of all samples matches the condition, the rule is firing. @@ -482,6 +526,10 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { if sum != r.targetVal() { shouldAlert = true } + } else if r.compareOp() == ValueOutsideBounds { + if math.Abs(sum) >= r.targetVal() { + shouldAlert = true + } } case Last: // If the last sample matches the condition, the rule is firing. @@ -602,3 +650,59 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren return nil } + +func (r *BaseRule) PopulateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { + + missingTemporality := make([]string, 0) + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for _, query := range qp.CompositeQuery.BuilderQueries { + // if there is no temporality specified in the query but we have it in the map + // then use the value from the map + if query.Temporality == "" && r.TemporalityMap[query.AggregateAttribute.Key] != nil { + // We prefer delta if it is available + if r.TemporalityMap[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if r.TemporalityMap[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + } + // we don't have temporality for this metric + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { + missingTemporality = append(missingTemporality, query.AggregateAttribute.Key) + } + if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { + metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) + } + } + } + + var nameToTemporality map[string]map[v3.Temporality]bool + var err error + + if len(missingTemporality) > 0 { + nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality) + if err != nil { + return err + } + } + + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for name := range qp.CompositeQuery.BuilderQueries { + query := qp.CompositeQuery.BuilderQueries[name] + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { + if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + r.TemporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key] + } + } + } + return nil +} diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 89dec5f3d1..09eb7ad367 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -18,6 +18,7 @@ import ( "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/cache" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" @@ -32,6 +33,7 @@ type PrepareTaskOptions struct { RuleDB RuleDB Logger *zap.Logger Reader interfaces.Reader + Cache cache.Cache FF interfaces.FeatureLookup ManagerOpts *ManagerOptions NotifyFunc NotifyFunc @@ -73,6 +75,7 @@ type ManagerOptions struct { DisableRules bool FeatureFlags interfaces.FeatureLookup Reader interfaces.Reader + Cache cache.Cache EvalDelay time.Duration @@ -96,9 +99,9 @@ type Manager struct { logger *zap.Logger - featureFlags interfaces.FeatureLookup - reader interfaces.Reader - + featureFlags interfaces.FeatureLookup + reader interfaces.Reader + cache cache.Cache prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) UseLogsNewSchema bool @@ -209,6 +212,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) { logger: o.Logger, featureFlags: o.FeatureFlags, reader: o.Reader, + cache: o.Cache, prepareTaskFunc: o.PrepareTaskFunc, } return m, nil @@ -342,6 +346,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { RuleDB: m.ruleDB, Logger: m.logger, Reader: m.reader, + Cache: m.cache, FF: m.featureFlags, ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), @@ -463,6 +468,7 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error { RuleDB: m.ruleDB, Logger: m.logger, Reader: m.reader, + Cache: m.cache, FF: m.featureFlags, ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go index db5a963731..473fac0d5d 100644 --- a/pkg/query-service/rules/prom_rule.go +++ b/pkg/query-service/rules/prom_rule.go @@ -131,7 +131,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) continue } - alertSmpl, shouldAlert := r.shouldAlert(toCommonSeries(series)) + alertSmpl, shouldAlert := r.ShouldAlert(toCommonSeries(series)) if !shouldAlert { continue } @@ -208,21 +208,21 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) for h, a := range alerts { // Check whether we already have alerting state for the identifying label set. // Update the last value and annotations if so, create a new alert entry otherwise. - if alert, ok := r.active[h]; ok && alert.State != model.StateInactive { + if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive { alert.Value = a.Value alert.Annotations = a.Annotations alert.Receivers = r.preferredChannels continue } - r.active[h] = a + r.Active[h] = a } itemsToAdd := []model.RuleStateHistory{} // Check if any pending alerts should be removed or fire now. Write out alert timeseries. - for fp, a := range r.active { + for fp, a := range r.Active { labelsJSON, err := json.Marshal(a.QueryResultLables) if err != nil { zap.L().Error("error marshaling labels", zap.Error(err), zap.String("name", r.Name())) @@ -230,8 +230,8 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. - if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { - delete(r.active, fp) + if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ResolvedRetention) { + delete(r.Active, fp) } if a.State != model.StateInactive { a.State = model.StateInactive @@ -283,7 +283,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) - return len(r.active), nil + return len(r.Active), nil } func (r *PromRule) String() string { diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index f78994430a..c96b09e74f 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -293,8 +293,8 @@ func (g *PromRuleTask) CopyState(fromTask Task) error { continue } - for fp, a := range far.active { - ar.active[fp] = a + for fp, a := range far.Active { + ar.Active[fp] = a } ar.handledRestart = far.handledRestart } diff --git a/pkg/query-service/rules/promrule_test.go b/pkg/query-service/rules/promrule_test.go index c87ef2cee9..3bc268ed65 100644 --- a/pkg/query-service/rules/promrule_test.go +++ b/pkg/query-service/rules/promrule_test.go @@ -661,7 +661,7 @@ func TestPromRuleShouldAlert(t *testing.T) { assert.NoError(t, err) } - _, shoulAlert := rule.shouldAlert(toCommonSeries(c.values)) + _, shoulAlert := rule.ShouldAlert(toCommonSeries(c.values)) assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx) } } diff --git a/pkg/query-service/rules/rule.go b/pkg/query-service/rules/rule.go index 2b5b8d5aae..a8d25f89b6 100644 --- a/pkg/query-service/rules/rule.go +++ b/pkg/query-service/rules/rule.go @@ -18,6 +18,9 @@ type Rule interface { Labels() labels.BaseLabels Annotations() labels.BaseLabels Condition() *RuleCondition + EvalDelay() time.Duration + EvalWindow() time.Duration + HoldDuration() time.Duration State() model.AlertState ActiveAlerts() []*Alert diff --git a/pkg/query-service/rules/rule_task.go b/pkg/query-service/rules/rule_task.go index 0a969bffc8..fc7bde05af 100644 --- a/pkg/query-service/rules/rule_task.go +++ b/pkg/query-service/rules/rule_task.go @@ -285,8 +285,8 @@ func (g *RuleTask) CopyState(fromTask Task) error { continue } - for fp, a := range far.active { - ar.active[fp] = a + for fp, a := range far.Active { + ar.Active[fp] = a } ar.handledRestart = far.handledRestart } diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index 0f768314cf..8453f1a268 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -6,10 +6,8 @@ import ( "encoding/json" "fmt" "math" - "regexp" "text/template" "time" - "unicode" "go.uber.org/zap" @@ -43,11 +41,6 @@ type ThresholdRule struct { // if the version is "v3", then we use the old querier // if the version is "v4", then we use the new querierV2 version string - // temporalityMap is a map of metric name to temporality - // to avoid fetching temporality for the same metric multiple times - // querying the v4 table on low cardinal temporality column - // should be fast but we can still avoid the query if we have the data in memory - temporalityMap map[string]map[v3.Temporality]bool // querier is used for alerts created before the introduction of new metrics query builder querier interfaces.Querier @@ -76,9 +69,8 @@ func NewThresholdRule( } t := ThresholdRule{ - BaseRule: baseRule, - version: p.Version, - temporalityMap: make(map[string]map[v3.Temporality]bool), + BaseRule: baseRule, + version: p.Version, } querierOption := querier.QuerierOptions{ @@ -107,63 +99,6 @@ func (r *ThresholdRule) Type() RuleType { return RuleTypeThreshold } -// populateTemporality same as addTemporality but for v4 and better -func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { - - missingTemporality := make([]string, 0) - metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { - for _, query := range qp.CompositeQuery.BuilderQueries { - // if there is no temporality specified in the query but we have it in the map - // then use the value from the map - if query.Temporality == "" && r.temporalityMap[query.AggregateAttribute.Key] != nil { - // We prefer delta if it is available - if r.temporalityMap[query.AggregateAttribute.Key][v3.Delta] { - query.Temporality = v3.Delta - } else if r.temporalityMap[query.AggregateAttribute.Key][v3.Cumulative] { - query.Temporality = v3.Cumulative - } else { - query.Temporality = v3.Unspecified - } - } - // we don't have temporality for this metric - if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { - missingTemporality = append(missingTemporality, query.AggregateAttribute.Key) - } - if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { - metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) - } - } - } - - var nameToTemporality map[string]map[v3.Temporality]bool - var err error - - if len(missingTemporality) > 0 { - nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality) - if err != nil { - return err - } - } - - if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { - for name := range qp.CompositeQuery.BuilderQueries { - query := qp.CompositeQuery.BuilderQueries[name] - if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { - if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] { - query.Temporality = v3.Delta - } else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { - query.Temporality = v3.Cumulative - } else { - query.Temporality = v3.Unspecified - } - r.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key] - } - } - } - return nil -} - func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) { zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds())) @@ -313,7 +248,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vec if err != nil { return nil, err } - err = r.populateTemporality(ctx, params) + err = r.PopulateTemporality(ctx, params) if err != nil { return nil, fmt.Errorf("internal error while setting temporality") } @@ -406,7 +341,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vec } for _, series := range queryResult.Series { - smpl, shouldAlert := r.shouldAlert(*series) + smpl, shouldAlert := r.ShouldAlert(*series) if shouldAlert { resultVector = append(resultVector, smpl) } @@ -414,23 +349,6 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vec return resultVector, nil } -func normalizeLabelName(name string) string { - // See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels - - // Regular expression to match non-alphanumeric characters except underscores - reg := regexp.MustCompile(`[^a-zA-Z0-9_]`) - - // Replace all non-alphanumeric characters except underscores with underscores - normalized := reg.ReplaceAllString(name, "_") - - // If the first character is not a letter or an underscore, prepend an underscore - if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' { - normalized = "_" + normalized - } - - return normalized -} - func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { prevState := r.State() @@ -495,7 +413,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er annotations := make(labels.Labels, 0, len(r.annotations.Map())) for name, value := range r.annotations.Map() { - annotations = append(annotations, labels.Label{Name: normalizeLabelName(name), Value: expand(value)}) + annotations = append(annotations, labels.Label{Name: common.NormalizeLabelName(name), Value: expand(value)}) } if smpl.IsMissing { lb.Set(labels.AlertNameLabel, "[No data] "+r.Name()) @@ -547,7 +465,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er for h, a := range alerts { // Check whether we already have alerting state for the identifying label set. // Update the last value and annotations if so, create a new alert entry otherwise. - if alert, ok := r.active[h]; ok && alert.State != model.StateInactive { + if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive { alert.Value = a.Value alert.Annotations = a.Annotations @@ -555,13 +473,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er continue } - r.active[h] = a + r.Active[h] = a } itemsToAdd := []model.RuleStateHistory{} // Check if any pending alerts should be removed or fire now. Write out alert timeseries. - for fp, a := range r.active { + for fp, a := range r.Active { labelsJSON, err := json.Marshal(a.QueryResultLables) if err != nil { zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels)) @@ -569,8 +487,8 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. - if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { - delete(r.active, fp) + if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ResolvedRetention) { + delete(r.Active, fp) } if a.State != model.StateInactive { a.State = model.StateInactive @@ -623,7 +541,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er r.health = HealthGood r.lastError = err - return len(r.active), nil + return len(r.Active), nil } func (r *ThresholdRule) String() string { diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 8f9554db52..e23ba0d05c 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/featureManager" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" @@ -800,7 +801,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { values.Points[i].Timestamp = time.Now().UnixMilli() } - smpl, shoulAlert := rule.shouldAlert(c.values) + smpl, shoulAlert := rule.ShouldAlert(c.values) assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx) if shoulAlert { assert.Equal(t, c.expectedAlertSample.Value, smpl.V, "Test case %d", idx) @@ -844,7 +845,7 @@ func TestNormalizeLabelName(t *testing.T) { } for _, c := range cases { - assert.Equal(t, c.expected, normalizeLabelName(c.labelName)) + assert.Equal(t, c.expected, common.NormalizeLabelName(c.labelName)) } } @@ -1007,9 +1008,9 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { values.Points[i].Timestamp = time.Now().UnixMilli() } - sample, shoulAlert := rule.shouldAlert(c.values) + sample, shoulAlert := rule.ShouldAlert(c.values) for name, value := range c.values.Labels { - assert.Equal(t, value, sample.Metric.Get(normalizeLabelName(name))) + assert.Equal(t, value, sample.Metric.Get(common.NormalizeLabelName(name))) } assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx) @@ -1243,7 +1244,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) - rule.temporalityMap = map[string]map[v3.Temporality]bool{ + rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, }, @@ -1260,7 +1261,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx) if c.expectAlerts != 0 { foundCount := 0 - for _, item := range rule.active { + for _, item := range rule.Active { for _, summary := range c.summaryAny { if strings.Contains(item.Annotations.Get("summary"), summary) { foundCount++ @@ -1342,7 +1343,7 @@ func TestThresholdRuleNoData(t *testing.T) { reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) - rule.temporalityMap = map[string]map[v3.Temporality]bool{ + rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, }, @@ -1357,7 +1358,7 @@ func TestThresholdRuleNoData(t *testing.T) { } assert.Equal(t, 1, retVal.(int), "case %d", idx) - for _, item := range rule.active { + for _, item := range rule.Active { if c.expectNoData { assert.True(t, strings.Contains(item.Labels.Get(labels.AlertNameLabel), "[No data]"), "case %d", idx) } else { @@ -1447,7 +1448,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) - rule.temporalityMap = map[string]map[v3.Temporality]bool{ + rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, }, @@ -1465,7 +1466,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { assert.Equal(t, 0, retVal.(int), "case %d", idx) } else { assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx) - for _, item := range rule.active { + for _, item := range rule.Active { for name, value := range item.Annotations.Map() { if name == "related_traces" { assert.NotEmpty(t, value, "case %d", idx) @@ -1572,7 +1573,7 @@ func TestThresholdRuleLogsLink(t *testing.T) { reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) - rule.temporalityMap = map[string]map[v3.Temporality]bool{ + rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, }, @@ -1590,7 +1591,7 @@ func TestThresholdRuleLogsLink(t *testing.T) { assert.Equal(t, 0, retVal.(int), "case %d", idx) } else { assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx) - for _, item := range rule.active { + for _, item := range rule.Active { for name, value := range item.Annotations.Map() { if name == "related_logs" { assert.NotEmpty(t, value, "case %d", idx)