mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-10-12 14:51:32 +08:00
chore: make ee init rule manager with it's own prepareTask func (#5807)
This commit is contained in:
parent
7844522691
commit
74c994fbab
@ -28,6 +28,7 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/dao"
|
||||
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
|
||||
"go.signoz.io/signoz/ee/query-service/interfaces"
|
||||
"go.signoz.io/signoz/ee/query-service/rules"
|
||||
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/migrate"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
@ -52,7 +53,7 @@ import (
|
||||
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
|
||||
rules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||
baserules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||
"go.signoz.io/signoz/pkg/query-service/telemetry"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
"go.uber.org/zap"
|
||||
@ -81,7 +82,7 @@ type ServerOptions struct {
|
||||
// Server runs HTTP api service
|
||||
type Server struct {
|
||||
serverOptions *ServerOptions
|
||||
ruleManager *rules.Manager
|
||||
ruleManager *baserules.Manager
|
||||
|
||||
// public http router
|
||||
httpConn net.Listener
|
||||
@ -727,7 +728,7 @@ func makeRulesManager(
|
||||
db *sqlx.DB,
|
||||
ch baseint.Reader,
|
||||
disableRules bool,
|
||||
fm baseint.FeatureLookup) (*rules.Manager, error) {
|
||||
fm baseint.FeatureLookup) (*baserules.Manager, error) {
|
||||
|
||||
// create engine
|
||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
||||
@ -743,9 +744,9 @@ func makeRulesManager(
|
||||
}
|
||||
|
||||
// create manager opts
|
||||
managerOpts := &rules.ManagerOptions{
|
||||
managerOpts := &baserules.ManagerOptions{
|
||||
NotifierOpts: notifierOpts,
|
||||
Queriers: &rules.Queriers{
|
||||
Queriers: &baserules.Queriers{
|
||||
PqlEngine: pqle,
|
||||
Ch: ch.GetConn(),
|
||||
},
|
||||
@ -757,10 +758,12 @@ func makeRulesManager(
|
||||
FeatureFlags: fm,
|
||||
Reader: ch,
|
||||
EvalDelay: baseconst.GetEvalDelay(),
|
||||
|
||||
PrepareTaskFunc: rules.PrepareTaskFunc,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
manager, err := rules.NewManager(managerOpts)
|
||||
manager, err := baserules.NewManager(managerOpts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rule manager error: %v", err)
|
||||
}
|
||||
|
71
ee/query-service/rules/manager.go
Normal file
71
ee/query-service/rules/manager.go
Normal file
@ -0,0 +1,71 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
baserules "go.signoz.io/signoz/pkg/query-service/rules"
|
||||
)
|
||||
|
||||
func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) {
|
||||
|
||||
rules := make([]baserules.Rule, 0)
|
||||
var task baserules.Task
|
||||
|
||||
ruleId := baserules.RuleIdFromTaskName(opts.TaskName)
|
||||
if opts.Rule.RuleType == baserules.RuleTypeThreshold {
|
||||
// create a threshold rule
|
||||
tr, err := baserules.NewThresholdRule(
|
||||
ruleId,
|
||||
opts.Rule,
|
||||
baserules.ThresholdRuleOpts{
|
||||
EvalDelay: opts.ManagerOpts.EvalDelay,
|
||||
},
|
||||
opts.FF,
|
||||
opts.Reader,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return task, err
|
||||
}
|
||||
|
||||
rules = append(rules, tr)
|
||||
|
||||
// create ch rule task for evalution
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB)
|
||||
|
||||
} else if opts.Rule.RuleType == baserules.RuleTypeProm {
|
||||
|
||||
// create promql rule
|
||||
pr, err := baserules.NewPromRule(
|
||||
ruleId,
|
||||
opts.Rule,
|
||||
opts.Logger,
|
||||
baserules.PromRuleOpts{},
|
||||
opts.Reader,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return task, err
|
||||
}
|
||||
|
||||
rules = append(rules, pr)
|
||||
|
||||
// create promql rule task for evalution
|
||||
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB)
|
||||
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", baserules.RuleTypeProm, baserules.RuleTypeThreshold)
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// newTask returns an appropriate group for
|
||||
// rule type
|
||||
func newTask(taskType baserules.TaskType, name string, frequency time.Duration, rules []baserules.Rule, opts *baserules.ManagerOptions, notify baserules.NotifyFunc, ruleDB baserules.RuleDB) baserules.Task {
|
||||
if taskType == baserules.TaskTypeCh {
|
||||
return baserules.NewRuleTask(name, "", frequency, rules, opts, notify, ruleDB)
|
||||
}
|
||||
return baserules.NewPromRuleTask(name, "", frequency, rules, opts, notify, ruleDB)
|
||||
}
|
@ -38,7 +38,7 @@ type PrepareTaskOptions struct {
|
||||
|
||||
const taskNamesuffix = "webAppEditor"
|
||||
|
||||
func ruleIdFromTaskName(n string) string {
|
||||
func RuleIdFromTaskName(n string) string {
|
||||
return strings.Split(n, "-groupname")[0]
|
||||
}
|
||||
|
||||
@ -121,7 +121,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
rules := make([]Rule, 0)
|
||||
var task Task
|
||||
|
||||
ruleId := ruleIdFromTaskName(opts.TaskName)
|
||||
ruleId := RuleIdFromTaskName(opts.TaskName)
|
||||
if opts.Rule.RuleType == RuleTypeThreshold {
|
||||
// create a threshold rule
|
||||
tr, err := NewThresholdRule(
|
||||
@ -400,7 +400,7 @@ func (m *Manager) deleteTask(taskName string) {
|
||||
if ok {
|
||||
oldg.Stop()
|
||||
delete(m.tasks, taskName)
|
||||
delete(m.rules, ruleIdFromTaskName(taskName))
|
||||
delete(m.rules, RuleIdFromTaskName(taskName))
|
||||
zap.L().Debug("rule task deleted", zap.String("name", taskName))
|
||||
} else {
|
||||
zap.L().Info("rule not found for deletion", zap.String("name", taskName))
|
||||
|
@ -40,7 +40,7 @@ type PromRuleTask struct {
|
||||
|
||||
// newPromRuleTask holds rules that have promql condition
|
||||
// and evalutes the rule at a given frequency
|
||||
func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *PromRuleTask {
|
||||
func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *PromRuleTask {
|
||||
zap.L().Info("Initiating a new rule group", zap.String("name", name), zap.Duration("frequency", frequency))
|
||||
|
||||
if time.Now() == time.Now().Add(frequency) {
|
||||
|
@ -37,8 +37,8 @@ type RuleTask struct {
|
||||
|
||||
const DefaultFrequency = 1 * time.Minute
|
||||
|
||||
// newRuleTask makes a new RuleTask with the given name, options, and rules.
|
||||
func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *RuleTask {
|
||||
// NewRuleTask makes a new RuleTask with the given name, options, and rules.
|
||||
func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) *RuleTask {
|
||||
|
||||
if time.Now() == time.Now().Add(frequency) {
|
||||
frequency = DefaultFrequency
|
||||
|
@ -31,7 +31,7 @@ type Task interface {
|
||||
// rule type
|
||||
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, ruleDB RuleDB) Task {
|
||||
if taskType == TaskTypeCh {
|
||||
return newRuleTask(name, file, frequency, rules, opts, notify, ruleDB)
|
||||
return NewRuleTask(name, file, frequency, rules, opts, notify, ruleDB)
|
||||
}
|
||||
return newPromRuleTask(name, file, frequency, rules, opts, notify, ruleDB)
|
||||
return NewPromRuleTask(name, file, frequency, rules, opts, notify, ruleDB)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user