chore: make eval delay configurable (#5649)

This commit is contained in:
Srikanth Chekuri 2024-08-08 17:34:25 +05:30 committed by GitHub
parent 6f73bb6eca
commit f031845300
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 89 additions and 12 deletions

View File

@ -728,6 +728,7 @@ func makeRulesManager(
DisableRules: disableRules, DisableRules: disableRules,
FeatureFlags: fm, FeatureFlags: fm,
Reader: ch, Reader: ch,
EvalDelay: baseconst.GetEvalDelay(),
} }
// create Manager // create Manager

View File

@ -714,6 +714,7 @@ func makeRulesManager(
DisableRules: disableRules, DisableRules: disableRules,
FeatureFlags: fm, FeatureFlags: fm,
Reader: ch, Reader: ch,
EvalDelay: constants.GetEvalDelay(),
} }
// create Manager // create Manager

View File

@ -152,6 +152,15 @@ func GetContextTimeoutMaxAllowed() time.Duration {
return contextTimeoutDuration return contextTimeoutDuration
} }
func GetEvalDelay() time.Duration {
evalDelayStr := GetOrDefaultEnv("RULES_EVAL_DELAY", "2m")
evalDelayDuration, err := time.ParseDuration(evalDelayStr)
if err != nil {
return 0
}
return evalDelayDuration
}
var ContextTimeoutMaxAllowed = GetContextTimeoutMaxAllowed() var ContextTimeoutMaxAllowed = GetContextTimeoutMaxAllowed()
const ( const (

View File

@ -63,6 +63,8 @@ type ManagerOptions struct {
DisableRules bool DisableRules bool
FeatureFlags interfaces.FeatureLookup FeatureFlags interfaces.FeatureLookup
Reader interfaces.Reader Reader interfaces.Reader
EvalDelay time.Duration
} }
// The Manager manages recording and alerting rules. // The Manager manages recording and alerting rules.
@ -524,7 +526,9 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
tr, err := NewThresholdRule( tr, err := NewThresholdRule(
ruleId, ruleId,
r, r,
ThresholdRuleOpts{}, ThresholdRuleOpts{
EvalDelay: m.opts.EvalDelay,
},
m.featureFlags, m.featureFlags,
m.reader, m.reader,
) )

View File

@ -75,6 +75,8 @@ type ThresholdRule struct {
querier interfaces.Querier querier interfaces.Querier
querierV2 interfaces.Querier querierV2 interfaces.Querier
evalDelay time.Duration
} }
type ThresholdRuleOpts struct { type ThresholdRuleOpts struct {
@ -86,6 +88,12 @@ type ThresholdRuleOpts struct {
// sendAlways will send alert irresepective of resendDelay // sendAlways will send alert irresepective of resendDelay
// or other params // or other params
SendAlways bool SendAlways bool
// EvalDelay is the time to wait for data to be available
// before evaluating the rule. This is useful in scenarios
// where data might not be available in the system immediately
// after the timestamp.
EvalDelay time.Duration
} }
func NewThresholdRule( func NewThresholdRule(
@ -96,6 +104,8 @@ func NewThresholdRule(
reader interfaces.Reader, reader interfaces.Reader,
) (*ThresholdRule, error) { ) (*ThresholdRule, error) {
zap.L().Info("creating new ThresholdRule", zap.String("id", id), zap.Any("opts", opts))
if p.RuleCondition == nil { if p.RuleCondition == nil {
return nil, fmt.Errorf("no rule condition") return nil, fmt.Errorf("no rule condition")
} else if !p.RuleCondition.IsValid() { } else if !p.RuleCondition.IsValid() {
@ -117,6 +127,7 @@ func NewThresholdRule(
typ: p.AlertType, typ: p.AlertType,
version: p.Version, version: p.Version,
temporalityMap: make(map[string]map[v3.Temporality]bool), temporalityMap: make(map[string]map[v3.Temporality]bool),
evalDelay: opts.EvalDelay,
} }
if int64(t.evalWindow) == 0 { if int64(t.evalWindow) == 0 {
@ -402,7 +413,6 @@ func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) {
} }
func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
zap.L().Info("sending alerts", zap.String("rule", r.Name()))
alerts := []*Alert{} alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) { r.ForEachActiveAlert(func(alert *Alert) {
if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { if r.opts.SendAlways || alert.needsSending(ts, resendDelay) {
@ -431,11 +441,14 @@ func (r *ThresholdRule) Unit() string {
func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
// todo(srikanthccv): make this configurable zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds()))
// 2 minutes is reasonable time to wait for data to be available
// 60 seconds (SDK) + 10 seconds (batch) + rest for n/w + serialization + write to disk etc.. start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli()
start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli() - 2*60*1000 end := ts.UnixMilli()
end := ts.UnixMilli() - 2*60*1000 if r.evalDelay > 0 {
start = start - int64(r.evalDelay.Milliseconds())
end = end - int64(r.evalDelay.Milliseconds())
}
// round to minute otherwise we could potentially miss data // round to minute otherwise we could potentially miss data
start = start - (start % (60 * 1000)) start = start - (start % (60 * 1000))
end = end - (end % (60 * 1000)) end = end - (end % (60 * 1000))

View File

@ -611,7 +611,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -697,7 +697,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -739,7 +739,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
} }
fm := featureManager.StartManager() fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -815,7 +815,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -834,6 +834,55 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
} }
} }
func TestThresholdRuleEvalDelay(t *testing.T) {
postableRule := PostableRule{
AlertName: "Test Eval Delay",
AlertType: "METRIC_BASED_ALERT",
RuleType: RuleTypeThreshold,
EvalWindow: Duration(5 * time.Minute),
Frequency: Duration(1 * time.Minute),
RuleCondition: &RuleCondition{
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"A": {
Query: "SELECT 1 >= {{.start_timestamp_ms}} AND 1 <= {{.end_timestamp_ms}}",
},
},
},
},
}
// 01:39:47
ts := time.Unix(1717205987, 0)
cases := []struct {
expectedQuery string
}{
// Test cases for Equals Always
{
// 01:34:00 - 01:39:00
expectedQuery: "SELECT 1 >= 1717205640000 AND 1 <= 1717205940000",
},
}
fm := featureManager.StartManager()
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) // no eval delay
if err != nil {
assert.NoError(t, err)
}
params := rule.prepareQueryRange(ts)
assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx)
secondTimeParams := rule.prepareQueryRange(ts)
assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx)
}
}
func TestThresholdRuleClickHouseTmpl(t *testing.T) { func TestThresholdRuleClickHouseTmpl(t *testing.T) {
postableRule := PostableRule{ postableRule := PostableRule{
AlertName: "Tricky Condition Tests", AlertName: "Tricky Condition Tests",
@ -868,7 +917,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
fm := featureManager.StartManager() fm := featureManager.StartManager()
for idx, c := range cases { for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil)
if err != nil { if err != nil {
assert.NoError(t, err) assert.NoError(t, err)
} }