feat: add anomaly rule (#5973)

Srikanth Chekuri 2024-09-24 10:22:52 +05:30 committed by GitHub
parent df2844ea74
commit 419d2da363
23 changed files with 799 additions and 180 deletions

View File

@ -16,6 +16,10 @@ const (
SeasonalityWeekly Seasonality = "weekly"
)
func (s Seasonality) String() string {
return string(s)
}
var (
oneWeekOffset = 24 * 7 * time.Hour.Milliseconds()
oneDayOffset = 24 * time.Hour.Milliseconds()
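
These offsets are plain millisecond durations. A minimal sketch of how they can be used, assuming (as the provider code below suggests) that the past-period and seasonal windows are just the current window shifted back by one day or one week; shiftWindow is an illustrative helper, not part of this change:

package main

import (
	"fmt"
	"time"
)

var (
	oneWeekOffset = 24 * 7 * time.Hour.Milliseconds()
	oneDayOffset  = 24 * time.Hour.Milliseconds()
)

// shiftWindow moves a [startMs, endMs] window back by offsetMs, e.g. to the
// "same minutes yesterday" (daily) or "same minutes last week" (weekly).
func shiftWindow(startMs, endMs, offsetMs int64) (int64, int64) {
	return startMs - offsetMs, endMs - offsetMs
}

func main() {
	endMs := time.Now().UnixMilli()
	startMs := endMs - 15*time.Minute.Milliseconds()

	dayStart, dayEnd := shiftWindow(startMs, endMs, oneDayOffset)
	weekStart, weekEnd := shiftWindow(startMs, endMs, oneWeekOffset)
	fmt.Println(dayStart, dayEnd, weekStart, weekEnd)
}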

View File

@ -67,6 +67,7 @@ func (p *BaseSeasonalProvider) getQueryParams(req *GetAnomaliesRequest) *anomaly
}
func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQueryParams) (*anomalyQueryResults, error) {
zap.L().Info("fetching results for current period", zap.Any("currentPeriodQuery", params.CurrentPeriodQuery))
currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentPeriodQuery)
if err != nil {
return nil, err
@ -77,6 +78,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu
return nil, err
}
zap.L().Info("fetching results for past period", zap.Any("pastPeriodQuery", params.PastPeriodQuery))
pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.PastPeriodQuery)
if err != nil {
return nil, err
@ -87,6 +89,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu
return nil, err
}
zap.L().Info("fetching results for current season", zap.Any("currentSeasonQuery", params.CurrentSeasonQuery))
currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentSeasonQuery)
if err != nil {
return nil, err
@ -97,6 +100,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu
return nil, err
}
zap.L().Info("fetching results for past season", zap.Any("pastSeasonQuery", params.PastSeasonQuery))
pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.PastSeasonQuery)
if err != nil {
return nil, err
@ -107,6 +111,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu
return nil, err
}
zap.L().Info("fetching results for past 2 season", zap.Any("past2SeasonQuery", params.Past2SeasonQuery))
past2SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past2SeasonQuery)
if err != nil {
return nil, err
@ -117,6 +122,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu
return nil, err
}
zap.L().Info("fetching results for past 3 season", zap.Any("past3SeasonQuery", params.Past3SeasonQuery))
past3SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past3SeasonQuery)
if err != nil {
return nil, err
@ -184,7 +190,7 @@ func (p *BaseSeasonalProvider) getMovingAvg(series *v3.Series, movingAvgWindowSi
return 0
}
if startIdx >= len(series.Points)-movingAvgWindowSize {
startIdx = len(series.Points) - movingAvgWindowSize
startIdx = int(math.Max(0, float64(len(series.Points)-movingAvgWindowSize)))
}
var sum float64
points := series.Points[startIdx:]
@ -250,7 +256,7 @@ func (p *BaseSeasonalProvider) getPredictedSeries(
// moving avg of the predicted series + z score threshold * std dev of the series
// moving avg of the predicted series - z score threshold * std dev of the series
func (p *BaseSeasonalProvider) getBounds(
series, prevSeries, _, _, _, _ *v3.Series,
series, predictedSeries *v3.Series,
zScoreThreshold float64,
) (*v3.Series, *v3.Series) {
upperBoundSeries := &v3.Series{
@ -266,8 +272,8 @@ func (p *BaseSeasonalProvider) getBounds(
}
for idx, curr := range series.Points {
upperBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series)
lowerBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series)
upperBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series)
lowerBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series)
upperBoundSeries.Points = append(upperBoundSeries.Points, v3.Point{
Timestamp: curr.Timestamp,
Value: upperBound,
@ -431,11 +437,7 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, req *GetAnomali
upperBoundSeries, lowerBoundSeries := p.getBounds(
series,
pastPeriodSeries,
currentSeasonSeries,
pastSeasonSeries,
past2SeasonSeries,
past3SeasonSeries,
predictedSeries,
zScoreThreshold,
)
result.UpperBoundSeries = append(result.UpperBoundSeries, upperBoundSeries)
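
For reference, the bound computation above reduces to upper/lower = movingAvg(predicted)[i] ± zScoreThreshold * stdDev(observed). A self-contained sketch with illustrative helpers (not the provider's exact getMovingAvg/getStdDev, which operate on v3.Series):

package main

import (
	"fmt"
	"math"
)

// movingAvg averages windowSize points starting at idx, clamping the window
// so it stays inside the slice (mirrors the clamp fixed in the hunk above).
func movingAvg(points []float64, windowSize, idx int) float64 {
	if len(points) == 0 {
		return 0
	}
	start := idx
	if start > len(points)-windowSize {
		start = int(math.Max(0, float64(len(points)-windowSize)))
	}
	end := start + windowSize
	if end > len(points) {
		end = len(points)
	}
	var sum float64
	for _, v := range points[start:end] {
		sum += v
	}
	return sum / float64(end-start)
}

// stdDev is the population standard deviation of the series.
func stdDev(points []float64) float64 {
	if len(points) == 0 {
		return 0
	}
	var mean float64
	for _, v := range points {
		mean += v
	}
	mean /= float64(len(points))
	var variance float64
	for _, v := range points {
		variance += (v - mean) * (v - mean)
	}
	return math.Sqrt(variance / float64(len(points)))
}

func main() {
	observed := []float64{10, 12, 11, 13, 40, 12} // 40 is the outlier
	predicted := []float64{11, 11, 12, 12, 12, 12}
	const windowSize = 3
	const zScoreThreshold = 3.0

	sd := stdDev(observed)
	for i, v := range observed {
		avg := movingAvg(predicted, windowSize, i)
		upper := avg + zScoreThreshold*sd
		lower := avg - zScoreThreshold*sd
		fmt.Printf("point %d: value=%.1f bounds=[%.1f, %.1f] outside=%v\n",
			i, v, lower, upper, v > upper || v < lower)
	}
}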

View File

@ -177,6 +177,8 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
am.ViewAccess(ah.listLicensesV2)).
Methods(http.MethodGet)
router.HandleFunc("/api/v4/query_range", am.ViewAccess(ah.queryRangeV4)).Methods(http.MethodPost)
// Gateway
router.PathPrefix(gateway.RoutePrefix).HandlerFunc(am.AdminAccess(ah.ServeGatewayHTTP))

View File

@ -0,0 +1,119 @@
package api
import (
"bytes"
"fmt"
"io"
"net/http"
"go.signoz.io/signoz/ee/query-service/anomaly"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
bodyBytes, _ := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
queryRangeParams, apiErrorObj := baseapp.ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error("error parsing metric query range params", zap.Error(apiErrorObj.Err))
RespondError(w, apiErrorObj, nil)
return
}
queryRangeParams.Version = "v4"
// add temporality for each metric
temporalityErr := aH.PopulateTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}
anomalyQueryExists := false
anomalyQuery := &v3.BuilderQuery{}
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
for _, fn := range query.Functions {
if fn.Name == v3.FunctionNameAnomaly {
anomalyQueryExists = true
anomalyQuery = query
break
}
}
}
}
if anomalyQueryExists {
// ensure all queries have metric data source, and there should be only one anomaly query
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
if query.DataSource != v3.DataSourceMetrics {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("all queries must have metric data source")}, nil)
return
}
}
// get the threshold and seasonality from the anomaly query
var seasonality anomaly.Seasonality
for _, fn := range anomalyQuery.Functions {
if fn.Name == v3.FunctionNameAnomaly {
seasonalityStr, ok := fn.NamedArgs["seasonality"].(string)
if !ok {
seasonalityStr = "daily"
}
if seasonalityStr == "weekly" {
seasonality = anomaly.SeasonalityWeekly
} else if seasonalityStr == "daily" {
seasonality = anomaly.SeasonalityDaily
} else {
seasonality = anomaly.SeasonalityHourly
}
break
}
}
var provider anomaly.Provider
switch seasonality {
case anomaly.SeasonalityWeekly:
provider = anomaly.NewWeeklyProvider(
anomaly.WithCache[*anomaly.WeeklyProvider](aH.opts.Cache),
anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.WeeklyProvider](aH.opts.DataConnector),
anomaly.WithFeatureLookup[*anomaly.WeeklyProvider](aH.opts.FeatureFlags),
)
case anomaly.SeasonalityDaily:
provider = anomaly.NewDailyProvider(
anomaly.WithCache[*anomaly.DailyProvider](aH.opts.Cache),
anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.DailyProvider](aH.opts.DataConnector),
anomaly.WithFeatureLookup[*anomaly.DailyProvider](aH.opts.FeatureFlags),
)
case anomaly.SeasonalityHourly:
provider = anomaly.NewHourlyProvider(
anomaly.WithCache[*anomaly.HourlyProvider](aH.opts.Cache),
anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.HourlyProvider](aH.opts.DataConnector),
anomaly.WithFeatureLookup[*anomaly.HourlyProvider](aH.opts.FeatureFlags),
)
}
anomalies, err := provider.GetAnomalies(r.Context(), &anomaly.GetAnomaliesRequest{Params: queryRangeParams})
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
uniqueResults := make(map[string]*v3.Result)
for _, anomaly := range anomalies.Results {
uniqueResults[anomaly.QueryName] = anomaly
uniqueResults[anomaly.QueryName].IsAnomaly = true
}
aH.Respond(w, uniqueResults)
} else {
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
aH.QueryRangeV4(w, r)
}
}
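
A standalone sketch of the seasonality selection the handler performs above, with local stand-ins for v3.BuilderQuery and v3.Function (the real handler then picks the matching weekly/daily/hourly provider): a missing seasonality argument defaults to daily, and any value other than weekly or daily falls back to hourly.

package main

import "fmt"

// Illustrative stand-ins for v3.Function / v3.BuilderQuery.
type function struct {
	Name      string
	NamedArgs map[string]interface{}
}

type builderQuery struct {
	QueryName string
	Functions []function
}

// seasonalityFor mirrors the handler above: missing arg defaults to "daily",
// "weekly" and "daily" map directly, anything else falls back to "hourly".
func seasonalityFor(q builderQuery) string {
	for _, fn := range q.Functions {
		if fn.Name != "anomaly" {
			continue
		}
		s, ok := fn.NamedArgs["seasonality"].(string)
		if !ok {
			s = "daily"
		}
		switch s {
		case "weekly", "daily":
			return s
		default:
			return "hourly"
		}
	}
	return "" // no anomaly function on this query
}

func main() {
	q := builderQuery{
		QueryName: "A",
		Functions: []function{
			{Name: "anomaly", NamedArgs: map[string]interface{}{"seasonality": "weekly"}},
		},
	}
	fmt.Println(seasonalityFor(q)) // weekly
}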

View File

@ -170,6 +170,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
}
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath,
@ -177,6 +185,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.RuleRepoURL,
localDB,
reader,
c,
serverOptions.DisableRules,
lm,
serverOptions.UseLogsNewSchema,
@ -237,15 +246,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
telemetry.GetInstance().SetReader(reader)
telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey)
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
if err != nil {
@ -732,6 +732,7 @@ func makeRulesManager(
ruleRepoURL string,
db *sqlx.DB,
ch baseint.Reader,
cache cache.Cache,
disableRules bool,
fm baseint.FeatureLookup,
useLogsNewSchema bool) (*baserules.Manager, error) {
@ -760,6 +761,7 @@ func makeRulesManager(
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,

View File

@ -127,6 +127,13 @@ var BasicPlan = basemodel.FeatureSet{
UsageLimit: -1,
Route: "",
},
basemodel.Feature{
Name: basemodel.AnomalyDetection,
Active: false,
Usage: 0,
UsageLimit: -1,
Route: "",
},
}
var ProPlan = basemodel.FeatureSet{
@ -235,6 +242,13 @@ var ProPlan = basemodel.FeatureSet{
UsageLimit: -1,
Route: "",
},
basemodel.Feature{
Name: basemodel.AnomalyDetection,
Active: true,
Usage: 0,
UsageLimit: -1,
Route: "",
},
}
var EnterprisePlan = basemodel.FeatureSet{
@ -357,4 +371,11 @@ var EnterprisePlan = basemodel.FeatureSet{
UsageLimit: -1,
Route: "",
},
basemodel.Feature{
Name: basemodel.AnomalyDetection,
Active: true,
Usage: 0,
UsageLimit: -1,
Route: "",
},
}

View File

@ -0,0 +1,393 @@
package rules
import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"sync"
"time"
"go.uber.org/zap"
"go.signoz.io/signoz/ee/query-service/anomaly"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/model"
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
"go.signoz.io/signoz/pkg/query-service/interfaces"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.signoz.io/signoz/pkg/query-service/utils/times"
"go.signoz.io/signoz/pkg/query-service/utils/timestamp"
"go.signoz.io/signoz/pkg/query-service/formatter"
baserules "go.signoz.io/signoz/pkg/query-service/rules"
yaml "gopkg.in/yaml.v2"
)
const (
RuleTypeAnomaly = "anomaly_rule"
)
type AnomalyRule struct {
*baserules.BaseRule
mtx sync.Mutex
reader interfaces.Reader
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
provider anomaly.Provider
seasonality anomaly.Seasonality
}
func NewAnomalyRule(
id string,
p *baserules.PostableRule,
featureFlags interfaces.FeatureLookup,
reader interfaces.Reader,
cache cache.Cache,
opts ...baserules.RuleOption,
) (*AnomalyRule, error) {
zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))
baseRule, err := baserules.NewBaseRule(id, p, reader, opts...)
if err != nil {
return nil, err
}
t := AnomalyRule{
BaseRule: baseRule,
}
switch strings.ToLower(p.RuleCondition.Seasonality) {
case "hourly":
t.seasonality = anomaly.SeasonalityHourly
case "daily":
t.seasonality = anomaly.SeasonalityDaily
case "weekly":
t.seasonality = anomaly.SeasonalityWeekly
default:
t.seasonality = anomaly.SeasonalityDaily
}
zap.L().Info("using seasonality", zap.String("seasonality", t.seasonality.String()))
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
}
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.reader = reader
if t.seasonality == anomaly.SeasonalityHourly {
t.provider = anomaly.NewHourlyProvider(
anomaly.WithCache[*anomaly.HourlyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.HourlyProvider](reader),
anomaly.WithFeatureLookup[*anomaly.HourlyProvider](featureFlags),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.provider = anomaly.NewDailyProvider(
anomaly.WithCache[*anomaly.DailyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.DailyProvider](reader),
anomaly.WithFeatureLookup[*anomaly.DailyProvider](featureFlags),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.provider = anomaly.NewWeeklyProvider(
anomaly.WithCache[*anomaly.WeeklyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.WeeklyProvider](reader),
anomaly.WithFeatureLookup[*anomaly.WeeklyProvider](featureFlags),
)
}
return &t, nil
}
func (r *AnomalyRule) Type() baserules.RuleType {
return RuleTypeAnomaly
}
func (r *AnomalyRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) {
zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.EvalWindow().Milliseconds()), zap.Int64("evalDelay", r.EvalDelay().Milliseconds()))
start := ts.Add(-time.Duration(r.EvalWindow())).UnixMilli()
end := ts.UnixMilli()
if r.EvalDelay() > 0 {
start = start - int64(r.EvalDelay().Milliseconds())
end = end - int64(r.EvalDelay().Milliseconds())
}
// round down to the nearest minute, otherwise we could potentially miss data
start = start - (start % (60 * 1000))
end = end - (end % (60 * 1000))
compositeQuery := r.Condition().CompositeQuery
if compositeQuery.PanelType != v3.PanelTypeGraph {
compositeQuery.PanelType = v3.PanelTypeGraph
}
// default mode
return &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: compositeQuery,
Variables: make(map[string]interface{}, 0),
NoCache: false,
}, nil
}
func (r *AnomalyRule) GetSelectedQuery() string {
return r.Condition().GetSelectedQueryName()
}
func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, ts time.Time) (baserules.Vector, error) {
params, err := r.prepareQueryRange(ts)
if err != nil {
return nil, err
}
err = r.PopulateTemporality(ctx, params)
if err != nil {
return nil, fmt.Errorf("internal error while setting temporality")
}
anomalies, err := r.provider.GetAnomalies(ctx, &anomaly.GetAnomaliesRequest{
Params: params,
Seasonality: r.seasonality,
})
if err != nil {
return nil, err
}
var queryResult *v3.Result
for _, result := range anomalies.Results {
if result.QueryName == r.GetSelectedQuery() {
queryResult = result
break
}
}
var resultVector baserules.Vector
if queryResult == nil {
// no result matched the selected query; nothing to evaluate
return resultVector, nil
}
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
zap.L().Info("anomaly scores", zap.String("scores", string(scoresJSON)))
for _, series := range queryResult.AnomalyScores {
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
return resultVector, nil
}
func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
res, err := r.buildAndRunQuery(ctx, ts)
if err != nil {
return nil, err
}
r.mtx.Lock()
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var alerts = make(map[uint64]*baserules.Alert, len(res))
for _, smpl := range res {
l := make(map[string]string, len(smpl.Metric))
for _, lbl := range smpl.Metric {
l[lbl.Name] = lbl.Value
}
value := valueFormatter.Format(smpl.V, r.Unit())
threshold := valueFormatter.Format(r.TargetVal(), r.Unit())
zap.L().Debug("Alert template data for rule", zap.String("name", r.Name()), zap.String("formatter", valueFormatter.Name()), zap.String("value", value), zap.String("threshold", threshold))
tmplData := baserules.AlertTemplateData(l, value, threshold)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
// utility function to apply go template on labels and annotations
expand := func(text string) string {
tmpl := baserules.NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
zap.L().Error("Expanding alert template failed", zap.Error(err), zap.Any("data", tmplData))
}
return result
}
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel)
resultLabels := labels.NewBuilder(smpl.MetricOrig).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels()
for name, value := range r.Labels().Map() {
lb.Set(name, expand(value))
}
lb.Set(labels.AlertNameLabel, r.Name())
lb.Set(labels.AlertRuleIdLabel, r.ID())
lb.Set(labels.RuleSourceLabel, r.GeneratorURL())
annotations := make(labels.Labels, 0, len(r.Annotations().Map()))
for name, value := range r.Annotations().Map() {
annotations = append(annotations, labels.Label{Name: common.NormalizeLabelName(name), Value: expand(value)})
}
if smpl.IsMissing {
lb.Set(labels.AlertNameLabel, "[No data] "+r.Name())
}
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h]))
err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return nil, err
}
alerts[h] = &baserules.Alert{
Labels: lbs,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: model.StatePending,
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: r.PreferredChannels(),
Missing: smpl.IsMissing,
}
}
zap.L().Info("number of alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts)))
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
alert.Receivers = r.PreferredChannels()
continue
}
r.Active[h] = a
}
itemsToAdd := []model.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > baserules.ResolvedRetention) {
delete(r.Active, fp)
}
if a.State != model.StateInactive {
a.State = model.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: model.StateInactive,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
continue
}
if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.HoldDuration() {
a.State = model.StateFiring
a.FiredAt = ts
state := model.StateFiring
if a.Missing {
state = model.StateNoData
}
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
}
currentState := r.State()
overallStateChanged := currentState != prevState
for idx, item := range itemsToAdd {
item.OverallStateChanged = overallStateChanged
item.OverallState = currentState
itemsToAdd[idx] = item
}
r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd)
return len(r.Active), nil
}
func (r *AnomalyRule) String() string {
ar := baserules.PostableRule{
AlertName: r.Name(),
RuleCondition: r.Condition(),
EvalWindow: baserules.Duration(r.EvalWindow()),
Labels: r.Labels().Map(),
Annotations: r.Annotations().Map(),
PreferredChannels: r.PreferredChannels(),
}
byt, err := yaml.Marshal(ar)
if err != nil {
return fmt.Sprintf("error marshaling alerting rule: %s", err.Error())
}
return string(byt)
}
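
For clarity, the window arithmetic in prepareQueryRange above can be exercised in isolation; this is an illustrative sketch using only the standard library, not the rule's actual types:

package main

import (
	"fmt"
	"time"
)

// queryWindow mirrors the arithmetic in prepareQueryRange above:
// [ts - evalWindow, ts], shifted back by evalDelay, rounded down to the minute.
func queryWindow(ts time.Time, evalWindow, evalDelay time.Duration) (startMs, endMs int64) {
	startMs = ts.Add(-evalWindow).UnixMilli()
	endMs = ts.UnixMilli()
	if evalDelay > 0 {
		startMs -= evalDelay.Milliseconds()
		endMs -= evalDelay.Milliseconds()
	}
	// round down to the nearest minute so partially ingested data is not missed
	startMs -= startMs % (60 * 1000)
	endMs -= endMs % (60 * 1000)
	return startMs, endMs
}

func main() {
	ts := time.Date(2024, 9, 24, 10, 22, 52, 0, time.UTC)
	start, end := queryWindow(ts, 5*time.Minute, 2*time.Minute)
	fmt.Println(start, end) // both aligned to whole minutes
}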

View File

@ -53,6 +53,25 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create promql rule task for evaluation
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB)
} else if opts.Rule.RuleType == baserules.RuleTypeAnomaly {
// create anomaly rule
ar, err := NewAnomalyRule(
ruleId,
opts.Rule,
opts.FF,
opts.Reader,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
)
if err != nil {
return task, err
}
rules = append(rules, ar)
// create anomaly rule task for evaluation
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB)
} else {
return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", baserules.RuleTypeProm, baserules.RuleTypeThreshold)
}

View File

@ -518,7 +518,7 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) {
}
// PopulateTemporality adds the temporality to the query if it is not present
func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
func (aH *APIHandler) PopulateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
aH.temporalityMux.Lock()
defer aH.temporalityMux.Unlock()
@ -3791,7 +3791,7 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
}
// add temporality for each metric
temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams)
temporalityErr := aH.PopulateTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
@ -4139,7 +4139,7 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
queryRangeParams.Version = "v4"
// add temporality for each metric
temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams)
temporalityErr := aH.PopulateTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)

View File

@ -144,9 +144,20 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
}
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
if err != nil {
return nil, err
}
@ -158,15 +169,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
}()
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
if err != nil {
return nil, err
@ -715,6 +717,7 @@ func makeRulesManager(
ruleRepoURL string,
db *sqlx.DB,
ch interfaces.Reader,
cache cache.Cache,
disableRules bool,
fm interfaces.FeatureLookup,
useLogsNewSchema bool) (*rules.Manager, error) {
@ -743,6 +746,7 @@ func makeRulesManager(
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
}

View File

@ -2,7 +2,9 @@ package common
import (
"math"
"regexp"
"time"
"unicode"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@ -73,6 +75,23 @@ func LCMList(nums []int64) int64 {
return result
}
func NormalizeLabelName(name string) string {
// See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
// Regular expression to match non-alphanumeric characters except underscores
reg := regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Replace all non-alphanumeric characters except underscores with underscores
normalized := reg.ReplaceAllString(name, "_")
// If the first character is not a letter or an underscore, prepend an underscore
if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' {
normalized = "_" + normalized
}
return normalized
}
func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int64) []*v3.Series {
series := make(map[uint64]*v3.Series)
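
A small, runnable usage example for NormalizeLabelName above (the helper body is copied verbatim so the replacement and prefixing rules are easy to see):

package main

import (
	"fmt"
	"regexp"
	"unicode"
)

// normalizeLabelName copies the helper above: non [a-zA-Z0-9_] characters
// become underscores, and a leading non-letter gets an underscore prefix.
func normalizeLabelName(name string) string {
	reg := regexp.MustCompile(`[^a-zA-Z0-9_]`)
	normalized := reg.ReplaceAllString(name, "_")
	if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' {
		normalized = "_" + normalized
	}
	return normalized
}

func main() {
	for _, in := range []string{"service.name", "http-status", "9xx_rate", "region"} {
		fmt.Printf("%q -> %q\n", in, normalizeLabelName(in))
	}
	// "service.name" -> "service_name"
	// "http-status"  -> "http_status"
	// "9xx_rate"     -> "_9xx_rate"
	// "region"       -> "region"
}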

View File

@ -22,6 +22,7 @@ const AlertChannelPagerduty = "ALERT_CHANNEL_PAGERDUTY"
const AlertChannelMsTeams = "ALERT_CHANNEL_MSTEAMS"
const AlertChannelOpsgenie = "ALERT_CHANNEL_OPSGENIE"
const AlertChannelEmail = "ALERT_CHANNEL_EMAIL"
const AnomalyDetection = "ANOMALY_DETECTION"
var BasicPlan = FeatureSet{
Feature{
@ -115,4 +116,11 @@ var BasicPlan = FeatureSet{
UsageLimit: -1,
Route: "",
},
Feature{
Name: AnomalyDetection,
Active: false,
Usage: 0,
UsageLimit: -1,
Route: "",
},
}

View File

@ -19,7 +19,7 @@ import (
const (
// how long before re-sending the alert
resolvedRetention = 15 * time.Minute
ResolvedRetention = 15 * time.Minute
TestAlertPostFix = "_TEST_ALERT"
)
@ -29,6 +29,7 @@ type RuleType string
const (
RuleTypeThreshold = "threshold_rule"
RuleTypeProm = "promql_rule"
RuleTypeAnomaly = "anomaly_rule"
)
type RuleHealth string
@ -83,27 +84,16 @@ type NamedAlert struct {
type CompareOp string
const (
CompareOpNone CompareOp = "0"
ValueIsAbove CompareOp = "1"
ValueIsBelow CompareOp = "2"
ValueIsEq CompareOp = "3"
ValueIsNotEq CompareOp = "4"
CompareOpNone CompareOp = "0"
ValueIsAbove CompareOp = "1"
ValueIsBelow CompareOp = "2"
ValueIsEq CompareOp = "3"
ValueIsNotEq CompareOp = "4"
ValueAboveOrEq CompareOp = "5"
ValueBelowOrEq CompareOp = "6"
ValueOutsideBounds CompareOp = "7"
)
func ResolveCompareOp(cop CompareOp) string {
switch cop {
case ValueIsAbove:
return ">"
case ValueIsBelow:
return "<"
case ValueIsEq:
return "=="
case ValueIsNotEq:
return "!="
}
return ""
}
type MatchType string
const (
@ -123,6 +113,8 @@ type RuleCondition struct {
AbsentFor uint64 `yaml:"absentFor,omitempty" json:"absentFor,omitempty"`
MatchType MatchType `json:"matchType,omitempty"`
TargetUnit string `json:"targetUnit,omitempty"`
Algorithm string `json:"algorithm,omitempty"`
Seasonality string `json:"seasonality,omitempty"`
SelectedQuery string `json:"selectedQueryName,omitempty"`
}
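
A minimal sketch of how the two new condition fields serialize, using a stand-in struct that carries only the JSON tags added above; the Algorithm value shown is hypothetical, and the real RuleCondition also carries the composite query, target, match type, and so on:

package main

import (
	"encoding/json"
	"fmt"
)

// ruleConditionSubset carries only the fields added in this change,
// with the same JSON tags as RuleCondition above.
type ruleConditionSubset struct {
	Algorithm     string `json:"algorithm,omitempty"`
	Seasonality   string `json:"seasonality,omitempty"`
	SelectedQuery string `json:"selectedQueryName,omitempty"`
}

func main() {
	c := ruleConditionSubset{
		Algorithm:     "standard", // hypothetical value; the diff only adds the field
		Seasonality:   "daily",    // one of hourly / daily / weekly per NewAnomalyRule
		SelectedQuery: "A",
	}
	out, _ := json.MarshalIndent(c, "", "  ")
	fmt.Println(string(out))
}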

View File

@ -133,7 +133,9 @@ func parseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P
if rule.RuleCondition != nil {
if rule.RuleCondition.CompositeQuery.QueryType == v3.QueryTypeBuilder {
rule.RuleType = RuleTypeThreshold
if rule.RuleType == "" {
rule.RuleType = RuleTypeThreshold
}
} else if rule.RuleCondition.CompositeQuery.QueryType == v3.QueryTypePromQL {
rule.RuleType = RuleTypeProm
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/converter"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
@ -53,7 +54,7 @@ type BaseRule struct {
health RuleHealth
lastError error
active map[uint64]*Alert
Active map[uint64]*Alert
// lastTimestampWithDatapoints is the timestamp of the last datapoint we observed
// for this rule
@ -72,6 +73,12 @@ type BaseRule struct {
// sendAlways will send alert irresepective of resendDelay
// or other params
sendAlways bool
// TemporalityMap is a map of metric name to temporality
// to avoid fetching temporality for the same metric multiple times
// querying the v4 table on the low-cardinality temporality column
// should be fast but we can still avoid the query if we have the data in memory
TemporalityMap map[string]map[v3.Temporality]bool
}
type RuleOption func(*BaseRule)
@ -116,8 +123,9 @@ func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...R
annotations: qslabels.FromMap(p.Annotations),
preferredChannels: p.PreferredChannels,
health: HealthUnknown,
active: map[uint64]*Alert{},
Active: map[uint64]*Alert{},
reader: reader,
TemporalityMap: make(map[string]map[v3.Temporality]bool),
}
if baseRule.evalWindow == 0 {
@ -165,14 +173,30 @@ func (r *BaseRule) currentAlerts() []*Alert {
r.mtx.Lock()
defer r.mtx.Unlock()
alerts := make([]*Alert, 0, len(r.active))
for _, a := range r.active {
alerts := make([]*Alert, 0, len(r.Active))
for _, a := range r.Active {
anew := *a
alerts = append(alerts, &anew)
}
return alerts
}
func (r *BaseRule) EvalDelay() time.Duration {
return r.evalDelay
}
func (r *BaseRule) EvalWindow() time.Duration {
return r.evalWindow
}
func (r *BaseRule) HoldDuration() time.Duration {
return r.holdDuration
}
func (r *BaseRule) TargetVal() float64 {
return r.targetVal()
}
func (r *ThresholdRule) hostFromSource() string {
parsedUrl, err := url.Parse(r.source)
if err != nil {
@ -267,7 +291,7 @@ func (r *BaseRule) GetEvaluationTimestamp() time.Time {
func (r *BaseRule) State() model.AlertState {
maxState := model.StateInactive
for _, a := range r.active {
for _, a := range r.Active {
if a.State > maxState {
maxState = a.State
}
@ -306,12 +330,12 @@ func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, a := range r.active {
for _, a := range r.Active {
f(a)
}
}
func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) {
var alertSmpl Sample
var shouldAlert bool
var lbls qslabels.Labels
@ -319,7 +343,7 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
for name, value := range series.Labels {
lbls = append(lbls, qslabels.Label{Name: name, Value: value})
lblsNormalized = append(lblsNormalized, qslabels.Label{Name: normalizeLabelName(name), Value: value})
lblsNormalized = append(lblsNormalized, qslabels.Label{Name: common.NormalizeLabelName(name), Value: value})
}
series.Points = removeGroupinSetPoints(series)
@ -364,6 +388,14 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
break
}
}
} else if r.compareOp() == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) >= r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
shouldAlert = true
break
}
}
}
case AllTheTimes:
// If all samples match the condition, the rule is firing.
@ -425,6 +457,14 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
}
}
}
} else if r.compareOp() == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) >= r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls}
shouldAlert = true
break
}
}
}
case OnAverage:
// If the average of all samples matches the condition, the rule is firing.
@ -454,6 +494,10 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
if avg != r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueOutsideBounds {
if math.Abs(avg) >= r.targetVal() {
shouldAlert = true
}
}
case InTotal:
// If the sum of all samples matches the condition, the rule is firing.
@ -482,6 +526,10 @@ func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) {
if sum != r.targetVal() {
shouldAlert = true
}
} else if r.compareOp() == ValueOutsideBounds {
if math.Abs(sum) >= r.targetVal() {
shouldAlert = true
}
}
case Last:
// If the last sample matches the condition, the rule is firing.
@ -602,3 +650,59 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
return nil
}
func (r *BaseRule) PopulateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
missingTemporality := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for _, query := range qp.CompositeQuery.BuilderQueries {
// if there is no temporality specified in the query but we have it in the map
// then use the value from the map
if query.Temporality == "" && r.TemporalityMap[query.AggregateAttribute.Key] != nil {
// We prefer delta if it is available
if r.TemporalityMap[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if r.TemporalityMap[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
}
// we don't have temporality for this metric
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
missingTemporality = append(missingTemporality, query.AggregateAttribute.Key)
}
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
}
}
}
var nameToTemporality map[string]map[v3.Temporality]bool
var err error
if len(missingTemporality) > 0 {
nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality)
if err != nil {
return err
}
}
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for name := range qp.CompositeQuery.BuilderQueries {
query := qp.CompositeQuery.BuilderQueries[name]
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
r.TemporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
}
}
}
return nil
}
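
The new ValueOutsideBounds branches above all apply the same test to the anomaly-score series: fire once the absolute value reaches the configured target (the z-score threshold), regardless of sign. A standalone sketch:

package main

import (
	"fmt"
	"math"
)

// outsideBounds mirrors the ValueOutsideBounds branches above: return the
// first score whose absolute value reaches the target, and whether to alert.
func outsideBounds(scores []float64, target float64) (float64, bool) {
	for _, v := range scores {
		if math.Abs(v) >= target {
			return v, true
		}
	}
	return 0, false
}

func main() {
	scores := []float64{0.4, -1.2, 3.7, -0.8} // per-point anomaly scores
	if v, alert := outsideBounds(scores, 3.0); alert {
		fmt.Printf("alert: score %.1f breaches |score| >= 3.0\n", v)
	}
}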

View File

@ -18,6 +18,7 @@ import (
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/cache"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
@ -32,6 +33,7 @@ type PrepareTaskOptions struct {
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
@ -73,6 +75,7 @@ type ManagerOptions struct {
DisableRules bool
FeatureFlags interfaces.FeatureLookup
Reader interfaces.Reader
Cache cache.Cache
EvalDelay time.Duration
@ -96,9 +99,9 @@ type Manager struct {
logger *zap.Logger
featureFlags interfaces.FeatureLookup
reader interfaces.Reader
featureFlags interfaces.FeatureLookup
reader interfaces.Reader
cache cache.Cache
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool
@ -209,6 +212,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
logger: o.Logger,
featureFlags: o.FeatureFlags,
reader: o.Reader,
cache: o.Cache,
prepareTaskFunc: o.PrepareTaskFunc,
}
return m, nil
@ -342,6 +346,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
RuleDB: m.ruleDB,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
@ -463,6 +468,7 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
RuleDB: m.ruleDB,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),

View File

@ -131,7 +131,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
continue
}
alertSmpl, shouldAlert := r.shouldAlert(toCommonSeries(series))
alertSmpl, shouldAlert := r.ShouldAlert(toCommonSeries(series))
if !shouldAlert {
continue
}
@ -208,21 +208,21 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != model.StateInactive {
if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
alert.Receivers = r.preferredChannels
continue
}
r.active[h] = a
r.Active[h] = a
}
itemsToAdd := []model.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.String("name", r.Name()))
@ -230,8 +230,8 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ResolvedRetention) {
delete(r.Active, fp)
}
if a.State != model.StateInactive {
a.State = model.StateInactive
@ -283,7 +283,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd)
return len(r.active), nil
return len(r.Active), nil
}
func (r *PromRule) String() string {

View File

@ -293,8 +293,8 @@ func (g *PromRuleTask) CopyState(fromTask Task) error {
continue
}
for fp, a := range far.active {
ar.active[fp] = a
for fp, a := range far.Active {
ar.Active[fp] = a
}
ar.handledRestart = far.handledRestart
}

View File

@ -661,7 +661,7 @@ func TestPromRuleShouldAlert(t *testing.T) {
assert.NoError(t, err)
}
_, shoulAlert := rule.shouldAlert(toCommonSeries(c.values))
_, shoulAlert := rule.ShouldAlert(toCommonSeries(c.values))
assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx)
}
}

View File

@ -18,6 +18,9 @@ type Rule interface {
Labels() labels.BaseLabels
Annotations() labels.BaseLabels
Condition() *RuleCondition
EvalDelay() time.Duration
EvalWindow() time.Duration
HoldDuration() time.Duration
State() model.AlertState
ActiveAlerts() []*Alert

View File

@ -285,8 +285,8 @@ func (g *RuleTask) CopyState(fromTask Task) error {
continue
}
for fp, a := range far.active {
ar.active[fp] = a
for fp, a := range far.Active {
ar.Active[fp] = a
}
ar.handledRestart = far.handledRestart
}

View File

@ -6,10 +6,8 @@ import (
"encoding/json"
"fmt"
"math"
"regexp"
"text/template"
"time"
"unicode"
"go.uber.org/zap"
@ -43,11 +41,6 @@ type ThresholdRule struct {
// if the version is "v3", then we use the old querier
// if the version is "v4", then we use the new querierV2
version string
// temporalityMap is a map of metric name to temporality
// to avoid fetching temporality for the same metric multiple times
// querying the v4 table on low cardinal temporality column
// should be fast but we can still avoid the query if we have the data in memory
temporalityMap map[string]map[v3.Temporality]bool
// querier is used for alerts created before the introduction of new metrics query builder
querier interfaces.Querier
@ -76,9 +69,8 @@ func NewThresholdRule(
}
t := ThresholdRule{
BaseRule: baseRule,
version: p.Version,
temporalityMap: make(map[string]map[v3.Temporality]bool),
BaseRule: baseRule,
version: p.Version,
}
querierOption := querier.QuerierOptions{
@ -107,63 +99,6 @@ func (r *ThresholdRule) Type() RuleType {
return RuleTypeThreshold
}
// populateTemporality same as addTemporality but for v4 and better
func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
missingTemporality := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for _, query := range qp.CompositeQuery.BuilderQueries {
// if there is no temporality specified in the query but we have it in the map
// then use the value from the map
if query.Temporality == "" && r.temporalityMap[query.AggregateAttribute.Key] != nil {
// We prefer delta if it is available
if r.temporalityMap[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if r.temporalityMap[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
}
// we don't have temporality for this metric
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
missingTemporality = append(missingTemporality, query.AggregateAttribute.Key)
}
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
}
}
}
var nameToTemporality map[string]map[v3.Temporality]bool
var err error
if len(missingTemporality) > 0 {
nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality)
if err != nil {
return err
}
}
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for name := range qp.CompositeQuery.BuilderQueries {
query := qp.CompositeQuery.BuilderQueries[name]
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
r.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
}
}
}
return nil
}
func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) {
zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds()))
@ -313,7 +248,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vec
if err != nil {
return nil, err
}
err = r.populateTemporality(ctx, params)
err = r.PopulateTemporality(ctx, params)
if err != nil {
return nil, fmt.Errorf("internal error while setting temporality")
}
@ -406,7 +341,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vec
}
for _, series := range queryResult.Series {
smpl, shouldAlert := r.shouldAlert(*series)
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
@ -414,23 +349,6 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vec
return resultVector, nil
}
func normalizeLabelName(name string) string {
// See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
// Regular expression to match non-alphanumeric characters except underscores
reg := regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Replace all non-alphanumeric characters except underscores with underscores
normalized := reg.ReplaceAllString(name, "_")
// If the first character is not a letter or an underscore, prepend an underscore
if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' {
normalized = "_" + normalized
}
return normalized
}
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
prevState := r.State()
@ -495,7 +413,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
annotations := make(labels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, labels.Label{Name: normalizeLabelName(name), Value: expand(value)})
annotations = append(annotations, labels.Label{Name: common.NormalizeLabelName(name), Value: expand(value)})
}
if smpl.IsMissing {
lb.Set(labels.AlertNameLabel, "[No data] "+r.Name())
@ -547,7 +465,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != model.StateInactive {
if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
@ -555,13 +473,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
continue
}
r.active[h] = a
r.Active[h] = a
}
itemsToAdd := []model.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels))
@ -569,8 +487,8 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ResolvedRetention) {
delete(r.Active, fp)
}
if a.State != model.StateInactive {
a.State = model.StateInactive
@ -623,7 +541,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
r.health = HealthGood
r.lastError = err
return len(r.active), nil
return len(r.Active), nil
}
func (r *ThresholdRule) String() string {

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/featureManager"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
@ -800,7 +801,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
values.Points[i].Timestamp = time.Now().UnixMilli()
}
smpl, shoulAlert := rule.shouldAlert(c.values)
smpl, shoulAlert := rule.ShouldAlert(c.values)
assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx)
if shoulAlert {
assert.Equal(t, c.expectedAlertSample.Value, smpl.V, "Test case %d", idx)
@ -844,7 +845,7 @@ func TestNormalizeLabelName(t *testing.T) {
}
for _, c := range cases {
assert.Equal(t, c.expected, normalizeLabelName(c.labelName))
assert.Equal(t, c.expected, common.NormalizeLabelName(c.labelName))
}
}
@ -1007,9 +1008,9 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
values.Points[i].Timestamp = time.Now().UnixMilli()
}
sample, shoulAlert := rule.shouldAlert(c.values)
sample, shoulAlert := rule.ShouldAlert(c.values)
for name, value := range c.values.Labels {
assert.Equal(t, value, sample.Metric.Get(normalizeLabelName(name)))
assert.Equal(t, value, sample.Metric.Get(common.NormalizeLabelName(name)))
}
assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx)
@ -1243,7 +1244,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule.temporalityMap = map[string]map[v3.Temporality]bool{
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
},
@ -1260,7 +1261,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
if c.expectAlerts != 0 {
foundCount := 0
for _, item := range rule.active {
for _, item := range rule.Active {
for _, summary := range c.summaryAny {
if strings.Contains(item.Annotations.Get("summary"), summary) {
foundCount++
@ -1342,7 +1343,7 @@ func TestThresholdRuleNoData(t *testing.T) {
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule.temporalityMap = map[string]map[v3.Temporality]bool{
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
},
@ -1357,7 +1358,7 @@ func TestThresholdRuleNoData(t *testing.T) {
}
assert.Equal(t, 1, retVal.(int), "case %d", idx)
for _, item := range rule.active {
for _, item := range rule.Active {
if c.expectNoData {
assert.True(t, strings.Contains(item.Labels.Get(labels.AlertNameLabel), "[No data]"), "case %d", idx)
} else {
@ -1447,7 +1448,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule.temporalityMap = map[string]map[v3.Temporality]bool{
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
},
@ -1465,7 +1466,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
assert.Equal(t, 0, retVal.(int), "case %d", idx)
} else {
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
for _, item := range rule.active {
for _, item := range rule.Active {
for name, value := range item.Annotations.Map() {
if name == "related_traces" {
assert.NotEmpty(t, value, "case %d", idx)
@ -1572,7 +1573,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule.temporalityMap = map[string]map[v3.Temporality]bool{
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
},
@ -1590,7 +1591,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
assert.Equal(t, 0, retVal.(int), "case %d", idx)
} else {
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
for _, item := range rule.active {
for _, item := range rule.Active {
for name, value := range item.Annotations.Map() {
if name == "related_logs" {
assert.NotEmpty(t, value, "case %d", idx)