Alerts backend with metrics (#1346)

Amol Umbark 2022-07-14 11:59:06 +05:30 committed by GitHub
parent c3d665e119
commit 3a287b2b16
29 changed files with 4735 additions and 566 deletions

View File

@ -3,15 +3,12 @@ package clickhouseReader
import (
"bytes"
"context"
"crypto/md5"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"reflect"
"regexp"
@ -26,20 +23,16 @@ import (
"github.com/google/uuid"
"github.com/oklog/oklog/pkg/group"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/tsdb"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@ -92,24 +85,30 @@ type ClickHouseReader struct {
spansTable string
queryEngine *promql.Engine
remoteStorage *remote.Storage
ruleManager *rules.Manager
promConfigFile string
promConfig *config.Config
alertManager am.Manager
}
// NewReader returns a ClickHouseReader for the database
func NewReader(localDB *sqlx.DB) *ClickHouseReader {
func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
options := NewOptions(datasource, primaryNamespace, archiveNamespace)
db, err := initialize(options)
if err != nil {
zap.S().Error(err)
zap.S().Error("failed to initialize ClickHouse: ", err)
os.Exit(1)
}
alertManager := am.New("")
alertManager, err := am.New("")
if err != nil {
zap.S().Errorf("msg: failed to initialize alert manager: ", "/t error:", err)
zap.S().Errorf("msg: check if the alert manager URL is correctly set and valid")
os.Exit(1)
}
return &ClickHouseReader{
db: db,
@ -121,6 +120,7 @@ func NewReader(localDB *sqlx.DB) *ClickHouseReader {
errorTable: options.primary.ErrorTable,
durationTable: options.primary.DurationTable,
spansTable: options.primary.SpansTable,
promConfigFile: configFile,
}
}
@ -139,30 +139,14 @@ func (r *ClickHouseReader) Start() {
startTime := func() (int64, error) {
return int64(promModel.Latest), nil
}
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), startTime, time.Duration(1*time.Minute))
// conf, err := config.LoadFile(*filename)
// if err != nil {
// zap.S().Error("couldn't load configuration (--config.file=%q): %v", filename, err)
// }
// err = remoteStorage.ApplyConfig(conf)
// if err != nil {
// zap.S().Error("Error in remoteStorage.ApplyConfig: ", err)
// }
cfg := struct {
configFile string
localStoragePath string
notifier notifier.Options
notifierTimeout promModel.Duration
forGracePeriod promModel.Duration
outageTolerance promModel.Duration
resendDelay promModel.Duration
tsdb tsdb.Options
lookbackDelta promModel.Duration
webTimeout promModel.Duration
queryTimeout promModel.Duration
@ -174,39 +158,15 @@ func (r *ClickHouseReader) Start() {
logLevel promlog.AllowedLevel
}{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
configFile: r.promConfigFile,
}
flag.StringVar(&cfg.configFile, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.Parse()
// fanoutStorage := remoteStorage
fanoutStorage := storage.NewFanout(logger, remoteStorage)
localStorage := remoteStorage
cfg.notifier.QueueCapacity = 10000
cfg.notifierTimeout = promModel.Duration(time.Duration.Seconds(10))
notifier := notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
// notifier.ApplyConfig(conf)
ExternalURL, err := computeExternalURL("", "0.0.0.0:3301")
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "parse external URL %q", ExternalURL.String()))
os.Exit(2)
}
cfg.outageTolerance = promModel.Duration(time.Duration.Hours(1))
cfg.forGracePeriod = promModel.Duration(time.Duration.Minutes(10))
cfg.resendDelay = promModel.Duration(time.Duration.Minutes(1))
ctxScrape, cancelScrape := context.WithCancel(context.Background())
discoveryManagerScrape := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))
ctxNotify, cancelNotify := context.WithCancel(context.Background())
discoveryManagerNotify := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
scrapeManager := scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
opts := promql.EngineOpts{
@ -219,25 +179,10 @@ func (r *ClickHouseReader) Start() {
queryEngine := promql.NewEngine(opts)
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
TSDB: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifier, ExternalURL.String()),
Context: context.Background(),
ExternalURL: ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
// The Scrape and notifier managers need to reload before the Discovery manager as
// The Scrape managers need to reload before the Discovery manager as
// they need to read the most updated config when receiving the new targets list.
notifier.ApplyConfig,
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
@ -246,32 +191,6 @@ func (r *ClickHouseReader) Start() {
}
return discoveryManagerScrape.ApplyConfig(c)
},
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
// AlertmanagerConfigs doesn't hold a unique identifier so we use the config hash as the identifier.
b, err := json.Marshal(v)
if err != nil {
return err
}
c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
}
return discoveryManagerNotify.ApplyConfig(c)
},
// func(cfg *config.Config) error {
// // Get all rule files matching the configuration paths.
// var files []string
// for _, pat := range cfg.RuleFiles {
// fs, err := filepath.Glob(pat)
// if err != nil {
// // The only error can be a bad pattern.
// return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
// }
// files = append(files, fs...)
// }
// return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files)
// },
}
// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
@ -305,20 +224,6 @@ func (r *ClickHouseReader) Start() {
},
)
}
{
// Notify discovery manager.
g.Add(
func() error {
err := discoveryManagerNotify.Run()
level.Info(logger).Log("msg", "Notify discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping notify discovery manager...")
cancelNotify()
},
)
}
{
// Scrape manager.
g.Add(
@ -354,6 +259,7 @@ func (r *ClickHouseReader) Start() {
// reloadReady.Close()
// return nil
// }
var err error
r.promConfig, err = reloadConfig(cfg.configFile, logger, reloaders...)
if err != nil {
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
@ -361,29 +267,19 @@ func (r *ClickHouseReader) Start() {
reloadReady.Close()
rules, apiErrorObj := r.GetRulesFromDB()
// commented out: the alert manager can now
// call query service to do this
// channels, apiErrorObj := r.GetChannels()
if apiErrorObj != nil {
zap.S().Errorf("Not able to read rules from DB")
}
for _, rule := range *rules {
apiErrorObj = r.LoadRule(rule)
if apiErrorObj != nil {
zap.S().Errorf("Not able to load rule with id=%d loaded from DB", rule.Id, rule.Data)
}
}
channels, apiErrorObj := r.GetChannels()
if apiErrorObj != nil {
zap.S().Errorf("Not able to read channels from DB")
}
for _, channel := range *channels {
apiErrorObj = r.LoadChannel(&channel)
if apiErrorObj != nil {
zap.S().Errorf("Not able to load channel with id=%d loaded from DB", channel.Id, channel.Data)
}
}
//if apiErrorObj != nil {
// zap.S().Errorf("Not able to read channels from DB")
//}
//for _, channel := range *channels {
//apiErrorObj = r.LoadChannel(&channel)
//if apiErrorObj != nil {
// zap.S().Errorf("Not able to load channel with id=%d loaded from DB", channel.Id, channel.Data)
//}
//}
<-cancel
@ -394,48 +290,8 @@ func (r *ClickHouseReader) Start() {
},
)
}
{
// Rule manager.
// TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel.
cancel := make(chan struct{})
g.Add(
func() error {
<-reloadReady.C
ruleManager.Run()
<-cancel
return nil
},
func(err error) {
ruleManager.Stop()
close(cancel)
},
)
}
{
// Notifier.
// Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running,
// so keep this interrupt after the ruleManager.Stop().
g.Add(
func() error {
// When the notifier manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager
// so we wait until the config is fully loaded.
<-reloadReady.C
notifier.Run(discoveryManagerNotify.SyncCh())
level.Info(logger).Log("msg", "Notifier manager stopped")
return nil
},
func(err error) {
notifier.Stop()
},
)
}
r.queryEngine = queryEngine
r.remoteStorage = remoteStorage
r.ruleManager = ruleManager
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
@ -466,70 +322,6 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
return conf, nil
}
func startsOrEndsWithQuote(s string) bool {
return strings.HasPrefix(s, "\"") || strings.HasPrefix(s, "'") ||
strings.HasSuffix(s, "\"") || strings.HasSuffix(s, "'")
}
// computeExternalURL computes a sanitized external URL from a raw input. It infers unset
// URL parts from the OS and the given listen address.
func computeExternalURL(u, listenAddr string) (*url.URL, error) {
if u == "" {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
_, port, err := net.SplitHostPort(listenAddr)
if err != nil {
return nil, err
}
u = fmt.Sprintf("http://%s:%s/", hostname, port)
}
if startsOrEndsWithQuote(u) {
return nil, fmt.Errorf("URL must not begin or end with quotes")
}
eu, err := url.Parse(u)
if err != nil {
return nil, err
}
ppref := strings.TrimRight(eu.Path, "/")
if ppref != "" && !strings.HasPrefix(ppref, "/") {
ppref = "/" + ppref
}
eu.Path = ppref
return eu, nil
}
// sendAlerts implements the rules.NotifyFunc for a Notifier.
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert
for _, alert := range alerts {
a := &notifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}
if len(alerts) > 0 {
n.Send(res...)
}
}
}
func initialize(options *Options) (clickhouse.Conn, error) {
db, err := connect(options.getPrimary())
@ -548,156 +340,8 @@ func connect(cfg *namespaceConfig) (clickhouse.Conn, error) {
return cfg.Connector(cfg)
}
type byAlertStateAndNameSorter struct {
alerts []*AlertingRuleWithGroup
}
func (s byAlertStateAndNameSorter) Len() int {
return len(s.alerts)
}
func (s byAlertStateAndNameSorter) Less(i, j int) bool {
return s.alerts[i].State() > s.alerts[j].State() ||
(s.alerts[i].State() == s.alerts[j].State() &&
s.alerts[i].Name() < s.alerts[j].Name())
}
func (s byAlertStateAndNameSorter) Swap(i, j int) {
s.alerts[i], s.alerts[j] = s.alerts[j], s.alerts[i]
}
type AlertingRuleWithGroup struct {
rules.AlertingRule
Id int
}
func (r *ClickHouseReader) GetRulesFromDB() (*[]model.RuleResponseItem, *model.ApiError) {
rules := []model.RuleResponseItem{}
query := fmt.Sprintf("SELECT id, updated_at, data FROM rules")
err := r.localDB.Select(&rules, query)
zap.S().Info(query)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return &rules, nil
}
func (r *ClickHouseReader) GetRule(id string) (*model.RuleResponseItem, *model.ApiError) {
idInt, err := strconv.Atoi(id)
if err != nil {
zap.S().Debug("Error in parsing param: ", err)
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
rule := &model.RuleResponseItem{}
query := "SELECT id, updated_at, data FROM rules WHERE id=?"
rows, err := r.localDB.Query(query, idInt)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
count := 0
// iterate over each row
for rows.Next() {
err = rows.Scan(&rule.Id, &rule.UpdatedAt, &rule.Data)
if err != nil {
zap.S().Debug(err)
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
count += 1
}
if count == 0 {
err = fmt.Errorf("no rule with id %d found", idInt)
zap.S().Debug(err)
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: err}
}
if count > 1 {
err = fmt.Errorf("multiple rules with id %d found", idInt)
zap.S().Debug(err)
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: err}
}
return rule, nil
}
func (r *ClickHouseReader) ListRulesFromProm() (*model.AlertDiscovery, *model.ApiError) {
groups := r.ruleManager.RuleGroups()
alertingRulesWithGroupObjects := []*AlertingRuleWithGroup{}
for _, group := range groups {
groupNameParts := strings.Split(group.Name(), "-groupname")
if len(groupNameParts) < 2 {
continue
}
id, _ := strconv.Atoi(groupNameParts[0])
for _, rule := range group.Rules() {
if alertingRule, ok := rule.(*rules.AlertingRule); ok {
alertingRulesWithGroupObject := AlertingRuleWithGroup{
*alertingRule,
id,
}
alertingRulesWithGroupObjects = append(alertingRulesWithGroupObjects, &alertingRulesWithGroupObject)
}
}
}
// alertingRules := r.ruleManager.AlertingRules()
alertsSorter := byAlertStateAndNameSorter{alerts: alertingRulesWithGroupObjects}
sort.Sort(alertsSorter)
alerts := []*model.AlertingRuleResponse{}
for _, alertingRule := range alertsSorter.alerts {
alertingRuleResponseObject := &model.AlertingRuleResponse{
Labels: alertingRule.Labels(),
// Annotations: alertingRule.Annotations(),
Name: alertingRule.Name(),
Id: alertingRule.Id,
}
if len(alertingRule.ActiveAlerts()) == 0 {
alertingRuleResponseObject.State = rules.StateInactive.String()
} else {
alertingRuleResponseObject.State = (*(alertingRule.ActiveAlerts()[0])).State.String()
}
alerts = append(
alerts,
alertingRuleResponseObject,
)
}
res := &model.AlertDiscovery{Alerts: alerts}
return res, nil
}
func (r *ClickHouseReader) LoadRule(rule model.RuleResponseItem) *model.ApiError {
groupName := fmt.Sprintf("%d-groupname", rule.Id)
err := r.ruleManager.AddGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule.Data, groupName)
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (r *ClickHouseReader) GetConn() clickhouse.Conn {
return r.db
}
func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiError {
@ -942,138 +586,6 @@ func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *
}
func (r *ClickHouseReader) CreateRule(rule string) *model.ApiError {
tx, err := r.localDB.Begin()
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
var lastInsertId int64
{
stmt, err := tx.Prepare(`INSERT into rules (updated_at, data) VALUES($1,$2);`)
if err != nil {
zap.S().Errorf("Error in preparing statement for INSERT to rules\n", err)
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()
result, err := stmt.Exec(time.Now(), rule)
if err != nil {
zap.S().Errorf("Error in Executing prepared statement for INSERT to rules\n", err)
tx.Rollback() // return an error too, we may want to wrap them
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
lastInsertId, _ = result.LastInsertId()
groupName := fmt.Sprintf("%d-groupname", lastInsertId)
err = r.ruleManager.AddGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName)
if err != nil {
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}
err = tx.Commit()
if err != nil {
zap.S().Errorf("Error in committing transaction for INSERT to rules\n", err)
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (r *ClickHouseReader) EditRule(rule string, id string) *model.ApiError {
idInt, _ := strconv.Atoi(id)
tx, err := r.localDB.Begin()
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
{
stmt, err := tx.Prepare(`UPDATE rules SET updated_at=$1, data=$2 WHERE id=$3;`)
if err != nil {
zap.S().Errorf("Error in preparing statement for UPDATE to rules\n", err)
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()
if _, err := stmt.Exec(time.Now(), rule, idInt); err != nil {
zap.S().Errorf("Error in Executing prepared statement for UPDATE to rules\n", err)
tx.Rollback() // return an error too, we may want to wrap them
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
groupName := fmt.Sprintf("%d-groupname", idInt)
err = r.ruleManager.EditGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName)
if err != nil {
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}
err = tx.Commit()
if err != nil {
zap.S().Errorf("Error in committing transaction for UPDATE to rules\n", err)
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (r *ClickHouseReader) DeleteRule(id string) *model.ApiError {
idInt, _ := strconv.Atoi(id)
tx, err := r.localDB.Begin()
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
{
stmt, err := tx.Prepare(`DELETE FROM rules WHERE id=$1;`)
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()
if _, err := stmt.Exec(idInt); err != nil {
zap.S().Errorf("Error in Executing prepared statement for DELETE to rules\n", err)
tx.Rollback() // return an error too, we may want to wrap them
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
groupName := fmt.Sprintf("%d-groupname", idInt)
rule := "" // dummy rule to pass to function
// err = r.ruleManager.UpdateGroupWithAction(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName, "delete")
err = r.ruleManager.DeleteGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName)
if err != nil {
tx.Rollback()
zap.S().Errorf("Error in deleting rule from rulemanager...\n", err)
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}
err = tx.Commit()
if err != nil {
zap.S().Errorf("Error in committing transaction for deleting rules\n", err)
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
return nil
}
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(r.remoteStorage, queryParams.Query, queryParams.Time)
if err != nil {

View File

@ -20,10 +20,12 @@ import (
"go.signoz.io/query-service/app/parser"
"go.signoz.io/query-service/auth"
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/dao"
am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/interfaces"
"go.signoz.io/query-service/model"
"go.signoz.io/query-service/rules"
"go.signoz.io/query-service/telemetry"
"go.signoz.io/query-service/version"
"go.uber.org/zap"
@ -50,17 +52,22 @@ type APIHandler struct {
reader *interfaces.Reader
relationalDB dao.ModelDao
alertManager am.Manager
ruleManager *rules.Manager
ready func(http.HandlerFunc) http.HandlerFunc
}
// NewAPIHandler returns an APIHandler
func NewAPIHandler(reader *interfaces.Reader, relationalDB dao.ModelDao) (*APIHandler, error) {
func NewAPIHandler(reader *interfaces.Reader, relationalDB dao.ModelDao, ruleManager *rules.Manager) (*APIHandler, error) {
alertManager := am.New("")
alertManager, err := am.New("")
if err != nil {
return nil, err
}
aH := &APIHandler{
reader: reader,
relationalDB: relationalDB,
alertManager: alertManager,
ruleManager: ruleManager,
}
aH.ready = aH.testReady
@ -297,7 +304,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/channels/{id}", AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", EditAccess(aH.createChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", EditAccess(aH.testChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules", ViewAccess(aH.listRulesFromProm)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", ViewAccess(aH.listRules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules/{id}", ViewAccess(aH.getRule)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", EditAccess(aH.createRule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.editRule)).Methods(http.MethodPut)
@ -381,12 +388,12 @@ func Intersection(a, b []int) (c []int) {
func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
alertList, apiErrorObj := (*aH.reader).GetRule(id)
if apiErrorObj != nil {
respondError(w, apiErrorObj, nil)
ruleResponse, err := aH.ruleManager.GetRule(id)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.respond(w, alertList)
aH.respond(w, ruleResponse)
}
func (aH *APIHandler) metricAutocompleteMetricName(w http.ResponseWriter, r *http.Request) {
@ -617,13 +624,17 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
aH.respond(w, resp)
}
func (aH *APIHandler) listRulesFromProm(w http.ResponseWriter, r *http.Request) {
alertList, apiErrorObj := (*aH.reader).ListRulesFromProm()
if apiErrorObj != nil {
respondError(w, apiErrorObj, nil)
func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) {
rules, err := aH.ruleManager.ListRuleStates()
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.respond(w, alertList)
// todo(amol): need to add sorter
aH.respond(w, rules)
}
func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) {
@ -759,32 +770,35 @@ func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
apiErrorObj := (*aH.reader).DeleteRule(id)
err := aH.ruleManager.DeleteRule(id)
if apiErrorObj != nil {
respondError(w, apiErrorObj, nil)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.respond(w, "rule successfully deleted")
}
func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var postData map[string]string
err := json.NewDecoder(r.Body).Decode(&postData)
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading request body")
zap.S().Errorf("msg: error in getting req body of edit rule API\n", "\t error:", err)
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
apiErrorObj := (*aH.reader).EditRule(postData["data"], id)
err = aH.ruleManager.EditRule(string(body), id)
if apiErrorObj != nil {
respondError(w, apiErrorObj, nil)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
@ -908,20 +922,17 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var postData map[string]string
err := decoder.Decode(&postData)
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
zap.S().Errorf("Error in getting req body for create rule API\n", err)
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
apiErrorObj := (*aH.reader).CreateRule(postData["data"])
if apiErrorObj != nil {
respondError(w, apiErrorObj, nil)
err = aH.ruleManager.CreateRule(string(body))
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}

View File

@ -11,6 +11,7 @@ import (
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
"github.com/rs/cors"
"github.com/soheilhy/cmux"
@ -19,15 +20,22 @@ import (
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/dao"
"go.signoz.io/query-service/healthcheck"
am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/interfaces"
pqle "go.signoz.io/query-service/pqlEngine"
"go.signoz.io/query-service/rules"
"go.signoz.io/query-service/telemetry"
"go.signoz.io/query-service/utils"
"go.uber.org/zap"
)
type ServerOptions struct {
PromConfigPath string
HTTPHostPort string
PrivateHostPort string
// alert specific params
DisableRules bool
RuleRepoURL string
}
// Server runs HTTP, Mux and a grpc server
@ -35,6 +43,9 @@ type Server struct {
// logger *zap.Logger
// tracer opentracing.Tracer // TODO make part of flags.Service
serverOptions *ServerOptions
conn net.Listener
ruleManager *rules.Manager
separatePorts bool
// public http router
httpConn net.Listener
@ -58,6 +69,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
if err := dao.InitDao("sqlite", constants.RELATIONAL_DATASOURCE_PATH); err != nil {
return nil, err
}
localDB, err := dashboards.InitDB(constants.RELATIONAL_DATASOURCE_PATH)
if err != nil {
@ -70,16 +82,20 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
storage := os.Getenv("STORAGE")
if storage == "clickhouse" {
zap.S().Info("Using ClickHouse as datastore ...")
clickhouseReader := clickhouseReader.NewReader(localDB)
clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath)
go clickhouseReader.Start()
reader = clickhouseReader
} else {
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
}
telemetry.GetInstance().SetReader(reader)
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules)
if err != nil {
return nil, err
}
apiHandler, err := NewAPIHandler(&reader, dao.DB())
telemetry.GetInstance().SetReader(reader)
apiHandler, err := NewAPIHandler(&reader, dao.DB(), rm)
if err != nil {
return nil, err
}
@ -87,6 +103,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s := &Server{
// logger: logger,
// tracer: tracer,
ruleManager: rm,
serverOptions: serverOptions,
unavailableChannel: make(chan healthcheck.Status),
}
@ -262,6 +279,13 @@ func (s *Server) initListeners() error {
// Start listening on http and private http port concurrently
func (s *Server) Start() error {
// initiate rule manager first
if !s.serverOptions.DisableRules {
s.ruleManager.Start()
} else {
zap.S().Info("msg: Rules disabled as rules.disable is set to TRUE")
}
err := s.initListeners()
if err != nil {
return err
@ -315,3 +339,49 @@ func (s *Server) Start() error {
return nil
}
func makeRulesManager(
promConfigPath,
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch interfaces.Reader,
disableRules bool) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// notifier opts
notifierOpts := am.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{alertManagerURL},
}
// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
Queriers: &rules.Queriers{
PqlEngine: pqle,
Ch: ch.GetConn(),
},
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: nil,
DisableRules: disableRules,
}
// create Manager
manager, err := rules.NewManager(managerOpts)
if err != nil {
return nil, fmt.Errorf("rule manager error: %v", err)
}
zap.S().Info("rules manager is ready")
return manager, nil
}

View File

@ -74,6 +74,12 @@ const (
SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2"
)
// alert related constants
const (
// AlertHelpPage is used in case default alert repo url is not set
AlertHelpPage = "https://signoz.io/docs/userguide/alerts-management/#generator-url"
)
func GetOrDefaultEnv(key string, fallback string) string {
v := os.Getenv(key)
if len(v) == 0 {

View File

@ -5,35 +5,44 @@ import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/model"
"go.uber.org/zap"
"net/http"
neturl "net/url"
)
const contentType = "application/json"
type Manager interface {
URL() *neturl.URL
URLPath(path string) *neturl.URL
AddRoute(receiver *Receiver) *model.ApiError
EditRoute(receiver *Receiver) *model.ApiError
DeleteRoute(name string) *model.ApiError
TestReceiver(receiver *Receiver) *model.ApiError
}
func New(url string) Manager {
func New(url string) (Manager, error) {
if url == "" {
url = constants.GetAlertManagerApiPrefix()
}
urlParsed, err := neturl.Parse(url)
if err != nil {
return nil, err
}
return &manager{
url: url,
}
parsedURL: urlParsed,
}, nil
}
type manager struct {
url string
parsedURL *neturl.URL
}
func prepareAmChannelApiURL() string {
@ -52,6 +61,19 @@ func prepareTestApiURL() string {
return fmt.Sprintf("%s%s", basePath, "v1/testReceiver")
}
func (m *manager) URL() *neturl.URL {
return m.parsedURL
}
func (m *manager) URLPath(path string) *neturl.URL {
upath, err := neturl.Parse(path)
if err != nil {
return nil
}
return m.parsedURL.ResolveReference(upath)
}
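Example (not part of this commit): a minimal sketch of how URLPath resolves a relative API path against the configured base URL; the base URL below is hypothetical and only illustrates the ResolveReference behaviour.

package main

import (
	"fmt"

	am "go.signoz.io/query-service/integrations/alertManager"
)

func main() {
	// hypothetical base URL; an empty string falls back to constants.GetAlertManagerApiPrefix()
	m, err := am.New("http://localhost:9093/api/")
	if err != nil {
		panic(err)
	}
	// ResolveReference joins the relative path onto the parsed base,
	// yielding http://localhost:9093/api/v1/alerts
	fmt.Println(m.URLPath("v1/alerts"))
}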
func (m *manager) AddRoute(receiver *Receiver) *model.ApiError {
receiverString, _ := json.Marshal(receiver)

View File

@ -1,5 +1,11 @@
package alertManager
import (
"fmt"
"go.signoz.io/query-service/utils/labels"
"time"
)
// Receiver configuration provides configuration on how to contact a receiver.
type Receiver struct {
// A unique identifier for this receiver.
@ -20,3 +26,50 @@ type ReceiverResponse struct {
Status string `json:"status"`
Data Receiver `json:"data"`
}
// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels labels.BaseLabels `json:"labels"`
// Extra key/value information which does not define alert identity.
Annotations labels.BaseLabels `json:"annotations"`
// The known time range for this alert. Both ends are optional.
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
GeneratorURL string `json:"generatorURL,omitempty"`
}
// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
return a.Labels.Get(labels.AlertNameLabel)
}
// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
func (a *Alert) Hash() uint64 {
return a.Labels.Hash()
}
func (a *Alert) String() string {
s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7])
if a.Resolved() {
return s + "[resolved]"
}
return s + "[active]"
}
// Resolved returns true iff the activity interval ended in the past.
func (a *Alert) Resolved() bool {
return a.ResolvedAt(time.Now())
}
// ResolvedAt returns true if the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
if a.EndsAt.IsZero() {
return false
}
return !a.EndsAt.After(ts)
}
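Example (not part of this commit): a small sketch of the Resolved/ResolvedAt semantics on the Alert type above; the timestamps are illustrative.

package main

import (
	"fmt"
	"time"

	am "go.signoz.io/query-service/integrations/alertManager"
)

func main() {
	a := &am.Alert{
		StartsAt: time.Now().Add(-10 * time.Minute),
		EndsAt:   time.Now().Add(-1 * time.Minute),
	}
	fmt.Println(a.Resolved())                                    // true: EndsAt is already in the past
	fmt.Println(a.ResolvedAt(time.Now().Add(-5 * time.Minute))) // false: the alert was still active then
}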

View File

@ -0,0 +1,310 @@
package alertManager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync/atomic"
"net/http"
"net/url"
"sync"
"time"
old_ctx "golang.org/x/net/context"
"github.com/go-kit/kit/log"
"github.com/go-kit/log/level"
"go.uber.org/zap"
"golang.org/x/net/context/ctxhttp"
)
const (
alertPushEndpoint = "v1/alerts"
contentTypeJSON = "application/json"
)
// Notifier is responsible for dispatching alert notifications to an
// alert manager service.
type Notifier struct {
queue []*Alert
opts *NotifierOptions
more chan struct{}
mtx sync.RWMutex
ctx context.Context
cancel func()
alertmanagers *alertmanagerSet
logger log.Logger
}
// NotifierOptions are the configurable parameters of a Handler.
type NotifierOptions struct {
QueueCapacity int
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx old_ctx.Context, client *http.Client, req *http.Request) (*http.Response, error)
// List of alert manager urls
AlertManagerURLs []string
// timeout limit on requests
Timeout time.Duration
}
func (opts *NotifierOptions) String() string {
var urls string
for _, u := range opts.AlertManagerURLs {
urls = fmt.Sprintf("%s %s", urls, u)
}
return urls
}
// todo(amol): add metrics
func NewNotifier(o *NotifierOptions, logger log.Logger) (*Notifier, error) {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil {
o.Do = ctxhttp.Do
}
if logger == nil {
logger = log.NewNopLogger()
}
n := &Notifier{
queue: make([]*Alert, 0, o.QueueCapacity),
ctx: ctx,
cancel: cancel,
more: make(chan struct{}, 1),
opts: o,
logger: logger,
}
timeout := o.Timeout
if int64(timeout) == 0 {
timeout = time.Duration(30 * time.Second)
}
amset, err := newAlertmanagerSet(o.AlertManagerURLs, timeout, logger)
if err != nil {
zap.S().Errorf("failed to parse alert manager urls")
return n, err
}
n.alertmanagers = amset
zap.S().Info("Starting notifier with alert manager:", o.AlertManagerURLs)
return n, nil
}
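Example (not part of this commit): a minimal wiring sketch for the notifier; the Alertmanager URL is hypothetical, and Send is only shown in a comment since building label sets is outside this snippet.

package main

import (
	"time"

	am "go.signoz.io/query-service/integrations/alertManager"
)

func main() {
	n, err := am.NewNotifier(&am.NotifierOptions{
		QueueCapacity:    10000,
		Timeout:          30 * time.Second,
		AlertManagerURLs: []string{"http://localhost:9093/api/"}, // hypothetical URL
	}, nil)
	if err != nil {
		panic(err)
	}
	go n.Run() // drains the queue in batches until Stop is called
	defer n.Stop()

	// n.Send(&am.Alert{StartsAt: time.Now(), GeneratorURL: "..."}) queues alerts
	// for delivery to every configured Alertmanager.
}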
const maxBatchSize = 64
func (n *Notifier) queueLen() int {
n.mtx.RLock()
defer n.mtx.RUnlock()
return len(n.queue)
}
func (n *Notifier) nextBatch() []*Alert {
n.mtx.Lock()
defer n.mtx.Unlock()
var alerts []*Alert
if len(n.queue) > maxBatchSize {
alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
n.queue = n.queue[maxBatchSize:]
} else {
alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0]
}
return alerts
}
// Run dispatches notifications continuously.
func (n *Notifier) Run() {
zap.S().Info("msg: Initiating alert notifier...")
for {
select {
case <-n.ctx.Done():
return
case <-n.more:
}
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
zap.S().Warn("msg: dropped alerts", "\t count:", len(alerts))
// n.metrics.dropped.Add(float64(len(alerts)))
}
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
n.setMore()
}
}
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Notifier) Send(alerts ...*Alert) {
n.mtx.Lock()
defer n.mtx.Unlock()
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
alerts = alerts[d:]
level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
//n.metrics.dropped.Add(float64(d))
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]
level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
//n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
n.setMore()
}
// setMore signals that the alert queue has items.
func (n *Notifier) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.more <- struct{}{}:
default:
}
}
// Alertmanagers returns a slice of Alertmanager URLs.
func (n *Notifier) Alertmanagers() []*url.URL {
n.mtx.RLock()
amset := n.alertmanagers
n.mtx.RUnlock()
var res []*url.URL
amset.mtx.RLock()
for _, am := range amset.ams {
res = append(res, am.URLPath(alertPushEndpoint))
}
amset.mtx.RUnlock()
return res
}
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Notifier) sendAll(alerts ...*Alert) bool {
b, err := json.Marshal(alerts)
if err != nil {
zap.S().Errorf("msg", "Encoding alerts failed", "err", err)
return false
}
n.mtx.RLock()
ams := n.alertmanagers
n.mtx.RUnlock()
var (
wg sync.WaitGroup
numSuccess uint64
)
ams.mtx.RLock()
for _, am := range ams.ams {
wg.Add(1)
ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.timeout))
defer cancel()
go func(ams *alertmanagerSet, am Manager) {
u := am.URLPath(alertPushEndpoint).String()
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
zap.S().Errorf("alertmanager", u, "count", len(alerts), "msg", "Error calling alert API", "err", err)
} else {
atomic.AddUint64(&numSuccess, 1)
}
// n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
// n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
wg.Done()
}(ams, am)
}
ams.mtx.RUnlock()
wg.Wait()
return numSuccess > 0
}
func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
return err
}
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := n.opts.Do(ctx, c, req)
if err != nil {
return err
}
defer resp.Body.Close()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %v", resp.Status)
}
return err
}
// Stop shuts down the notification handler.
func (n *Notifier) Stop() {
level.Info(n.logger).Log("msg", "Stopping notification manager...")
n.cancel()
}
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
urls []string
client *http.Client
timeout time.Duration
mtx sync.RWMutex
ams []Manager
logger log.Logger
}
func newAlertmanagerSet(urls []string, timeout time.Duration, logger log.Logger) (*alertmanagerSet, error) {
client := &http.Client{}
s := &alertmanagerSet{
client: client,
urls: urls,
logger: logger,
timeout: timeout,
}
ams := []Manager{}
for _, u := range urls {
am, err := New(u)
if err != nil {
level.Error(s.logger).Log("msg", fmt.Sprintf("invalid alert manager url %s: %s", u, err))
} else {
ams = append(ams, am)
}
}
if len(ams) == 0 {
return s, fmt.Errorf("no alert managers")
}
s.ams = ams
return s, nil
}

View File

@ -3,6 +3,7 @@ package interfaces
import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
am "go.signoz.io/query-service/integrations/alertManager"
@ -16,12 +17,6 @@ type Reader interface {
CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError)
EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError)
GetRule(id string) (*model.RuleResponseItem, *model.ApiError)
ListRulesFromProm() (*model.AlertDiscovery, *model.ApiError)
CreateRule(alert string) *model.ApiError
EditRule(alert string, id string) *model.ApiError
DeleteRule(id string) *model.ApiError
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, *model.ApiError)
@ -62,4 +57,7 @@ type Reader interface {
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)
GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error)
GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
// Connection needed for rules, not ideal but required
GetConn() clickhouse.Conn
}

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"flag"
"os"
"os/signal"
"syscall"
@ -25,6 +26,18 @@ func initZapLog() *zap.Logger {
}
func main() {
var promConfigPath string
// disables rule execution but allows change to the rule definition
var disableRules bool
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL string
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.Parse()
loggerMgr := initZapLog()
zap.ReplaceGlobals(loggerMgr)
@ -35,7 +48,10 @@ func main() {
serverOptions := &app.ServerOptions{
HTTPHostPort: constants.HTTPHostPort,
PromConfigPath: promConfigPath,
PrivateHostPort: constants.PrivateHostPort,
DisableRules: disableRules,
RuleRepoURL: ruleRepoURL,
}
// Read the jwt secret key

View File

@ -0,0 +1,85 @@
package promql
import (
"context"
"fmt"
"github.com/go-kit/log"
pmodel "github.com/prometheus/common/model"
plog "github.com/prometheus/common/promlog"
pconfig "github.com/prometheus/prometheus/config"
plabels "github.com/prometheus/prometheus/pkg/labels"
pql "github.com/prometheus/prometheus/promql"
pstorage "github.com/prometheus/prometheus/storage"
premote "github.com/prometheus/prometheus/storage/remote"
"time"
)
type PqlEngine struct {
engine *pql.Engine
fanoutStorage pstorage.Storage
}
func FromConfigPath(promConfigPath string) (*PqlEngine, error) {
// load storage path
c, err := pconfig.LoadFile(promConfigPath)
if err != nil {
return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", promConfigPath, err)
}
return NewPqlEngine(c)
}
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
logLevel := plog.AllowedLevel{}
logLevel.Set("debug")
logger := plog.New(logLevel)
opts := pql.EngineOpts{
Logger: log.With(logger, "component", "promql evaluator"),
Reg: nil,
MaxConcurrent: 20,
MaxSamples: 50000000,
Timeout: time.Duration(2 * time.Minute),
}
e := pql.NewEngine(opts)
startTime := func() (int64, error) {
return int64(pmodel.Latest), nil
}
remoteStorage := premote.NewStorage(log.With(logger, "component", "remote"), startTime, time.Duration(1*time.Minute))
fanoutStorage := pstorage.NewFanout(logger, remoteStorage)
remoteStorage.ApplyConfig(config)
return &PqlEngine{
engine: e,
fanoutStorage: fanoutStorage,
}, nil
}
func (p *PqlEngine) RunAlertQuery(ctx context.Context, qs string, t time.Time) (pql.Vector, error) {
q, err := p.engine.NewInstantQuery(p.fanoutStorage, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case pql.Vector:
return v, nil
case pql.Scalar:
return pql.Vector{pql.Sample{
Point: pql.Point(v),
Metric: plabels.Labels{},
}}, nil
default:
return nil, fmt.Errorf("rule result is not a vector or scalar")
}
}
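Example (not part of this commit): a sketch of building the engine from a Prometheus config and evaluating an instant query; the config path mirrors the query-service default flag and is an assumption.

package main

import (
	"context"
	"fmt"
	"time"

	pqle "go.signoz.io/query-service/pqlEngine"
)

func main() {
	engine, err := pqle.FromConfigPath("./config/prometheus.yml") // assumed path
	if err != nil {
		panic(err)
	}
	// Scalars are wrapped into a single-sample vector by RunAlertQuery.
	vec, err := engine.RunAlertQuery(context.Background(), `vector(1)`, time.Now())
	if err != nil {
		panic(err)
	}
	fmt.Println("samples:", len(vec))
}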

View File

@ -0,0 +1,200 @@
package rules
import (
"encoding/json"
"github.com/pkg/errors"
"go.signoz.io/query-service/model"
"go.signoz.io/query-service/utils/labels"
"time"
)
// how long before re-sending the alert
const resolvedRetention = 15 * time.Minute
const (
// AlertMetricName is the metric name for synthetic alert timeseries.
alertMetricName = "ALERTS"
// AlertForStateMetricName is the metric name for 'for' state of alert.
alertForStateMetricName = "ALERTS_FOR_STATE"
)
type RuleType string
const (
RuleTypeThreshold = "threshold_rule"
RuleTypeProm = "promql_rule"
)
type RuleHealth string
const (
HealthUnknown RuleHealth = "unknown"
HealthGood RuleHealth = "ok"
HealthBad RuleHealth = "err"
)
// AlertState denotes the state of an active alert.
type AlertState int
const (
StateInactive AlertState = iota
StatePending
StateFiring
)
func (s AlertState) String() string {
switch s {
case StateInactive:
return "inactive"
case StatePending:
return "pending"
case StateFiring:
return "firing"
}
panic(errors.Errorf("unknown alert state: %d", s))
}
type Alert struct {
State AlertState
Labels labels.BaseLabels
Annotations labels.BaseLabels
GeneratorURL string
Value float64
ActiveAt time.Time
FiredAt time.Time
ResolvedAt time.Time
LastSentAt time.Time
ValidUntil time.Time
}
// todo(amol): need to review this with ankit
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {
return true
}
return a.LastSentAt.Add(resendDelay).Before(ts)
}
type NamedAlert struct {
Name string
*Alert
}
type CompareOp string
const (
CompareOpNone CompareOp = "0"
ValueIsAbove CompareOp = "1"
ValueIsBelow CompareOp = "2"
ValueIsEq CompareOp = "3"
ValueIsNotEq CompareOp = "4"
)
func ResolveCompareOp(cop CompareOp) string {
switch cop {
case ValueIsAbove:
return ">"
case ValueIsBelow:
return "<"
case ValueIsEq:
return "=="
case ValueIsNotEq:
return "!="
}
return ""
}
type MatchType string
const (
MatchTypeNone MatchType = "0"
AtleastOnce MatchType = "1"
AllTheTimes MatchType = "2"
OnAverage MatchType = "3"
InTotal MatchType = "4"
)
type RuleCondition struct {
CompositeMetricQuery *model.CompositeMetricQuery `json:"compositeMetricQuery,omitempty" yaml:"compositeMetricQuery,omitempty"`
CompareOp CompareOp `yaml:"op,omitempty" json:"op,omitempty"`
Target *float64 `yaml:"target,omitempty" json:"target,omitempty"`
MatchType `json:"matchType,omitempty"`
}
func (rc *RuleCondition) IsValid() bool {
if rc.CompositeMetricQuery == nil {
return false
}
if rc.QueryType() == model.QUERY_BUILDER {
if rc.Target == nil {
return false
}
if rc.CompareOp == "" {
return false
}
}
if rc.QueryType() == model.PROM {
if len(rc.CompositeMetricQuery.PromQueries) == 0 {
return false
}
}
return true
}
// QueryType is a short hand method to get query type
func (rc *RuleCondition) QueryType() model.QueryType {
if rc.CompositeMetricQuery != nil {
return rc.CompositeMetricQuery.QueryType
}
return 0
}
// String is useful in printing rule condition in logs
func (rc *RuleCondition) String() string {
if rc == nil {
return ""
}
data, _ := json.Marshal(*rc)
return string(data)
}
type Duration time.Duration
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}
func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case float64:
*d = Duration(time.Duration(value))
return nil
case string:
tmp, err := time.ParseDuration(value)
if err != nil {
return err
}
*d = Duration(tmp)
return nil
default:
return errors.New("invalid duration")
}
}
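Example (not part of this commit): a quick round-trip of the Duration type through JSON, assuming it is imported from the rules package.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"go.signoz.io/query-service/rules"
)

func main() {
	// Marshals to a Go duration string, e.g. "5m0s"
	b, _ := json.Marshal(rules.Duration(5 * time.Minute))
	fmt.Println(string(b))

	// Unmarshals from a duration string (or a float of nanoseconds)
	var d rules.Duration
	if err := json.Unmarshal([]byte(`"1m30s"`), &d); err != nil {
		panic(err)
	}
	fmt.Println(time.Duration(d)) // 1m30s
}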

View File

@ -0,0 +1,230 @@
package rules
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"go.signoz.io/query-service/model"
"go.uber.org/zap"
"time"
"unicode/utf8"
"go.signoz.io/query-service/utils/times"
"go.signoz.io/query-service/utils/timestamp"
yaml "gopkg.in/yaml.v2"
)
// this file contains API request and response definitions
// served over HTTP
// PostableRule is used to create alerting rule from HTTP api
type PostableRule struct {
Alert string `yaml:"alert,omitempty" json:"alert,omitempty"`
Description string `yaml:"description,omitempty" json:"description,omitempty"`
RuleType RuleType `yaml:"ruleType,omitempty" json:"ruleType,omitempty"`
EvalWindow Duration `yaml:"evalWindow,omitempty" json:"evalWindow,omitempty"`
Frequency Duration `yaml:"frequency,omitempty" json:"frequency,omitempty"`
RuleCondition *RuleCondition `yaml:"condition,omitempty" json:"condition,omitempty"`
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"`
Annotations map[string]string `yaml:"annotations,omitempty" json:"annotations,omitempty"`
// Source captures the source url where rule has been created
Source string `json:"source,omitempty"`
// legacy
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
}
func ParsePostableRule(content []byte) (*PostableRule, []error) {
return parsePostableRule(content, "json")
}
func parsePostableRule(content []byte, kind string) (*PostableRule, []error) {
rule := PostableRule{}
var err error
if kind == "json" {
if err = json.Unmarshal(content, &rule); err != nil {
zap.S().Debugf("postable rule content", string(content), "\t kind:", kind)
return nil, []error{fmt.Errorf("failed to load json")}
}
} else if kind == "yaml" {
if err = yaml.Unmarshal(content, &rule); err != nil {
zap.S().Debugf("postable rule content", string(content), "\t kind:", kind)
return nil, []error{fmt.Errorf("failed to load yaml")}
}
} else {
return nil, []error{fmt.Errorf("invalid data type")}
}
zap.S().Debugf("postable rule(parsed):", rule)
if rule.RuleCondition == nil && rule.Expr != "" {
// account for legacy rules
rule.RuleType = RuleTypeProm
rule.EvalWindow = Duration(5 * time.Minute)
rule.Frequency = Duration(1 * time.Minute)
rule.RuleCondition = &RuleCondition{
CompositeMetricQuery: &model.CompositeMetricQuery{
QueryType: model.PROM,
PromQueries: map[string]*model.PromQuery{
"A": &model.PromQuery{
Query: rule.Expr,
},
},
},
}
}
if rule.EvalWindow == 0 {
rule.EvalWindow = Duration(5 * time.Minute)
}
if rule.Frequency == 0 {
rule.Frequency = Duration(1 * time.Minute)
}
if rule.RuleCondition != nil {
if rule.RuleCondition.CompositeMetricQuery.QueryType == model.QUERY_BUILDER {
rule.RuleType = RuleTypeThreshold
} else if rule.RuleCondition.CompositeMetricQuery.QueryType == model.PROM {
rule.RuleType = RuleTypeProm
}
for qLabel, q := range rule.RuleCondition.CompositeMetricQuery.BuilderQueries {
if q.MetricName != "" && q.Expression == "" {
q.Expression = qLabel
}
}
}
zap.S().Debugf("postable rule:", rule, "\t condition", rule.RuleCondition.String())
if errs := rule.Validate(); len(errs) > 0 {
return nil, errs
}
return &rule, []error{}
}
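Example (not part of this commit): parsing a legacy expr-style payload with ParsePostableRule; the alert name and PromQL are illustrative, and the legacy path fills in the PROM composite query and default windows as shown above.

package main

import (
	"fmt"

	"go.signoz.io/query-service/rules"
)

func main() {
	// legacy payload: only "alert" and "expr" are set
	body := []byte(`{"alert": "HighErrorRate", "expr": "rate(errors_total[5m]) > 0.1"}`)

	rule, errs := rules.ParsePostableRule(body)
	if len(errs) > 0 {
		fmt.Println("validation errors:", errs)
		return
	}
	// ruleType becomes promql_rule with a 5m eval window and 1m frequency
	fmt.Println(rule.Alert, rule.RuleType)
}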
func isValidLabelName(ln string) bool {
if len(ln) == 0 {
return false
}
for i, b := range ln {
if !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9' && i > 0)) {
return false
}
}
return true
}
func isValidLabelValue(v string) bool {
return utf8.ValidString(v)
}
func (r *PostableRule) Validate() (errs []error) {
if r.RuleCondition == nil {
errs = append(errs, errors.Errorf("rule condition is required"))
} else {
if r.RuleCondition.CompositeMetricQuery == nil {
errs = append(errs, errors.Errorf("composite metric query is required"))
}
}
if r.RuleType == RuleTypeThreshold {
if r.RuleCondition.Target == nil {
errs = append(errs, errors.Errorf("rule condition missing the threshold"))
}
if r.RuleCondition.CompareOp == "" {
errs = append(errs, errors.Errorf("rule condition missing the compare op"))
}
if r.RuleCondition.MatchType == "" {
errs = append(errs, errors.Errorf("rule condition missing the match option"))
}
}
for k, v := range r.Labels {
if !isValidLabelName(k) {
errs = append(errs, errors.Errorf("invalid label name: %s", k))
}
if !isValidLabelValue(v) {
errs = append(errs, errors.Errorf("invalid label value: %s", v))
}
}
for k := range r.Annotations {
if !isValidLabelName(k) {
errs = append(errs, errors.Errorf("invalid annotation name: %s", k))
}
}
errs = append(errs, testTemplateParsing(r)...)
return errs
}
func testTemplateParsing(rl *PostableRule) (errs []error) {
if rl.Alert == "" {
// Not an alerting rule.
return errs
}
// Trying to parse templates.
tmplData := AlertTemplateData(make(map[string]string), 0)
defs := "{{$labels := .Labels}}{{$value := .Value}}"
parseTest := func(text string) error {
tmpl := NewTemplateExpander(
context.TODO(),
defs+text,
"__alert_"+rl.Alert,
tmplData,
times.Time(timestamp.FromTime(time.Now())),
nil,
)
return tmpl.ParseTest()
}
// Parsing Labels.
for _, val := range rl.Labels {
err := parseTest(val)
if err != nil {
errs = append(errs, fmt.Errorf("msg=%s", err.Error()))
}
}
// Parsing Annotations.
for _, val := range rl.Annotations {
err := parseTest(val)
if err != nil {
errs = append(errs, fmt.Errorf("msg=%s", err.Error()))
}
}
return errs
}
// GettableRules has info for all stored rules.
type GettableRules struct {
Rules []*GettableRule `json:"rules"`
}
// GettableRule has info for an alerting rule.
type GettableRule struct {
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
State string `json:"state"`
Alert string `json:"alert"`
// Description string `yaml:"description,omitempty" json:"description,omitempty"`
Id string `json:"id"`
RuleType RuleType `yaml:"ruleType,omitempty" json:"ruleType,omitempty"`
EvalWindow Duration `yaml:"evalWindow,omitempty" json:"evalWindow,omitempty"`
Frequency Duration `yaml:"frequency,omitempty" json:"frequency,omitempty"`
RuleCondition RuleCondition `yaml:"condition,omitempty" json:"condition,omitempty"`
// ActiveAt *time.Time `json:"activeAt,omitempty"`
// Value float64 `json:"value"`
}

View File

@ -0,0 +1,187 @@
package rules
import (
"fmt"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"strconv"
"time"
)
// Data store to capture user alert rule settings
type RuleDB interface {
// CreateRuleTx stores rule in the db and returns tx and group name (on success)
CreateRuleTx(rule string) (string, Tx, error)
// EditRuleTx updates the given rule in the db and returns tx and group name (on success)
EditRuleTx(rule string, id string) (string, Tx, error)
// DeleteRuleTx deletes the given rule in the db and returns tx and group name (on success)
DeleteRuleTx(id string) (string, Tx, error)
// GetStoredRules fetches the rule definitions from db
GetStoredRules() ([]StoredRule, error)
// GetStoredRule for a given ID from DB
GetStoredRule(id string) (*StoredRule, error)
}
type StoredRule struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
Data string `json:"data" db:"data"`
}
type Tx interface {
Commit() error
Rollback() error
}
type ruleDB struct {
*sqlx.DB
}
// todo: move init methods for creating tables
func newRuleDB(db *sqlx.DB) RuleDB {
return &ruleDB{
db,
}
}
// CreateRuleTx stores a given rule in db and returns task name,
// sql tx and error (if any)
func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) {
var groupName string
var lastInsertId int64
tx, err := r.Begin()
if err != nil {
return groupName, nil, err
}
stmt, err := tx.Prepare(`INSERT into rules (updated_at, data) VALUES($1,$2);`)
if err != nil {
zap.S().Errorf("Error in preparing statement for INSERT to rules\n", err)
tx.Rollback()
return groupName, nil, err
}
defer stmt.Close()
result, err := stmt.Exec(time.Now(), rule)
if err != nil {
zap.S().Errorf("Error in Executing prepared statement for INSERT to rules\n", err)
tx.Rollback() // return an error too, we may want to wrap them
return groupName, nil, err
}
lastInsertId, _ = result.LastInsertId()
groupName = prepareTaskName(lastInsertId)
return groupName, tx, nil
}
// EditRuleTx stores a given rule string in database and returns
// task name, sql tx and error (if any)
func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) {
var groupName string
idInt, _ := strconv.Atoi(id)
if idInt == 0 {
return groupName, nil, fmt.Errorf("failed to read alert id from parameters")
}
groupName = prepareTaskName(int64(idInt))
// todo(amol): resolve this error - database locked when using
// edit transaction with sqlx
// tx, err := r.Begin()
//if err != nil {
// return groupName, tx, err
//}
stmt, err := r.Prepare(`UPDATE rules SET updated_at=$1, data=$2 WHERE id=$3;`)
if err != nil {
zap.S().Errorf("Error in preparing statement for UPDATE to rules\n", err)
// tx.Rollback()
return groupName, nil, err
}
defer stmt.Close()
if _, err := stmt.Exec(time.Now(), rule, idInt); err != nil {
zap.S().Errorf("Error in Executing prepared statement for UPDATE to rules\n", err)
// tx.Rollback() // return an error too, we may want to wrap them
return groupName, nil, err
}
return groupName, nil, nil
}
// DeleteRuleTx deletes a given rule with id and returns
// taskname, sql tx and error (if any)
func (r *ruleDB) DeleteRuleTx(id string) (string, Tx, error) {
idInt, _ := strconv.Atoi(id)
groupName := prepareTaskName(int64(idInt))
// commented as this causes db locked error
// tx, err := r.Begin()
// if err != nil {
// return groupName, tx, err
// }
stmt, err := r.Prepare(`DELETE FROM rules WHERE id=$1;`)
if err != nil {
return groupName, nil, err
}
defer stmt.Close()
if _, err := stmt.Exec(idInt); err != nil {
zap.S().Errorf("Error in Executing prepared statement for DELETE to rules\n", err)
// tx.Rollback()
return groupName, nil, err
}
return groupName, nil, nil
}
func (r *ruleDB) GetStoredRules() ([]StoredRule, error) {
rules := []StoredRule{}
query := fmt.Sprintf("SELECT id, updated_at, data FROM rules")
err := r.Select(&rules, query)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, err
}
return rules, nil
}
func (r *ruleDB) GetStoredRule(id string) (*StoredRule, error) {
intId, err := strconv.Atoi(id)
if err != nil {
return nil, fmt.Errorf("invalid id parameter")
}
rule := &StoredRule{}
query := fmt.Sprintf("SELECT id, updated_at, data FROM rules WHERE id=%d", intId)
err = r.Get(rule, query)
// zap.S().Info(query)
if err != nil {
zap.S().Error("Error in processing sql query: ", err)
return nil, err
}
return rule, nil
}
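// Usage sketch (illustrative, not part of this change): callers are expected to
// persist the rule first and commit only once the in-memory task has started;
// startTask is a hypothetical caller-side helper.
//
//   taskName, tx, err := db.CreateRuleTx(ruleJSON)
//   if err != nil {
//       return err
//   }
//   if err := startTask(taskName); err != nil {
//       tx.Rollback()
//       return err
//   }
//   return tx.Commit()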

View File

@ -0,0 +1,595 @@
package rules
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"go.uber.org/zap"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
// opentracing "github.com/opentracing/opentracing-go"
am "go.signoz.io/query-service/integrations/alertManager"
)
// namespace for prom metrics
const namespace = "signoz"
const taskNamesuffix = "webAppEditor"
func ruleIdFromTaskName(n string) string {
return strings.Split(n, "-groupname")[0]
}
func prepareTaskName(ruleId int64) string {
return fmt.Sprintf("%d-groupname", ruleId)
}
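// Example (illustrative): a stored rule with id 12 maps to the task name
// "12-groupname", and ruleIdFromTaskName("12-groupname") returns "12" again.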
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
NotifierOpts am.NotifierOptions
Queriers *Queriers
// RepoURL is used to generate a backlink in sent alert messages
RepoURL string
// rule db conn
DBConn *sqlx.DB
Context context.Context
Logger log.Logger
ResendDelay time.Duration
DisableRules bool
}
// The Manager manages recording and alerting rules.
type Manager struct {
opts *ManagerOptions
tasks map[string]Task
rules map[string]Rule
mtx sync.RWMutex
block chan struct{}
// Notifier sends messages through alert manager
notifier *am.Notifier
// datastore to store alert definitions
ruleDB RuleDB
// pause all rule tasks
pause bool
logger log.Logger
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
if o.NotifierOpts.QueueCapacity == 0 {
o.NotifierOpts.QueueCapacity = 10000
}
if o.NotifierOpts.Timeout == 0 {
o.NotifierOpts.Timeout = 10 * time.Second
}
if o.ResendDelay == time.Duration(0) {
o.ResendDelay = 1 * time.Minute
}
return o
}
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) (*Manager, error) {
o = defaultOptions(o)
// here we just initiate notifier, it will be started
// in run()
notifier, err := am.NewNotifier(&o.NotifierOpts, nil)
if err != nil {
// todo(amol): rethink on this, the query service
// should not be down because alert manager is not available
return nil, err
}
db := newRuleDB(o.DBConn)
m := &Manager{
tasks: map[string]Task{},
rules: map[string]Rule{},
notifier: notifier,
ruleDB: db,
opts: o,
block: make(chan struct{}),
logger: o.Logger,
}
return m, nil
}
func (m *Manager) Start() {
if err := m.initiate(); err != nil {
zap.S().Errorf("failed to initialize alerting rules manager: %v", err)
}
m.run()
}
func (m *Manager) Pause(b bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
for _, t := range m.tasks {
t.Pause(b)
}
}
func (m *Manager) initiate() error {
storedRules, err := m.ruleDB.GetStoredRules()
if err != nil {
return err
}
if len(storedRules) == 0 {
return nil
}
var loadErrors []error
for _, rec := range storedRules {
taskName := fmt.Sprintf("%d-groupname", rec.Id)
parsedRule, errs := ParsePostableRule([]byte(rec.Data))
if len(errs) > 0 {
if errs[0].Error() == "failed to load json" {
zap.S().Info("failed to load rule in json format, trying yaml now:", rec.Data)
// see if rule is stored in yaml format
parsedRule, errs = parsePostableRule([]byte(rec.Data), "yaml")
if parsedRule == nil {
zap.S().Errorf("failed to parse and initialize yaml rule:", errs)
// just one rule is being parsed so expect just one error
loadErrors = append(loadErrors, errs[0])
continue
} else {
// rule stored in yaml, so migrate it to json
zap.S().Info("msg:", "migrating rule from JSON to yaml", "\t rule:", rec.Data, "\t parsed rule:", parsedRule)
ruleJSON, err := json.Marshal(parsedRule)
if err == nil {
taskName, _, err := m.ruleDB.EditRuleTx(string(ruleJSON), fmt.Sprintf("%d", rec.Id))
if err != nil {
zap.S().Errorf("msg: failed to migrate rule ", "/t error:", err)
} else {
zap.S().Info("msg:", "migrated rule from yaml to json", "/t rule:", taskName)
}
}
}
} else {
zap.S().Errorf("failed to parse and initialize rule:", errs)
// just one rule is being parsed so expect just one error
loadErrors = append(loadErrors, errs[0])
continue
}
}
err := m.addTask(parsedRule, taskName)
if err != nil {
zap.S().Errorf("failed to load the rule definition (%s): %v", taskName, err)
}
}
return nil
}
// Run starts processing of the rule manager.
func (m *Manager) run() {
// initiate notifier
go m.notifier.Run()
// initiate blocked tasks
close(m.block)
}
// Stop the rule manager's rule evaluation cycles.
func (m *Manager) Stop() {
m.mtx.Lock()
defer m.mtx.Unlock()
zap.S().Info("msg: ", "Stopping rule manager...")
for _, t := range m.tasks {
t.Stop()
}
zap.S().Info("msg: ", "Rule manager stopped")
}
// EditRuleDefinition writes the rule definition to the
// datastore and also updates the rule executor
func (m *Manager) EditRule(ruleStr string, id string) error {
// todo(amol): fetch recent rule from db first
parsedRule, errs := ParsePostableRule([]byte(ruleStr))
if len(errs) > 0 {
zap.S().Errorf("failed to parse rules:", errs)
// just one rule is being parsed so expect just one error
return errs[0]
}
taskName, _, err := m.ruleDB.EditRuleTx(ruleStr, id)
if err != nil {
return err
}
if !m.opts.DisableRules {
err = m.editTask(parsedRule, taskName)
if err != nil {
// todo(amol): using tx with sqlite3 gets the
// database locked; need to research and resolve this
//tx.Rollback()
return err
}
}
// return tx.Commit()
return nil
}
func (m *Manager) editTask(rule *PostableRule, taskName string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
newTask, err := m.prepareTask(false, rule, taskName)
if err != nil {
zap.S().Errorf("msg:", "loading tasks failed", "\t err:", err)
return errors.New("error preparing rule with given parameters, previous rule set restored")
}
// If there is an old task with the same identifier, stop it and wait for
// it to finish the current iteration. Then copy it into the new group.
oldTask, ok := m.tasks[taskName]
if !ok {
zap.S().Errorf("msg:", "rule task not found, edit task failed", "\t task name:", taskName)
return errors.New("rule task not found, edit task failed")
}
delete(m.tasks, taskName)
if ok {
oldTask.Stop()
newTask.CopyState(oldTask)
}
go func() {
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newTask.Run(m.opts.Context)
}()
m.tasks[taskName] = newTask
return nil
}
func (m *Manager) DeleteRule(id string) error {
idInt, err := strconv.Atoi(id)
if err != nil {
zap.S().Errorf("msg: ", "delete rule received an rule id in invalid format, must be a number", "\t ruleid:", id)
return fmt.Errorf("delete rule received an rule id in invalid format, must be a number")
}
taskName := prepareTaskName(int64(idInt))
if !m.opts.DisableRules {
if err := m.deleteTask(taskName); err != nil {
zap.S().Errorf("msg: ", "failed to unload the rule task from memory, please retry", "\t ruleid: ", id)
return err
}
}
if _, _, err := m.ruleDB.DeleteRuleTx(id); err != nil {
zap.S().Errorf("msg: ", "failed to delete the rule from rule db", "\t ruleid: ", id)
return err
}
return nil
}
func (m *Manager) deleteTask(taskName string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
oldg, ok := m.tasks[taskName]
if ok {
oldg.Stop()
delete(m.tasks, taskName)
delete(m.rules, ruleIdFromTaskName(taskName))
} else {
zap.S().Errorf("msg:", "rule not found for deletion", "\t name:", taskName)
return fmt.Errorf("rule not found")
}
return nil
}
// CreateRule stores rule def into db and also
// starts an executor for the rule
func (m *Manager) CreateRule(ruleStr string) error {
parsedRule, errs := ParsePostableRule([]byte(ruleStr))
if len(errs) > 0 {
zap.S().Errorf("failed to parse rules:", errs)
// just one rule is being parsed so expect just one error
return errs[0]
}
taskName, tx, err := m.ruleDB.CreateRuleTx(ruleStr)
if err != nil {
return err
}
if !m.opts.DisableRules {
if err := m.addTask(parsedRule, taskName); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}
func (m *Manager) addTask(rule *PostableRule, taskName string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
newTask, err := m.prepareTask(false, rule, taskName)
if err != nil {
zap.S().Errorf("msg:", "creating rule task failed", "\t name:", taskName, "\t err", err)
return errors.New("error loading rules, previous rule set restored")
}
// If there is another task with the same identifier, raise an error
_, ok := m.tasks[taskName]
if ok {
return fmt.Errorf("a rule with the same name already exists")
}
go func() {
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newTask.Run(m.opts.Context)
}()
m.tasks[taskName] = newTask
return nil
}
// prepareTask prepares a rule task from postable rule
func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string) (Task, error) {
if acquireLock {
m.mtx.Lock()
defer m.mtx.Unlock()
}
rules := make([]Rule, 0)
var task Task
if r.Alert == "" {
zap.S().Errorf("msg:", "task load failed, at least one rule must be set", "\t task name:", taskName)
return task, fmt.Errorf("task load failed, at least one rule must be set")
}
ruleId := ruleIdFromTaskName(taskName)
if r.RuleType == RuleTypeThreshold {
// create a threshold rule
tr, err := NewThresholdRule(
ruleId,
r.Alert,
r.RuleCondition,
time.Duration(r.EvalWindow),
r.Labels,
r.Annotations,
r.Source,
)
if err != nil {
return task, err
}
rules = append(rules, tr)
// create ch rule task for evaluation
task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc())
// add rule to memory
m.rules[ruleId] = tr
} else if r.RuleType == RuleTypeProm {
// create promql rule
pr, err := NewPromRule(
ruleId,
r.Alert,
r.RuleCondition,
time.Duration(r.EvalWindow),
r.Labels,
r.Annotations,
// required as promql engine works with logger and not zap
log.With(m.logger, "alert", r.Alert),
r.Source,
)
if err != nil {
return task, err
}
rules = append(rules, pr)
// create promql rule task for evaluation
task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc())
// add rule to memory
m.rules[ruleId] = pr
} else {
return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", RuleTypeProm, RuleTypeThreshold)
}
return task, nil
}
// RuleTasks returns the list of manager's rule tasks.
func (m *Manager) RuleTasks() []Task {
m.mtx.RLock()
defer m.mtx.RUnlock()
rgs := make([]Task, 0, len(m.tasks))
for _, g := range m.tasks {
rgs = append(rgs, g)
}
sort.Slice(rgs, func(i, j int) bool {
return rgs[i].Name() < rgs[j].Name()
})
return rgs
}
// RuleTasksWithoutLock returns the list of manager's rule tasks without acquiring the mutex.
func (m *Manager) RuleTasksWithoutLock() []Task {
rgs := make([]Task, 0, len(m.tasks))
for _, g := range m.tasks {
rgs = append(rgs, g)
}
sort.Slice(rgs, func(i, j int) bool {
return rgs[i].Name() < rgs[j].Name()
})
return rgs
}
// Rules returns the list of the manager's rules.
func (m *Manager) Rules() []Rule {
m.mtx.RLock()
defer m.mtx.RUnlock()
rules := []Rule{}
for _, r := range m.rules {
rules = append(rules, r)
}
return rules
}
// TriggeredAlerts returns the list of active alerts across the manager's rules.
func (m *Manager) TriggeredAlerts() []*NamedAlert {
// m.mtx.RLock()
// defer m.mtx.RUnlock()
namedAlerts := []*NamedAlert{}
for _, r := range m.rules {
active := r.ActiveAlerts()
for _, a := range active {
awn := &NamedAlert{
Alert: a,
Name: r.Name(),
}
namedAlerts = append(namedAlerts, awn)
}
}
return namedAlerts
}
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
// prepareNotifyFunc implements the NotifyFunc for a Notifier.
func (m *Manager) prepareNotifyFunc() NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*Alert) {
var res []*am.Alert
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
if generatorURL == "" {
generatorURL = m.opts.RepoURL
}
a := &am.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: generatorURL,
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}
if len(alerts) > 0 {
m.notifier.Send(res...)
}
}
}
func (m *Manager) ListActiveRules() ([]Rule, error) {
ruleList := []Rule{}
for _, r := range m.rules {
ruleList = append(ruleList, r)
}
return ruleList, nil
}
func (m *Manager) ListRuleStates() (*GettableRules, error) {
// fetch rules from DB
storedRules, err := m.ruleDB.GetStoredRules()
if err != nil {
return nil, err
}
// initiate response object
resp := make([]*GettableRule, 0)
for _, s := range storedRules {
ruleResponse := &GettableRule{}
if err := json.Unmarshal([]byte(s.Data), ruleResponse); err != nil { // Parse []byte to go struct pointer
zap.S().Errorf("msg:", "invalid rule data", "\t err:", err)
continue
}
ruleResponse.Id = fmt.Sprintf("%d", s.Id)
// fetch state of rule from memory
if rm, ok := m.rules[ruleResponse.Id]; !ok {
zap.S().Warnf("msg:", "invalid rule id found while fetching list of rules", "\t err:", err, "\t rule_id:", ruleResponse.Id)
} else {
ruleResponse.State = rm.State().String()
}
resp = append(resp, ruleResponse)
}
return &GettableRules{Rules: resp}, nil
}
func (m *Manager) GetRule(id string) (*GettableRule, error) {
s, err := m.ruleDB.GetStoredRule(id)
if err != nil {
return nil, err
}
r := &GettableRule{}
if err := json.Unmarshal([]byte(s.Data), r); err != nil {
return nil, err
}
r.Id = fmt.Sprintf("%d", s.Id)
return r, nil
}
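// Wiring sketch (illustrative): the query service is expected to construct and
// start the manager roughly as below; all concrete values are placeholders.
//
//   opts := &ManagerOptions{
//       NotifierOpts: am.NotifierOptions{
//           AlertManagerURLs: []string{"http://localhost:9093/api/"},
//       },
//       Queriers: &Queriers{PqlEngine: pqlEngine, Ch: chConn},
//       DBConn:   sqlxDB,
//       Context:  context.Background(),
//   }
//   mgr, err := NewManager(opts)
//   if err != nil {
//       // handle construction failure
//   }
//   go mgr.Start()
//   defer mgr.Stop()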

View File

@ -0,0 +1,155 @@
package rules
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"go.signoz.io/query-service/app/clickhouseReader"
am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/model"
pqle "go.signoz.io/query-service/pqlEngine"
"go.signoz.io/query-service/utils/value"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/url"
"testing"
"time"
)
func initZapLog() *zap.Logger {
config := zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
logger, _ := config.Build()
return logger
}
func TestRules(t *testing.T) {
fmt.Println("starting test TestRules..")
loggerMgr := initZapLog()
zap.ReplaceGlobals(loggerMgr)
defer loggerMgr.Sync() // flushes buffer, if any
logger := loggerMgr.Sugar()
configFile := "../config/prometheus.yml"
// create engine
pqle, err := pqle.FromConfigPath(configFile)
if err != nil {
fmt.Println("failed to create pql:", err)
t.Errorf("failed to create pql engine : %v", err)
}
// create db conn
db, err := sqlx.Open("sqlite3", "../signoz.db")
if err != nil {
fmt.Println("failed to create db conn:", err)
t.Errorf("failed to create db conn: %v", err)
}
// create ch reader
ch := clickhouseReader.NewReader(db, configFile)
// notifier opts
notifierOpts := am.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{"http://localhost:9093/api/"},
}
externalURL, _ := url.Parse("http://signoz.io")
// create manager opts
managerOpts := &ManagerOptions{
NotifierOpts: notifierOpts,
Queriers: &Queriers{
PqlEngine: pqle,
Ch: ch,
},
RepoURL: externalURL.String(),
DBConn: db,
Context: context.Background(),
Logger: nil,
}
// create Manager
manager, err := NewManager(managerOpts)
if err != nil {
fmt.Println("manager error:", err)
t.Errorf("manager error: %v", err)
}
fmt.Println("manager is ready:", manager)
manager.run()
// test rules
// create promql rule
/* promql rule
postableRule := PostableRule{
Alert: "test alert 1 - promql",
RuleType: RuleTypeProm,
EvalWindow: 5 * time.Minute,
Frequency: 30 * time.Second,
RuleCondition: RuleCondition{
CompositeMetricQuery: &model.CompositeMetricQuery{
QueryType: model.PROM,
PromQueries: map[string]*model.PromQuery{
"A": &model.PromQuery{Query: `sum(signoz_latency_count{span_kind="SPAN_KIND_SERVER"}) by (service_name) > 100`},
},
},
},
Labels: map[string]string{},
Annotations: map[string]string{},
}*/
// create builder rule
metricQuery := &model.MetricQuery{
QueryName: "A",
MetricName: "signoz_latency_count",
TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{
{Key: "span_kind", Value: "SPAN_KIND_SERVER", Operation: "neq"},
}},
GroupingTags: []string{"service_name"},
AggregateOperator: model.RATE_SUM,
Expression: "A",
}
postableRule := PostableRule{
Alert: "test alert 2 - builder",
RuleType: RuleTypeThreshold,
EvalWindow: 5 * time.Minute,
Frequency: 30 * time.Second,
RuleCondition: RuleCondition{
Target: value.Float64(500),
CompareOp: TargetIsMore,
CompositeMetricQuery: &model.CompositeMetricQuery{
QueryType: model.QUERY_BUILDER,
BuilderQueries: map[string]*model.MetricQuery{
"A": metricQuery,
},
},
},
Labels: map[string]string{"host": "server1"},
Annotations: map[string]string{},
}
err = manager.addTask(&postableRule, postableRule.Alert)
if err != nil {
fmt.Println("failed to add rule: ", err)
t.Errorf("failed to add rule")
}
signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
for {
select {
case <-signalsChannel:
logger.Fatal("Received OS Interrupt Signal ... ")
}
}
}

View File

@ -0,0 +1,445 @@
package rules
import (
"context"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.uber.org/zap"
"sync"
"time"
plabels "github.com/prometheus/prometheus/pkg/labels"
pql "github.com/prometheus/prometheus/promql"
"go.signoz.io/query-service/model"
qslabels "go.signoz.io/query-service/utils/labels"
"go.signoz.io/query-service/utils/times"
"go.signoz.io/query-service/utils/timestamp"
yaml "gopkg.in/yaml.v2"
)
type PromRule struct {
id string
name string
source string
ruleCondition *RuleCondition
evalWindow time.Duration
holdDuration time.Duration
labels plabels.Labels
annotations plabels.Labels
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTimestamp time.Time
health RuleHealth
lastError error
// map of active alerts
active map[uint64]*Alert
logger log.Logger
}
func NewPromRule(
id string,
name string,
ruleCondition *RuleCondition,
evalWindow time.Duration,
labels, annotations map[string]string,
logger log.Logger,
source string,
) (*PromRule, error) {
if int64(evalWindow) == 0 {
evalWindow = 5 * time.Minute
}
if ruleCondition == nil {
return nil, fmt.Errorf("no rule condition")
} else if !ruleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
}
zap.S().Info("msg:", "creating new alerting rule", "\t name:", name, "\t condition:", ruleCondition.String())
return &PromRule{
id: id,
name: name,
source: source,
ruleCondition: ruleCondition,
evalWindow: evalWindow,
labels: plabels.FromMap(labels),
annotations: plabels.FromMap(annotations),
health: HealthUnknown,
active: map[uint64]*Alert{},
logger: logger,
}, nil
}
func (r *PromRule) Name() string {
return r.name
}
func (r *PromRule) ID() string {
return r.id
}
func (r *PromRule) Condition() *RuleCondition {
return r.ruleCondition
}
func (r *PromRule) Type() RuleType {
return RuleTypeProm
}
func (r *PromRule) GeneratorURL() string {
return r.source
}
func (r *PromRule) SetLastError(err error) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.lastError = err
}
func (r *PromRule) LastError() error {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.lastError
}
func (r *PromRule) SetHealth(health RuleHealth) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.health = health
}
func (r *PromRule) Health() RuleHealth {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.health
}
// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
func (r *PromRule) SetEvaluationDuration(dur time.Duration) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.evaluationDuration = dur
}
func (r *PromRule) HoldDuration() time.Duration {
return r.holdDuration
}
func (r *PromRule) EvalWindow() time.Duration {
return r.evalWindow
}
// Labels returns the labels of the alerting rule.
func (r *PromRule) Labels() qslabels.BaseLabels {
return r.labels
}
// Annotations returns the annotations of the alerting rule.
func (r *PromRule) Annotations() qslabels.BaseLabels {
return r.annotations
}
func (r *PromRule) sample(alert *Alert, ts time.Time) pql.Sample {
lb := plabels.NewBuilder(r.labels)
alertLabels := alert.Labels.(plabels.Labels)
for _, l := range alertLabels {
lb.Set(l.Name, l.Value)
}
lb.Set(qslabels.MetricNameLabel, alertMetricName)
lb.Set(qslabels.AlertNameLabel, r.name)
lb.Set(qslabels.AlertStateLabel, alert.State.String())
s := pql.Sample{
Metric: lb.Labels(),
Point: pql.Point{T: timestamp.FromTime(ts), V: 1},
}
return s
}
// forStateSample returns the sample for ALERTS_FOR_STATE.
func (r *PromRule) forStateSample(alert *Alert, ts time.Time, v float64) pql.Sample {
lb := plabels.NewBuilder(r.labels)
alertLabels := alert.Labels.(plabels.Labels)
for _, l := range alertLabels {
lb.Set(l.Name, l.Value)
}
lb.Set(plabels.MetricName, alertForStateMetricName)
lb.Set(plabels.AlertName, r.name)
s := pql.Sample{
Metric: lb.Labels(),
Point: pql.Point{T: timestamp.FromTime(ts), V: v},
}
return s
}
// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule.
func (r *PromRule) GetEvaluationDuration() time.Duration {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.evaluationDuration
}
// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated.
func (r *PromRule) SetEvaluationTimestamp(ts time.Time) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.evaluationTimestamp = ts
}
// GetEvaluationTimestamp returns the time the evaluation took place.
func (r *PromRule) GetEvaluationTimestamp() time.Time {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.evaluationTimestamp
}
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive
func (r *PromRule) State() AlertState {
r.mtx.Lock()
defer r.mtx.Unlock()
maxState := StateInactive
for _, a := range r.active {
if a.State > maxState {
maxState = a.State
}
}
return maxState
}
func (r *PromRule) currentAlerts() []*Alert {
r.mtx.Lock()
defer r.mtx.Unlock()
alerts := make([]*Alert, 0, len(r.active))
for _, a := range r.active {
anew := *a
alerts = append(alerts, &anew)
}
return alerts
}
func (r *PromRule) ActiveAlerts() []*Alert {
var res []*Alert
for _, a := range r.currentAlerts() {
if a.ResolvedAt.IsZero() {
res = append(res, a)
}
}
return res
}
// ForEachActiveAlert runs the given function on each alert.
// This should be used when you want to use the actual alerts from the PromRule
// and not on its copy.
// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'.
func (r *PromRule) ForEachActiveAlert(f func(*Alert)) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, a := range r.active {
f(a)
}
}
func (r *PromRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, "", alerts...)
}
func (r *PromRule) getPqlQuery() (string, error) {
if r.ruleCondition.CompositeMetricQuery.QueryType == model.PROM {
if len(r.ruleCondition.CompositeMetricQuery.PromQueries) > 0 {
if promQuery, ok := r.ruleCondition.CompositeMetricQuery.PromQueries["A"]; ok {
query := promQuery.Query
if query == "" {
return query, fmt.Errorf("a promquery needs to be set for this rule to function")
}
if r.ruleCondition.Target != nil && r.ruleCondition.CompareOp != CompareOpNone {
query = fmt.Sprintf("%s %s %f", query, ResolveCompareOp(r.ruleCondition.CompareOp), *r.ruleCondition.Target)
return query, nil
} else {
return query, nil
}
}
}
}
return "", fmt.Errorf("invalid promql rule query")
}
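// Example (illustrative): with PromQueries["A"].Query set to
//   sum(signoz_latency_count{span_kind="SPAN_KIND_SERVER"}) by (service_name)
// a non-nil Target of 100 and a compare op that resolves to ">", the composed
// alert query becomes
//   sum(signoz_latency_count{span_kind="SPAN_KIND_SERVER"}) by (service_name) > 100.000000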
func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
q, err := r.getPqlQuery()
if err != nil {
return nil, err
}
zap.S().Info("rule:", r.Name(), "\t evaluating promql query: ", q)
res, err := queriers.PqlEngine.RunAlertQuery(ctx, q, ts)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
return nil, err
}
r.mtx.Lock()
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var vec pql.Vector
var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
l := make(map[string]string, len(smpl.Metric))
for _, lbl := range smpl.Metric {
l[lbl.Name] = lbl.Value
}
tmplData := AlertTemplateData(l, smpl.V)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}"
expand := func(text string) string {
tmpl := NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
level.Warn(r.logger).Log("msg", "Expanding alert template failed", "err", err, "data", tmplData)
}
return result
}
lb := plabels.NewBuilder(smpl.Metric).Del(plabels.MetricName)
for _, l := range r.labels {
lb.Set(l.Name, expand(l.Value))
}
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
annotations := make(plabels.Labels, 0, len(r.annotations))
for _, a := range r.annotations {
annotations = append(annotations, plabels.Label{Name: a.Name, Value: expand(a.Value)})
}
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = HealthBad
r.lastError = err
return nil, err
}
alerts[h] = &Alert{
Labels: lbs,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
}
}
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
continue
}
r.active[h] = a
}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
}
continue
}
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
a.FiredAt = ts
}
}
r.health = HealthGood
r.lastError = err
return vec, nil
}
func (r *PromRule) String() string {
ar := PostableRule{
Alert: r.name,
RuleCondition: r.ruleCondition,
EvalWindow: Duration(r.evalWindow),
Labels: r.labels.Map(),
Annotations: r.annotations.Map(),
}
byt, err := yaml.Marshal(ar)
if err != nil {
return fmt.Sprintf("error marshaling alerting rule: %s", err.Error())
}
return string(byt)
}

View File

@ -0,0 +1,370 @@
package rules
import (
"context"
"fmt"
"github.com/go-kit/log"
opentracing "github.com/opentracing/opentracing-go"
plabels "github.com/prometheus/prometheus/pkg/labels"
pql "github.com/prometheus/prometheus/promql"
"go.uber.org/zap"
"sort"
"sync"
"time"
)
// PromRuleTask is a promql rule executor
type PromRuleTask struct {
name string
file string
frequency time.Duration
rules []Rule
seriesInPreviousEval []map[string]plabels.Labels // One per Rule.
staleSeries []plabels.Labels
opts *ManagerOptions
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTime time.Duration
lastEvaluation time.Time
markStale bool
done chan struct{}
terminated chan struct{}
managerDone chan struct{}
pause bool
logger log.Logger
notify NotifyFunc
}
// newPromRuleTask holds rules that have a promql condition
// and evaluates the rules at a given frequency
func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) *PromRuleTask {
zap.S().Info("Initiating a new rule group:", name, "\t frequency:", frequency)
if frequency <= 0 {
frequency = DefaultFrequency
}
return &PromRuleTask{
name: name,
file: file,
pause: false,
frequency: frequency,
rules: rules,
opts: opts,
seriesInPreviousEval: make([]map[string]plabels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
logger: log.With(opts.Logger, "group", name),
}
}
// Name returns the group name.
func (g *PromRuleTask) Name() string { return g.name }
// Key returns the group key
func (g *PromRuleTask) Key() string {
return g.name + ";" + g.file
}
func (g *PromRuleTask) Type() TaskType { return TaskTypeProm }
// Rules returns the group's rules.
func (g *PromRuleTask) Rules() []Rule { return g.rules }
// Interval returns the group's interval.
func (g *PromRuleTask) Interval() time.Duration { return g.frequency }
func (g *PromRuleTask) Pause(b bool) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.pause = b
}
func (g *PromRuleTask) Run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"name": g.Name(),
},
})
iter := func() {
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
// defer cleanup
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
}(time.Now())
}()
iter()
// let the group iterate and run
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
}
}
}
func (g *PromRuleTask) Stop() {
close(g.done)
<-g.terminated
}
func (g *PromRuleTask) hash() uint64 {
l := plabels.New(
plabels.Label{Name: "name", Value: g.name},
)
return l.Hash()
}
// PromRules returns the list of the group's promql rules.
func (g *PromRuleTask) PromRules() []*PromRule {
g.mtx.Lock()
defer g.mtx.Unlock()
var alerts []*PromRule
for _, rule := range g.rules {
if tr, ok := rule.(*PromRule); ok {
alerts = append(alerts, tr)
}
}
sort.Slice(alerts, func(i, j int) bool {
return alerts[i].State() > alerts[j].State() ||
(alerts[i].State() == alerts[j].State() &&
alerts[i].Name() < alerts[j].Name())
})
return alerts
}
// HasAlertingRules returns true if the group contains at least one AlertingRule.
func (g *PromRuleTask) HasAlertingRules() bool {
g.mtx.Lock()
defer g.mtx.Unlock()
for _, rule := range g.rules {
if _, ok := rule.(*PromRule); ok {
return true
}
}
return false
}
// GetEvaluationDuration returns the time in seconds it took to evaluate the rule group.
func (g *PromRuleTask) GetEvaluationDuration() time.Duration {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.evaluationDuration
}
// SetEvaluationDuration sets the time in seconds the last evaluation took.
func (g *PromRuleTask) SetEvaluationDuration(dur time.Duration) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.evaluationDuration = dur
}
// GetEvaluationTime returns the time in seconds it took to evaluate the rule group.
func (g *PromRuleTask) GetEvaluationTime() time.Duration {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.evaluationTime
}
// setEvaluationTime sets the time in seconds the last evaluation took.
func (g *PromRuleTask) setEvaluationTime(dur time.Duration) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.evaluationTime = dur
}
// GetLastEvaluation returns the time the last evaluation of the rule group took place.
func (g *PromRuleTask) GetLastEvaluation() time.Time {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.lastEvaluation
}
// setLastEvaluation updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.lastEvaluation = ts
}
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
var (
offset = int64(g.hash() % uint64(g.frequency))
adjNow = startTime - offset
base = adjNow - (adjNow % int64(g.frequency))
)
return time.Unix(0, base+offset).UTC()
}
// CopyState copies the alerting rule and staleness related state from the given group.
//
// Rules are matched based on their name and labels. If there are duplicates, the
// first is matched with the first, second with the second etc.
func (g *PromRuleTask) CopyState(fromTask Task) error {
from, ok := fromTask.(*PromRuleTask)
if !ok {
return fmt.Errorf("you can only copy rule groups with same type")
}
g.evaluationTime = from.evaluationTime
g.lastEvaluation = from.lastEvaluation
ruleMap := make(map[string][]int, len(from.rules))
for fi, fromRule := range from.rules {
nameAndLabels := nameAndLabels(fromRule)
l := ruleMap[nameAndLabels]
ruleMap[nameAndLabels] = append(l, fi)
}
for i, rule := range g.rules {
nameAndLabels := nameAndLabels(rule)
indexes := ruleMap[nameAndLabels]
if len(indexes) == 0 {
continue
}
fi := indexes[0]
g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi]
ruleMap[nameAndLabels] = indexes[1:]
ar, ok := rule.(*ThresholdRule)
if !ok {
continue
}
far, ok := from.rules[fi].(*ThresholdRule)
if !ok {
continue
}
for fp, a := range far.active {
ar.active[fp] = a
}
}
// Handle deleted and unmatched duplicate rules.
g.staleSeries = from.staleSeries
for fi, fromRule := range from.rules {
nameAndLabels := nameAndLabels(fromRule)
l := ruleMap[nameAndLabels]
if len(l) != 0 {
for _, series := range from.seriesInPreviousEval[fi] {
g.staleSeries = append(g.staleSeries, series)
}
}
}
return nil
}
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
zap.S().Info("promql rule task:", g.name, "\t eval started at:", ts)
var samplesTotal float64
for i, rule := range g.rules {
if rule == nil {
continue
}
select {
case <-g.done:
return
default:
}
func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()
since := time.Since(t)
rule.SetEvaluationDuration(since)
rule.SetEvaluationTimestamp(t)
}(time.Now())
data, err := rule.Eval(ctx, ts, g.opts.Queriers)
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
zap.S().Warn("msg", "Evaluating rule failed", "rule", rule, "err", err)
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
//! if _, ok := err.(promql.ErrQueryCanceled); !ok {
// level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
//}
return
}
vector := data.(pql.Vector)
samplesTotal += float64(len(vector))
rule.SendAlerts(ctx, ts, g.opts.ResendDelay, g.frequency, g.notify)
seriesReturned := make(map[string]plabels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
g.seriesInPreviousEval[i] = seriesReturned
}()
for _, s := range vector {
seriesReturned[s.Metric.String()] = s.Metric
}
}(i, rule)
}
}

View File

@ -0,0 +1,21 @@
package rules
import (
"github.com/ClickHouse/clickhouse-go/v2"
pqle "go.signoz.io/query-service/pqlEngine"
)
// Queriers registers the options for querying metrics or event sources
// which return a condition that results in an alert. Currently we support
// the promql engine and clickhouse queries, but in future we may include
// api readers for Machine Learning (ML) use cases.
// Note: each rule will pick up the querier it is interested in
// and use it. This allows rules to have flexibility in choosing
// the query engines.
type Queriers struct {
// promql engine
PqlEngine *pqle.PqlEngine
// metric querier
Ch clickhouse.Conn
}

View File

@ -0,0 +1,50 @@
package rules
import (
"encoding/json"
"fmt"
"strconv"
"go.signoz.io/query-service/utils/labels"
)
// common result format of query
type Vector []Sample
type Sample struct {
Point
Metric labels.Labels
}
func (s Sample) String() string {
return fmt.Sprintf("%s => %s", s.Metric, s.Point)
}
func (s Sample) MarshalJSON() ([]byte, error) {
v := struct {
M labels.Labels `json:"metric"`
V Point `json:"value"`
}{
M: s.Metric,
V: s.Point,
}
return json.Marshal(v)
}
type Point struct {
T int64
V float64
}
func (p Point) String() string {
v := strconv.FormatFloat(p.V, 'f', -1, 64)
return fmt.Sprintf("%v @[%v]", v, p.T)
}
// MarshalJSON implements json.Marshaler.
func (p Point) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.V, 'f', -1, 64)
return json.Marshal([...]interface{}{float64(p.T) / 1000, v})
}
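// Example (illustrative): Point{T: 1657795200000, V: 512} marshals to
//   [1657795200,"512"]
// i.e. a Unix timestamp in seconds followed by the value rendered as a string,
// matching the familiar Prometheus-style sample encoding.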

View File

@ -0,0 +1,35 @@
package rules
import (
"context"
"go.signoz.io/query-service/utils/labels"
"time"
)
// A Rule encapsulates a vector expression which is evaluated at a specified
// interval and acted upon (currently used for alerting).
type Rule interface {
ID() string
Name() string
Type() RuleType
Labels() labels.BaseLabels
Annotations() labels.BaseLabels
Condition() *RuleCondition
State() AlertState
ActiveAlerts() []*Alert
Eval(context.Context, time.Time, *Queriers) (interface{}, error)
String() string
// Query() string
SetLastError(error)
LastError() error
SetHealth(RuleHealth)
Health() RuleHealth
SetEvaluationDuration(time.Duration)
GetEvaluationDuration() time.Duration
SetEvaluationTimestamp(time.Time)
GetEvaluationTimestamp() time.Time
SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc)
}

View File

@ -0,0 +1,385 @@
package rules
import (
"context"
"fmt"
opentracing "github.com/opentracing/opentracing-go"
"go.signoz.io/query-service/utils/labels"
"go.uber.org/zap"
"sort"
"sync"
"time"
)
// RuleTask holds a rule (with composite queries)
// and evaluates the rule at a given frequency
type RuleTask struct {
name string
file string
frequency time.Duration
rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule.
staleSeries []labels.Labels
opts *ManagerOptions
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTime time.Duration
lastEvaluation time.Time
markStale bool
done chan struct{}
terminated chan struct{}
managerDone chan struct{}
pause bool
notify NotifyFunc
}
const DefaultFrequency = 1 * time.Minute
// newRuleTask makes a new RuleTask with the given name, options, and rules.
func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) *RuleTask {
if frequency <= 0 {
frequency = DefaultFrequency
}
zap.S().Info("msg:", "initiating a new rule task", "\t name:", name, "\t frequency:", frequency)
return &RuleTask{
name: name,
file: file,
pause: false,
frequency: frequency,
rules: rules,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
}
}
// Name returns the group name.
func (g *RuleTask) Name() string { return g.name }
// Key returns the group key
func (g *RuleTask) Key() string {
return g.name + ";" + g.file
}
// Type returns the task type.
func (g *RuleTask) Type() TaskType { return TaskTypeCh }
// Rules returns the group's rules.
func (g *RuleTask) Rules() []Rule { return g.rules }
// Interval returns the group's interval.
func (g *RuleTask) Interval() time.Duration { return g.frequency }
func (g *RuleTask) Pause(b bool) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.pause = b
}
type QueryOrigin struct{}
func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) context.Context {
return context.WithValue(ctx, QueryOrigin{}, data)
}
func (g *RuleTask) Run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
zap.S().Debugf("group:", g.name, "\t group run to begin at: ", evalTimestamp)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
})
iter := func() {
if g.pause {
// todo(amol): remove in memory active alerts
// and last series state
return
}
start := time.Now()
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.frequency` occurrence.
tick := time.NewTicker(g.frequency)
defer tick.Stop()
// defer cleanup
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
}(time.Now())
}()
iter()
// let the group iterate and run
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.frequency) - 1
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
iter()
}
}
}
}
func (g *RuleTask) Stop() {
close(g.done)
<-g.terminated
}
func (g *RuleTask) hash() uint64 {
l := labels.New(
labels.Label{Name: "name", Value: g.name},
)
return l.Hash()
}
// ThresholdRules returns the list of the group's threshold rules.
func (g *RuleTask) ThresholdRules() []*ThresholdRule {
g.mtx.Lock()
defer g.mtx.Unlock()
var alerts []*ThresholdRule
for _, rule := range g.rules {
if tr, ok := rule.(*ThresholdRule); ok {
alerts = append(alerts, tr)
}
}
sort.Slice(alerts, func(i, j int) bool {
return alerts[i].State() > alerts[j].State() ||
(alerts[i].State() == alerts[j].State() &&
alerts[i].Name() < alerts[j].Name())
})
return alerts
}
// HasAlertingRules returns true if the group contains at least one AlertingRule.
func (g *RuleTask) HasAlertingRules() bool {
g.mtx.Lock()
defer g.mtx.Unlock()
for _, rule := range g.rules {
if _, ok := rule.(*ThresholdRule); ok {
return true
}
}
return false
}
// GetEvaluationDuration returns the time in seconds it took to evaluate the rule group.
func (g *RuleTask) GetEvaluationDuration() time.Duration {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.evaluationDuration
}
// SetEvaluationDuration sets the time in seconds the last evaluation took.
func (g *RuleTask) SetEvaluationDuration(dur time.Duration) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.evaluationDuration = dur
}
// GetEvaluationTime returns the time in seconds it took to evaluate the rule group.
func (g *RuleTask) GetEvaluationTime() time.Duration {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.evaluationTime
}
// setEvaluationTime sets the time in seconds the last evaluation took.
func (g *RuleTask) setEvaluationTime(dur time.Duration) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.evaluationTime = dur
}
// GetLastEvaluation returns the time the last evaluation of the rule group took place.
func (g *RuleTask) GetLastEvaluation() time.Time {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.lastEvaluation
}
// setLastEvaluation updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
func (g *RuleTask) setLastEvaluation(ts time.Time) {
g.mtx.Lock()
defer g.mtx.Unlock()
g.lastEvaluation = ts
}
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *RuleTask) EvalTimestamp(startTime int64) time.Time {
var (
offset = int64(g.hash() % uint64(g.frequency))
adjNow = startTime - offset
base = adjNow - (adjNow % int64(g.frequency))
)
return time.Unix(0, base+offset).UTC()
}
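// Example (illustrative): with frequency = 1m and a task whose hash yields an
// offset of 15s, EvalTimestamp returns the most recent instant of the form
// <minute boundary> + 15s at or before startTime, so evaluations stay
// consistently slotted across restarts.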
func nameAndLabels(rule Rule) string {
return rule.Name() + rule.Labels().String()
}
// CopyState copies the alerting rule and staleness related state from the given group.
//
// Rules are matched based on their name and labels. If there are duplicates, the
// first is matched with the first, second with the second etc.
func (g *RuleTask) CopyState(fromTask Task) error {
from, ok := fromTask.(*RuleTask)
if !ok {
return fmt.Errorf("invalid from task for copy")
}
g.evaluationTime = from.evaluationTime
g.lastEvaluation = from.lastEvaluation
ruleMap := make(map[string][]int, len(from.rules))
for fi, fromRule := range from.rules {
nameAndLabels := nameAndLabels(fromRule)
l := ruleMap[nameAndLabels]
ruleMap[nameAndLabels] = append(l, fi)
}
for i, rule := range g.rules {
nameAndLabels := nameAndLabels(rule)
indexes := ruleMap[nameAndLabels]
if len(indexes) == 0 {
continue
}
fi := indexes[0]
g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi]
ruleMap[nameAndLabels] = indexes[1:]
// todo(amol): support other rules too here
ar, ok := rule.(*ThresholdRule)
if !ok {
continue
}
far, ok := from.rules[fi].(*ThresholdRule)
if !ok {
continue
}
for fp, a := range far.active {
ar.active[fp] = a
}
}
// Handle deleted and unmatched duplicate rules.
// todo(amol): possibly not needed any more
g.staleSeries = from.staleSeries
for fi, fromRule := range from.rules {
nameAndLabels := nameAndLabels(fromRule)
l := ruleMap[nameAndLabels]
if len(l) != 0 {
for _, series := range from.seriesInPreviousEval[fi] {
g.staleSeries = append(g.staleSeries, series)
}
}
}
return nil
}
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
zap.S().Debugf("msg:", "rule task eval started", "\t name:", g.name, "\t start time:", ts)
var samplesTotal float64
for i, rule := range g.rules {
if rule == nil {
continue
}
select {
case <-g.done:
return
default:
}
func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()
since := time.Since(t)
rule.SetEvaluationDuration(since)
rule.SetEvaluationTimestamp(t)
}(time.Now())
data, err := rule.Eval(ctx, ts, g.opts.Queriers)
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
zap.S().Warn("msg:", "Evaluating rule failed", "\t rule:", rule, "\t err: ", err)
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
//! if _, ok := err.(promql.ErrQueryCanceled); !ok {
// level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
//}
return
}
vector := data.(Vector)
samplesTotal += float64(len(vector))
rule.SendAlerts(ctx, ts, g.opts.ResendDelay, g.frequency, g.notify)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
for _, s := range vector {
seriesReturned[s.Metric.String()] = s.Metric
}
g.seriesInPreviousEval[i] = seriesReturned
}(i, rule)
}
}

View File

@ -0,0 +1,37 @@
package rules
import (
"context"
"time"
)
type TaskType string
const (
TaskTypeProm = "promql_ruletask"
TaskTypeCh = "ch_ruletask"
)
type Task interface {
Name() string
// Key returns the group key
Key() string
Type() TaskType
CopyState(from Task) error
Eval(ctx context.Context, ts time.Time)
Run(ctx context.Context)
Rules() []Rule
Stop()
Pause(b bool)
}
// newTask returns an appropriate group for
// rule type
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc) Task {
if taskType == TaskTypeCh {
return newRuleTask(name, file, frequency, rules, opts, notify)
}
return newPromRuleTask(name, file, frequency, rules, opts, notify)
}
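// Example (illustrative): builder/ClickHouse rules are registered with
//   newTask(TaskTypeCh, taskName, taskNamesuffix, freq, rules, opts, notify)
// and evaluate through a RuleTask, while TaskTypeProm yields a PromRuleTask
// that runs the rule through the promql engine.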

View File

@ -0,0 +1,290 @@
package rules
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"net/url"
"regexp"
"sort"
"strings"
html_template "html/template"
text_template "text/template"
"go.signoz.io/query-service/utils/times"
)
type tmplQueryRecord struct {
Labels map[string]string
Value float64
}
type tmplQueryResults []*tmplQueryRecord
type tmplQueryResultsByLabelSorter struct {
results tmplQueryResults
by string
}
func (q tmplQueryResultsByLabelSorter) Len() int {
return len(q.results)
}
func (q tmplQueryResultsByLabelSorter) Less(i, j int) bool {
return q.results[i].Labels[q.by] < q.results[j].Labels[q.by]
}
func (q tmplQueryResultsByLabelSorter) Swap(i, j int) {
q.results[i], q.results[j] = q.results[j], q.results[i]
}
// TemplateExpander executes templates in text or HTML mode with a common set of Prometheus template functions.
type TemplateExpander struct {
text string
name string
data interface{}
funcMap text_template.FuncMap
}
// NewTemplateExpander returns a template expander ready to use.
func NewTemplateExpander(
ctx context.Context,
text string,
name string,
data interface{},
timestamp times.Time,
externalURL *url.URL,
) *TemplateExpander {
return &TemplateExpander{
text: text,
name: name,
data: data,
funcMap: text_template.FuncMap{
"first": func(v tmplQueryResults) (*tmplQueryRecord, error) {
if len(v) > 0 {
return v[0], nil
}
return nil, errors.New("first() called on vector with no elements")
},
"label": func(label string, s *tmplQueryRecord) string {
return s.Labels[label]
},
"value": func(s *tmplQueryRecord) float64 {
return s.Value
},
"strvalue": func(s *tmplQueryRecord) string {
return s.Labels["__value__"]
},
"args": func(args ...interface{}) map[string]interface{} {
result := make(map[string]interface{})
for i, a := range args {
result[fmt.Sprintf("arg%d", i)] = a
}
return result
},
"reReplaceAll": func(pattern, repl, text string) string {
re := regexp.MustCompile(pattern)
return re.ReplaceAllString(text, repl)
},
"safeHtml": func(text string) html_template.HTML {
return html_template.HTML(text)
},
"match": regexp.MatchString,
"title": strings.Title,
"toUpper": strings.ToUpper,
"toLower": strings.ToLower,
"sortByLabel": func(label string, v tmplQueryResults) tmplQueryResults {
sorter := tmplQueryResultsByLabelSorter{v[:], label}
sort.Stable(sorter)
return v
},
"humanize": func(v float64) string {
if v == 0 || math.IsNaN(v) || math.IsInf(v, 0) {
return fmt.Sprintf("%.4g", v)
}
if math.Abs(v) >= 1 {
prefix := ""
for _, p := range []string{"k", "M", "G", "T", "P", "E", "Z", "Y"} {
if math.Abs(v) < 1000 {
break
}
prefix = p
v /= 1000
}
return fmt.Sprintf("%.4g%s", v, prefix)
}
prefix := ""
for _, p := range []string{"m", "u", "n", "p", "f", "a", "z", "y"} {
if math.Abs(v) >= 1 {
break
}
prefix = p
v *= 1000
}
return fmt.Sprintf("%.4g%s", v, prefix)
},
"humanize1024": func(v float64) string {
if math.Abs(v) <= 1 || math.IsNaN(v) || math.IsInf(v, 0) {
return fmt.Sprintf("%.4g", v)
}
prefix := ""
for _, p := range []string{"ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"} {
if math.Abs(v) < 1024 {
break
}
prefix = p
v /= 1024
}
return fmt.Sprintf("%.4g%s", v, prefix)
},
"humanizeDuration": func(v float64) string {
if math.IsNaN(v) || math.IsInf(v, 0) {
return fmt.Sprintf("%.4g", v)
}
if v == 0 {
return fmt.Sprintf("%.4gs", v)
}
if math.Abs(v) >= 1 {
sign := ""
if v < 0 {
sign = "-"
v = -v
}
seconds := int64(v) % 60
minutes := (int64(v) / 60) % 60
hours := (int64(v) / 60 / 60) % 24
days := (int64(v) / 60 / 60 / 24)
// For days to minutes, we display seconds as an integer.
if days != 0 {
return fmt.Sprintf("%s%dd %dh %dm %ds", sign, days, hours, minutes, seconds)
}
if hours != 0 {
return fmt.Sprintf("%s%dh %dm %ds", sign, hours, minutes, seconds)
}
if minutes != 0 {
return fmt.Sprintf("%s%dm %ds", sign, minutes, seconds)
}
// For seconds, we display 4 significant digits.
return fmt.Sprintf("%s%.4gs", sign, v)
}
prefix := ""
for _, p := range []string{"m", "u", "n", "p", "f", "a", "z", "y"} {
if math.Abs(v) >= 1 {
break
}
prefix = p
v *= 1000
}
return fmt.Sprintf("%.4g%ss", v, prefix)
},
"humanizeTimestamp": func(v float64) string {
if math.IsNaN(v) || math.IsInf(v, 0) {
return fmt.Sprintf("%.4g", v)
}
t := times.TimeFromUnixNano(int64(v * 1e9)).Time().UTC()
return fmt.Sprint(t)
},
"pathPrefix": func() string {
return externalURL.Path
},
"externalURL": func() string {
return externalURL.String()
},
},
}
}
// AlertTemplateData returns the interface to be used in expanding the template.
func AlertTemplateData(labels map[string]string, value float64) interface{} {
return struct {
Labels map[string]string
Value float64
}{
Labels: labels,
Value: value,
}
}
// Funcs adds the functions in fm to the Expander's function map.
// Existing functions will be overwritten in case of conflict.
func (te TemplateExpander) Funcs(fm text_template.FuncMap) {
for k, v := range fm {
te.funcMap[k] = v
}
}
// Expand expands a template in text (non-HTML) mode.
func (te TemplateExpander) Expand() (result string, resultErr error) {
// It's better to have no alert description than to kill the whole process
// if there's a bug in the template.
defer func() {
if r := recover(); r != nil {
var ok bool
resultErr, ok = r.(error)
if !ok {
resultErr = fmt.Errorf("panic expanding template %v: %v", te.name, r)
}
}
}()
tmpl, err := text_template.New(te.name).Funcs(te.funcMap).Option("missingkey=zero").Parse(te.text)
if err != nil {
return "", fmt.Errorf("error parsing template %v: %v", te.name, err)
}
var buffer bytes.Buffer
err = tmpl.Execute(&buffer, te.data)
if err != nil {
return "", fmt.Errorf("error executing template %v: %v", te.name, err)
}
return buffer.String(), nil
}
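// Usage sketch (illustrative): alert annotations are expanded with the
// convenience variables injected by the rule evaluators; all values below are
// placeholders.
//
//   data := AlertTemplateData(map[string]string{"service_name": "frontend"}, 750)
//   te := NewTemplateExpander(ctx,
//       "{{$labels := .Labels}}{{$value := .Value}}"+
//           "p99 latency on {{$labels.service_name}} is {{humanize $value}}ms",
//       "__alert_HighLatency", data, times.Time(0), nil)
//   msg, err := te.Expand() // "p99 latency on frontend is 750ms"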
// ExpandHTML expands a template with HTML escaping, with templates read from the given files.
func (te TemplateExpander) ExpandHTML(templateFiles []string) (result string, resultErr error) {
defer func() {
if r := recover(); r != nil {
var ok bool
resultErr, ok = r.(error)
if !ok {
resultErr = fmt.Errorf("panic expanding template %v: %v", te.name, r)
}
}
}()
tmpl := html_template.New(te.name).Funcs(html_template.FuncMap(te.funcMap))
tmpl.Option("missingkey=zero")
tmpl.Funcs(html_template.FuncMap{
"tmpl": func(name string, data interface{}) (html_template.HTML, error) {
var buffer bytes.Buffer
err := tmpl.ExecuteTemplate(&buffer, name, data)
return html_template.HTML(buffer.String()), err
},
})
tmpl, err := tmpl.Parse(te.text)
if err != nil {
return "", fmt.Errorf("error parsing template %v: %v", te.name, err)
}
if len(templateFiles) > 0 {
_, err = tmpl.ParseFiles(templateFiles...)
if err != nil {
return "", fmt.Errorf("error parsing template files for %v: %v", te.name, err)
}
}
var buffer bytes.Buffer
err = tmpl.Execute(&buffer, te.data)
if err != nil {
return "", fmt.Errorf("error executing template %v: %v", te.name, err)
}
return buffer.String(), nil
}
// ParseTest parses the templates and returns the error if any.
func (te TemplateExpander) ParseTest() error {
_, err := text_template.New(te.name).Funcs(te.funcMap).Option("missingkey=zero").Parse(te.text)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,679 @@
package rules
import (
"context"
"fmt"
"go.uber.org/zap"
"math"
"reflect"
"sort"
"sync"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"go.signoz.io/query-service/app/metrics"
"go.signoz.io/query-service/constants"
qsmodel "go.signoz.io/query-service/model"
"go.signoz.io/query-service/utils/labels"
"go.signoz.io/query-service/utils/times"
"go.signoz.io/query-service/utils/timestamp"
"go.signoz.io/query-service/utils/value"
yaml "gopkg.in/yaml.v2"
)
type ThresholdRule struct {
id string
name string
source string
ruleCondition *RuleCondition
evalWindow time.Duration
holdDuration time.Duration
labels labels.Labels
annotations labels.Labels
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTimestamp time.Time
health RuleHealth
lastError error
// map of active alerts
active map[uint64]*Alert
}
func NewThresholdRule(
id string,
name string,
ruleCondition *RuleCondition,
evalWindow time.Duration,
l, a map[string]string,
source string,
) (*ThresholdRule, error) {
if int64(evalWindow) == 0 {
evalWindow = 5 * time.Minute
}
if ruleCondition == nil {
return nil, fmt.Errorf("no rule condition")
} else if !ruleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
}
zap.S().Info("msg:", "creating new alerting rule", "\t name:", name, "\t condition:", ruleCondition.String())
return &ThresholdRule{
id: id,
name: name,
source: source,
ruleCondition: ruleCondition,
evalWindow: evalWindow,
labels: labels.FromMap(l),
annotations: labels.FromMap(a),
health: HealthUnknown,
active: map[uint64]*Alert{},
}, nil
}
func (r *ThresholdRule) Name() string {
return r.name
}
func (r *ThresholdRule) ID() string {
return r.id
}
func (r *ThresholdRule) Condition() *RuleCondition {
return r.ruleCondition
}
func (r *ThresholdRule) GeneratorURL() string {
return r.source
}
func (r *ThresholdRule) target() *float64 {
if r.ruleCondition == nil {
return nil
}
return r.ruleCondition.Target
}
func (r *ThresholdRule) matchType() MatchType {
if r.ruleCondition == nil {
return AtleastOnce
}
return r.ruleCondition.MatchType
}
func (r *ThresholdRule) compareOp() CompareOp {
if r.ruleCondition == nil {
return ValueIsEq
}
return r.ruleCondition.CompareOp
}
func (r *ThresholdRule) Type() RuleType {
return RuleTypeThreshold
}
func (r *ThresholdRule) SetLastError(err error) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.lastError = err
}
func (r *ThresholdRule) LastError() error {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.lastError
}
func (r *ThresholdRule) SetHealth(health RuleHealth) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.health = health
}
func (r *ThresholdRule) Health() RuleHealth {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.health
}
// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
func (r *ThresholdRule) SetEvaluationDuration(dur time.Duration) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.evaluationDuration = dur
}
func (r *ThresholdRule) HoldDuration() time.Duration {
return r.holdDuration
}
func (r *ThresholdRule) EvalWindow() time.Duration {
return r.evalWindow
}
// Labels returns the labels of the alerting rule.
func (r *ThresholdRule) Labels() labels.BaseLabels {
return r.labels
}
// Annotations returns the annotations of the alerting rule.
func (r *ThresholdRule) Annotations() labels.BaseLabels {
return r.annotations
}
func (r *ThresholdRule) sample(alert *Alert, ts time.Time) Sample {
lb := labels.NewBuilder(r.labels)
alertLabels := alert.Labels.(labels.Labels)
for _, l := range alertLabels {
lb.Set(l.Name, l.Value)
}
lb.Set(labels.MetricNameLabel, alertMetricName)
lb.Set(labels.AlertNameLabel, r.name)
lb.Set(labels.AlertRuleIdLabel, r.ID())
lb.Set(labels.AlertStateLabel, alert.State.String())
s := Sample{
Metric: lb.Labels(),
Point: Point{T: timestamp.FromTime(ts), V: 1},
}
return s
}
// forStateSample returns the sample for ALERTS_FOR_STATE.
func (r *ThresholdRule) forStateSample(alert *Alert, ts time.Time, v float64) Sample {
lb := labels.NewBuilder(r.labels)
alertLabels := alert.Labels.(labels.Labels)
for _, l := range alertLabels {
lb.Set(l.Name, l.Value)
}
lb.Set(labels.MetricNameLabel, alertForStateMetricName)
lb.Set(labels.AlertNameLabel, r.name)
s := Sample{
Metric: lb.Labels(),
Point: Point{T: timestamp.FromTime(ts), V: v},
}
return s
}
// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule.
func (r *ThresholdRule) GetEvaluationDuration() time.Duration {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.evaluationDuration
}
// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated.
func (r *ThresholdRule) SetEvaluationTimestamp(ts time.Time) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.evaluationTimestamp = ts
}
// GetEvaluationTimestamp returns the time the evaluation took place.
func (r *ThresholdRule) GetEvaluationTimestamp() time.Time {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.evaluationTimestamp
}
// State returns the maximum state of alert instances for this rule.
// StateFiring > StatePending > StateInactive
func (r *ThresholdRule) State() AlertState {
r.mtx.Lock()
defer r.mtx.Unlock()
maxState := StateInactive
for _, a := range r.active {
if a.State > maxState {
maxState = a.State
}
}
return maxState
}
func (r *ThresholdRule) currentAlerts() []*Alert {
r.mtx.Lock()
defer r.mtx.Unlock()
alerts := make([]*Alert, 0, len(r.active))
for _, a := range r.active {
anew := *a
alerts = append(alerts, &anew)
}
return alerts
}
func (r *ThresholdRule) ActiveAlerts() []*Alert {
var res []*Alert
for _, a := range r.currentAlerts() {
if a.ResolvedAt.IsZero() {
res = append(res, a)
}
}
return res
}
// ForEachActiveAlert runs the given function on each alert.
// This should be used when you want to use the actual alerts from the ThresholdRule
// and not on its copy.
// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'.
func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) {
r.mtx.Lock()
defer r.mtx.Unlock()
for _, a := range r.active {
f(a)
}
}
func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
zap.S().Info("msg:", "initiating send alerts (if any)", "\t rule:", r.Name())
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
} else {
zap.S().Debugf("msg: skipping send alert due to resend delay", "\t rule: ", r.Name(), "\t alert:", alert.Labels)
}
})
notifyFunc(ctx, "", alerts...)
}
func (r *ThresholdRule) CheckCondition(v float64) bool {
if value.IsNaN(v) {
zap.S().Debugf("msg:", "found NaN in rule condition", "\t rule name:", r.Name())
return false
}
if r.ruleCondition.Target == nil {
zap.S().Debugf("msg:", "found null target in rule condition", "\t rulename:", r.Name())
return false
}
switch r.ruleCondition.CompareOp {
case ValueIsEq:
return v == *r.ruleCondition.Target
case ValueIsNotEq:
return v != *r.ruleCondition.Target
case ValueIsBelow:
return v < *r.ruleCondition.Target
case ValueIsAbove:
return v > *r.ruleCondition.Target
default:
return false
}
}
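// Example for CheckCondition above (illustrative numbers): with
// CompareOp = ValueIsAbove and Target = 100, CheckCondition(120) returns true
// and CheckCondition(80) returns false; NaN samples and a nil target never match.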
func (r *ThresholdRule) prepareQueryRange(ts time.Time) *qsmodel.QueryRangeParamsV2 {
// todo(amol): add 30 seconds to evalWindow for rate calc
tsEnd := ts.UnixNano() / int64(time.Millisecond)
tsStart := ts.Add(-time.Duration(r.evalWindow)).UnixNano() / int64(time.Millisecond)
// for k, v := range r.ruleCondition.CompositeMetricQuery.BuilderQueries {
// v.ReduceTo = qsmodel.RMAX
// r.ruleCondition.CompositeMetricQuery.BuilderQueries[k] = v
// }
return &qsmodel.QueryRangeParamsV2{
Start: tsStart,
End: tsEnd,
Step: 30,
CompositeMetricQuery: r.ruleCondition.CompositeMetricQuery,
}
}
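// For instance (illustrative values, assuming Step is in seconds): with the
// default evalWindow of 5 minutes and ts = 2022-07-14T10:05:00Z, prepareQueryRange
// above yields Start = 2022-07-14T10:00:00Z and End = ts, both as Unix
// milliseconds, queried at a 30 second step.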
// runChQuery runs the actual query against ClickHouse and reduces the result into a vector of samples
func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, query string) (Vector, error) {
rows, err := db.Query(ctx, query)
if err != nil {
zap.S().Errorf("rule:", r.Name(), "\t failed to get alert query result")
return nil, err
}
columnTypes := rows.ColumnTypes()
columnNames := rows.Columns()
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
// []sample list
var result Vector
// map[fingerprint]sample
resultMap := make(map[uint64]Sample, 0)
// for rate queries we want to skip the first record,
// but we don't know here whether a rate is being computed,
// so we always query the full timeframe at a 30 second step
// and skip the first record for each label combination
skipFirstRecord := make(map[uint64]bool, 0)
defer rows.Close()
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return nil, err
}
sample := Sample{}
lbls := labels.NewBuilder(labels.Labels{})
for i, v := range vars {
colName := columnNames[i]
switch v := v.(type) {
case *string:
lbls.Set(colName, *v)
case *time.Time:
timval := *v
if colName == "ts" {
sample.Point.T = timval.Unix()
} else {
lbls.Set(colName, timval.Format("2006-01-02 15:04:05"))
}
case *float64:
if colName == "res" || colName == "value" {
sample.Point.V = *v
} else {
lbls.Set(colName, fmt.Sprintf("%f", *v))
}
case *uint64:
intv := *v
if colName == "res" || colName == "value" {
sample.Point.V = float64(intv)
} else {
lbls.Set(colName, fmt.Sprintf("%d", intv))
}
case *uint8:
intv := *v
if colName == "res" || colName == "value" {
sample.Point.V = float64(intv)
} else {
lbls.Set(colName, fmt.Sprintf("%d", intv))
}
default:
zap.S().Errorf("ruleId:", r.ID(), "\t error: invalid var found in query result", v, columnNames[i])
}
}
if value.IsNaN(sample.Point.V) {
continue
}
// capture labels in result
sample.Metric = lbls.Labels()
labelHash := lbls.Labels().Hash()
// here we walk through values of time series
// and calculate the final value used to compare
// with rule target
if existing, ok := resultMap[labelHash]; ok {
switch r.matchType() {
case AllTheTimes:
if r.compareOp() == ValueIsAbove {
sample.Point.V = math.Min(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
} else if r.compareOp() == ValueIsBelow {
sample.Point.V = math.Max(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
}
case AtleastOnce:
if r.compareOp() == ValueIsAbove {
sample.Point.V = math.Max(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
} else if r.compareOp() == ValueIsBelow {
sample.Point.V = math.Min(existing.Point.V, sample.Point.V)
resultMap[labelHash] = sample
}
case OnAverage:
sample.Point.V = (existing.Point.V + sample.Point.V) / 2
resultMap[labelHash] = sample
case InTotal:
sample.Point.V = (existing.Point.V + sample.Point.V)
resultMap[labelHash] = sample
}
} else {
if skipFirstRecord[labelHash] {
resultMap[labelHash] = sample
} else {
// this is the first record for this label combination; skip it
skipFirstRecord[labelHash] = true
}
}
}
for _, sample := range resultMap {
// check alert rule condition before dumping results
if r.CheckCondition(sample.Point.V) {
result = append(result, sample)
}
}
return result, nil
}
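// To illustrate the reduction in runChQuery above: after the first sample for a
// label set is dropped (rate handling), samples 50 and 30 with CompareOp =
// ValueIsAbove reduce to 30 for AllTheTimes (the minimum must exceed the target),
// 50 for AtleastOnce (the maximum), 40 for OnAverage and 80 for InTotal; the
// reduced value is then compared against the target by CheckCondition.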
// buildAndRunQuery builds the target query from the rule condition,
// runs it and returns the samples that satisfy the alert condition
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) {
params := r.prepareQueryRange(ts)
runQueries := metrics.PrepareBuilderMetricQueries(params, constants.SIGNOZ_TIMESERIES_TABLENAME)
if runQueries.Err != nil {
return nil, fmt.Errorf("failed to prepare metric queries: %v", runQueries.Err)
}
if len(runQueries.Queries) == 0 {
return nil, fmt.Errorf("no queries could be built with the rule config")
}
zap.S().Debugf("ruleid:", r.ID(), "\t runQueries:", runQueries.Queries)
// find target query label
if query, ok := runQueries.Queries["F1"]; ok {
// found a formula query, run with it
return r.runChQuery(ctx, ch, query)
}
// no formula in the rule condition; fall back to the
// lexicographically last query label (e.g. C when A, B and C exist)
keys := make([]string, 0, len(runQueries.Queries))
for k := range runQueries.Queries {
keys = append(keys, k)
}
sort.Strings(keys)
queryLabel := keys[len(keys)-1]
zap.S().Debugf("ruleId: ", r.ID(), "\t result query label:", queryLabel)
if queryString, ok := runQueries.Queries[queryLabel]; ok {
return r.runChQuery(ctx, ch, queryString)
}
zap.S().Errorf("ruleId: ", r.ID(), "\t invalid query label:", queryLabel, "\t queries:", runQueries.Queries)
return nil, fmt.Errorf("this is unexpected, invalid query label")
}
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) {
res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
zap.S().Debugf("ruleid:", r.ID(), "\t failure in buildAndRunQuery:", err)
return nil, err
}
r.mtx.Lock()
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var vec Vector
var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
l := make(map[string]string, len(smpl.Metric))
for _, lbl := range smpl.Metric {
l[lbl.Name] = lbl.Value
}
tmplData := AlertTemplateData(l, smpl.V)
// Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}"
expand := func(text string) string {
tmpl := NewTemplateExpander(
ctx,
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
zap.S().Errorf("msg:", "Expanding alert template failed", "\t err", err, "\t data", tmplData)
}
return result
}
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel)
for _, l := range r.labels {
lb.Set(l.Name, expand(l.Value))
}
lb.Set(labels.AlertNameLabel, r.Name())
lb.Set(labels.AlertRuleIdLabel, r.ID())
lb.Set(labels.RuleSourceLabel, r.GeneratorURL())
annotations := make(labels.Labels, 0, len(r.annotations))
for _, a := range r.annotations {
annotations = append(annotations, labels.Label{Name: a.Name, Value: expand(a.Value)})
}
lbs := lb.Labels()
h := lbs.Hash()
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
zap.S().Errorf("ruleId: ", r.ID(), "\t msg:", "the alert query returns duplicate records:", alerts[h])
err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above, so calling SetHealth or
// SetLastError here would deadlock.
r.health = HealthBad
r.lastError = err
return nil, err
}
alerts[h] = &Alert{
Labels: lbs,
Annotations: annotations,
ActiveAt: ts,
State: StatePending,
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
}
}
zap.S().Info("rule:", r.Name(), "\t alerts found: ", len(alerts))
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
continue
}
r.active[h] = a
}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
}
continue
}
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring
a.FiredAt = ts
}
}
r.health = HealthGood
r.lastError = err
return vec, nil
}
func (r *ThresholdRule) String() string {
ar := PostableRule{
Alert: r.name,
RuleCondition: r.ruleCondition,
EvalWindow: Duration(r.evalWindow),
Labels: r.labels.Map(),
Annotations: r.annotations.Map(),
}
byt, err := yaml.Marshal(ar)
if err != nil {
return fmt.Sprintf("error marshaling alerting rule: %s", err.Error())
}
return string(byt)
}
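A minimal sketch of how a ThresholdRule is expected to be driven; the condition literal, the queriers value and notifyFn below are placeholders (the rule manager elsewhere in this commit normally wires these together), and the CompositeMetricQuery is omitted for brevity even though IsValid() requires a complete condition.

target := 100.0
rule, err := NewThresholdRule(
"rule-1",
"HighErrorRate",
&RuleCondition{
CompareOp: ValueIsAbove,
MatchType: AtleastOnce,
Target: &target,
// CompositeMetricQuery omitted for brevity
},
5*time.Minute,
map[string]string{"severity": "critical"},
map[string]string{"summary": "error rate is {{$value}}"},
"https://signoz.example.com/alerts",
)
if err != nil {
zap.S().Fatal(err)
}
// queriers.Ch is a clickhouse.Conn; Eval runs the built query and updates the
// rule's active alerts.
if _, err := rule.Eval(context.Background(), time.Now(), queriers); err != nil {
zap.S().Error("eval failed: ", err)
}
// resendDelay of 1m, evaluation interval of 30s; notifyFn is a placeholder NotifyFunc.
rule.SendAlerts(context.Background(), time.Now(), time.Minute, 30*time.Second, notifyFn)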

View File

@ -0,0 +1,13 @@
package labels
type BaseLabels interface {
Len() int
Swap(i, j int)
Less(i, j int) bool
String() string
Hash() uint64
HashForLabels(names ...string) uint64
Get(name string) string
Has(name string) bool
Map() map[string]string
}

View File

@ -0,0 +1,306 @@
package labels
import (
"bytes"
"encoding/json"
"sort"
"strconv"
"strings"
"github.com/cespare/xxhash"
)
const sep = '\xff'
// Well-known label names used by Prometheus components.
const (
MetricNameLabel = "__name__"
AlertNameLabel = "alertname"
BucketLabel = "le"
InstanceName = "instance"
// AlertStateLabel is the label name indicating the state of an alert.
AlertStateLabel = "alertstate"
AlertRuleIdLabel = "ruleId"
RuleSourceLabel = "ruleSource"
)
// Label is a key/value pair of strings.
type Label struct {
Name, Value string
}
// Labels is a sorted set of labels. Order has to be guaranteed upon
// instantiation.
type Labels []Label
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
func (ls Labels) String() string {
var b bytes.Buffer
b.WriteByte('{')
for i, l := range ls {
if i > 0 {
b.WriteByte(',')
b.WriteByte(' ')
}
b.WriteString(l.Name)
b.WriteByte('=')
b.WriteString(strconv.Quote(l.Value))
}
b.WriteByte('}')
return b.String()
}
// MarshalJSON implements json.Marshaler.
func (ls Labels) MarshalJSON() ([]byte, error) {
return json.Marshal(ls.Map())
}
// UnmarshalJSON implements json.Unmarshaler.
func (ls *Labels) UnmarshalJSON(b []byte) error {
var m map[string]string
if err := json.Unmarshal(b, &m); err != nil {
return err
}
*ls = FromMap(m)
return nil
}
// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
b := make([]byte, 0, 1024)
for _, v := range ls {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}
// HashForLabels returns a hash value for the labels matching the provided names.
func (ls Labels) HashForLabels(names ...string) uint64 {
b := make([]byte, 0, 1024)
for _, v := range ls {
for _, n := range names {
if v.Name == n {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
break
}
}
}
return xxhash.Sum64(b)
}
// HashWithoutLabels returns a hash value for all labels except those matching
// the provided names.
func (ls Labels) HashWithoutLabels(names ...string) uint64 {
b := make([]byte, 0, 1024)
Outer:
for _, v := range ls {
if v.Name == MetricNameLabel {
continue
}
for _, n := range names {
if v.Name == n {
continue Outer
}
}
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}
// Copy returns a copy of the labels.
func (ls Labels) Copy() Labels {
res := make(Labels, len(ls))
copy(res, ls)
return res
}
// Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string {
for _, l := range ls {
if l.Name == name {
return l.Value
}
}
return ""
}
// Has returns true if the label with the given name is present.
func (ls Labels) Has(name string) bool {
for _, l := range ls {
if l.Name == name {
return true
}
}
return false
}
// Equal returns whether the two label sets are equal.
func Equal(ls, o Labels) bool {
if len(ls) != len(o) {
return false
}
for i, l := range ls {
if l.Name != o[i].Name || l.Value != o[i].Value {
return false
}
}
return true
}
// Map returns a string map of the labels.
func (ls Labels) Map() map[string]string {
m := make(map[string]string, len(ls))
for _, l := range ls {
m[l.Name] = l.Value
}
return m
}
// New returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
func New(ls ...Label) Labels {
set := make(Labels, 0, len(ls))
for _, l := range ls {
set = append(set, l)
}
sort.Sort(set)
return set
}
// FromMap returns new sorted Labels from the given map.
func FromMap(m map[string]string) Labels {
l := make([]Label, 0, len(m))
for k, v := range m {
l = append(l, Label{Name: k, Value: v})
}
return New(l...)
}
// FromStrings creates new labels from pairs of strings.
func FromStrings(ss ...string) Labels {
if len(ss)%2 != 0 {
panic("invalid number of strings")
}
var res Labels
for i := 0; i < len(ss); i += 2 {
res = append(res, Label{Name: ss[i], Value: ss[i+1]})
}
sort.Sort(res)
return res
}
// Compare compares the two label sets.
// The result will be 0 if a==b, <0 if a < b, and >0 if a > b.
func Compare(a, b Labels) int {
l := len(a)
if len(b) < l {
l = len(b)
}
for i := 0; i < l; i++ {
if d := strings.Compare(a[i].Name, b[i].Name); d != 0 {
return d
}
if d := strings.Compare(a[i].Value, b[i].Value); d != 0 {
return d
}
}
// If all labels so far were in common, the set with fewer labels comes first.
return len(a) - len(b)
}
// Builder allows modifying Labels.
type Builder struct {
base Labels
del []string
add []Label
}
// NewBuilder returns a new Builder for the given base labels.
func NewBuilder(base Labels) *Builder {
return &Builder{
base: base,
del: make([]string, 0, 5),
add: make([]Label, 0, 5),
}
}
// Del deletes the label of the given name.
func (b *Builder) Del(ns ...string) *Builder {
for _, n := range ns {
for i, a := range b.add {
if a.Name == n {
b.add = append(b.add[:i], b.add[i+1:]...)
}
}
b.del = append(b.del, n)
}
return b
}
// Set the name/value pair as a label.
func (b *Builder) Set(n, v string) *Builder {
for i, a := range b.add {
if a.Name == n {
b.add[i].Value = v
return b
}
}
b.add = append(b.add, Label{Name: n, Value: v})
return b
}
// Labels returns the labels from the builder. If no modifications
// were made, the original labels are returned.
func (b *Builder) Labels() Labels {
if len(b.del) == 0 && len(b.add) == 0 {
return b.base
}
// In the general case, labels are removed, modified or moved
// rather than added.
res := make(Labels, 0, len(b.base))
Outer:
for _, l := range b.base {
for _, n := range b.del {
if l.Name == n {
continue Outer
}
}
for _, la := range b.add {
if l.Name == la.Name {
continue Outer
}
}
res = append(res, l)
}
res = append(res, b.add...)
sort.Sort(res)
return res
}
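A small usage sketch for the package above, written as it would appear inside the package (callers prefix everything with labels.); the label values are arbitrary.

base := FromMap(map[string]string{
"__name__": "ALERTS",
"service": "frontend",
"le": "0.5",
})
lbls := NewBuilder(base).
Del(BucketLabel). // drop "le"
Set(AlertNameLabel, "HighLatency").
Labels() // sorted result
fmt.Println(lbls.Get("service")) // frontend
fmt.Println(lbls.Has(BucketLabel)) // false
fmt.Println(lbls.Hash() == base.Hash()) // false: the label sets differ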

View File

@ -0,0 +1,35 @@
package times
import (
"math"
"time"
)
const (
// MinimumTick is the minimum supported time resolution. It has to be
// no coarser than time.Second in order for the code below to work.
minimumTick = time.Millisecond
// second is the Time duration equivalent to one second.
second = int64(time.Second / minimumTick)
// The number of nanoseconds per minimum tick.
nanosPerTick = int64(minimumTick / time.Nanosecond)
// Earliest is the earliest Time representable. Handy for
// initializing a high watermark.
Earliest = Time(math.MinInt64)
// Latest is the latest Time representable. Handy for initializing
// a low watermark.
Latest = Time(math.MaxInt64)
)
type Time int64
// TimeFromUnixNano returns the Time equivalent to the Unix Time
// t provided in nanoseconds.
func TimeFromUnixNano(t int64) Time {
return Time(t / nanosPerTick)
}
func (t Time) Time() time.Time {
return time.Unix(int64(t)/second, (int64(t)%second)*nanosPerTick)
}

View File

@ -0,0 +1,13 @@
package timestamp
import "time"
// FromTime returns a new millisecond timestamp from a time.
func FromTime(t time.Time) int64 {
return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
}
// Time returns a new time.Time object from a millisecond timestamp.
func Time(ts int64) time.Time {
return time.Unix(ts/1000, (ts%1000)*int64(time.Millisecond))
}
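A quick round-trip sketch for the helpers above; precision is one millisecond, so anything finer is dropped.

now := time.Now()
ms := FromTime(now) // Unix milliseconds, e.g. 1657792146123
back := Time(ms) // the same instant, truncated to the millisecond
_ = back
// times.TimeFromUnixNano(now.UnixNano()) yields the same millisecond count,
// since the times package above also ticks in milliseconds.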

View File

@ -0,0 +1,40 @@
package value
import (
"math"
)
const (
// NormalNaN is a quiet NaN. This is also math.NaN().
NormalNaN uint64 = 0x7ff8000000000001
// StaleNaN is a signalling NaN, due to the MSB of the mantissa being 0.
// This value is chosen with many leading 0s, so we have scope to store more
// complicated values in the future. It is 2 rather than 1 to make
// it easier to distinguish from the NormalNaN by a human when debugging.
StaleNaN uint64 = 0x7ff0000000000002
)
// IsNaN reports whether v carries the canonical NaN bit pattern (math.NaN()).
func IsNaN(v float64) bool {
return math.Float64bits(v) == NormalNaN
}
// IsStaleNaN returns true when the provided NaN value is a stale marker.
func IsStaleNaN(v float64) bool {
return math.Float64bits(v) == StaleNaN
}
// Float64 returns a pointer to the float64 value passed in.
func Float64(v float64) *float64 {
return &v
}
// Float64Value returns the value of the float64 pointer passed in or
// 0 if the pointer is nil.
func Float64Value(v *float64) float64 {
if v != nil {
return *v
}
return 0
}
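A short sketch of how these helpers are used in this change (rule targets are stored as *float64); the values are illustrative.

target := Float64(99.9) // pointer helper used when building rule conditions
fmt.Println(Float64Value(target)) // 99.9
fmt.Println(Float64Value(nil)) // 0
fmt.Println(IsNaN(math.NaN())) // true: canonical NaN bit pattern
fmt.Println(IsStaleNaN(math.Float64frombits(StaleNaN))) // true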