chore: use querier in alerts (#5045)
This commit is contained in:
parent 0d043bf380
commit 0cf8817f3f
1  .gitignore  (vendored)
@ -47,6 +47,7 @@ ee/query-service/signoz.db
|
||||
ee/query-service/tests/test-deploy/data/
|
||||
|
||||
# local data
|
||||
*.backup
|
||||
*.db
|
||||
/deploy/docker/clickhouse-setup/data/
|
||||
/deploy/docker-swarm/clickhouse-setup/data/
|
||||
|
@ -1,12 +1,13 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (ah *APIHandler) lockDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -710,6 +710,7 @@ func makeRulesManager(
|
||||
Logger: nil,
|
||||
DisableRules: disableRules,
|
||||
FeatureFlags: fm,
|
||||
Reader: ch,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/govaluate"
|
||||
"github.com/gorilla/mux"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
@ -38,6 +37,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/cache"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
@ -3160,13 +3160,13 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
|
||||
return
|
||||
}
|
||||
|
||||
applyMetricLimit(result, queryRangeParams)
|
||||
postprocess.ApplyMetricLimit(result, queryRangeParams)
|
||||
|
||||
sendQueryResultEvents(r, result, queryRangeParams)
|
||||
// only adding applyFunctions instead of postProcess since expressions are
|
||||
// executed in clickhouse directly and we wanted to add support for timeshift
|
||||
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
applyFunctions(result, queryRangeParams)
|
||||
postprocess.ApplyFunctions(result, queryRangeParams)
|
||||
}
|
||||
|
||||
resp := v3.QueryRangeResponse{
|
||||
@ -3418,7 +3418,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
|
||||
|
||||
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
|
||||
result, err = postProcessResult(result, queryRangeParams)
|
||||
result, err = postprocess.PostProcessResult(result, queryRangeParams)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@ -3453,105 +3453,3 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
aH.queryRangeV4(r.Context(), queryRangeParams, w, r)
|
||||
}
|
||||
|
||||
// postProcessResult applies having clause, metric limit, reduce function to the result
|
||||
// This function is effective for metrics data source for now, but it can be extended to other data sources
|
||||
// if needed
|
||||
// Much of this work can be done in the ClickHouse query, but we decided to do it here because:
|
||||
// 1. Effective use of caching
|
||||
// 2. Easier to add new functions
|
||||
func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error) {
|
||||
// Having clause is not part of the clickhouse query, so we need to apply it here
|
||||
// It's not included in the query because it doesn't work nicely with caching
|
||||
// With this change, if you have a query with a having clause, and then you change the having clause
|
||||
// to something else, the query will still be cached.
|
||||
applyHavingClause(result, queryRangeParams)
|
||||
// We apply the metric limit here because it's not part of the clickhouse query
|
||||
// The limit in the context of the time series query is the number of time series
|
||||
// So for the limit to work, we need to know what series to keep and what to discard
|
||||
// For us to know that, we need to execute the query first, and then apply the limit
|
||||
// which we found expensive, because we are executing the query twice on the same data
|
||||
// So we decided to apply the limit here, after the query is executed
|
||||
// The function is named applyMetricLimit because it only applies to metrics data source
|
||||
// In traces and logs, the limit is achieved using subqueries
|
||||
applyMetricLimit(result, queryRangeParams)
|
||||
// Each series in the result produces N number of points, where N is (end - start) / step
|
||||
// For the panel type table, we need to show one point for each series in the row
|
||||
// We do that by applying a reduce function to each series
|
||||
applyReduceTo(result, queryRangeParams)
|
||||
// We apply the functions here because it's easier to add new functions
|
||||
applyFunctions(result, queryRangeParams)
|
||||
|
||||
// expressions are executed at query service so the value of time.now in the individual
|
||||
// queries will be different, so for the table panel we make it the same.
|
||||
if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable {
|
||||
tablePanelResultProcessor(result)
|
||||
}
|
||||
|
||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||
// The way we distinguish between a formula and a query is by checking if the expression
|
||||
// is the same as the query name
|
||||
// TODO(srikanthccv): Update the UI to send a flag to distinguish between a formula and a query
|
||||
if query.Expression != query.QueryName {
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs())
|
||||
// This shouldn't happen here, because it should have been caught earlier in validation
|
||||
if err != nil {
|
||||
zap.L().Error("error in expression", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
formulaResult, err := processResults(result, expression)
|
||||
if err != nil {
|
||||
zap.L().Error("error in expression", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
formulaResult.QueryName = query.QueryName
|
||||
result = append(result, formulaResult)
|
||||
}
|
||||
}
|
||||
// we are done with the formula calculations, only send the results for enabled queries
|
||||
removeDisabledQueries := func(result []*v3.Result) []*v3.Result {
|
||||
var newResult []*v3.Result
|
||||
for _, res := range result {
|
||||
if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled {
|
||||
continue
|
||||
}
|
||||
newResult = append(newResult, res)
|
||||
}
|
||||
return newResult
|
||||
}
|
||||
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
result = removeDisabledQueries(result)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// applyFunctions applies functions for each query in the composite query
|
||||
// The functions can be more than one, and they are applied in the order they are defined
|
||||
func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
|
||||
for idx, result := range results {
|
||||
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
|
||||
|
||||
if builderQueries != nil {
|
||||
functions := builderQueries[result.QueryName].Functions
|
||||
|
||||
for _, function := range functions {
|
||||
results[idx] = queryBuilder.ApplyFunction(function, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tablePanelResultProcessor(results []*v3.Result) {
|
||||
var ts int64
|
||||
for ridx := range results {
|
||||
for sidx := range results[ridx].Series {
|
||||
for pidx := range results[ridx].Series[sidx].Points {
|
||||
if ts == 0 {
|
||||
ts = results[ridx].Series[sidx].Points[pidx].Timestamp
|
||||
} else {
|
||||
results[ridx].Series[sidx].Points[pidx].Timestamp = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -345,6 +345,33 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
|
||||
adjustStep := int64(math.Min(float64(mq.StepInterval), 60))
|
||||
end = end - (end % (adjustStep * 1000))
|
||||
|
||||
// if the aggregate operator is a histogram quantile and the user has forgotten
|
||||
// the le tag in the group by, add the le tag to the group by
|
||||
if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
|
||||
mq.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
|
||||
mq.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
|
||||
mq.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
|
||||
mq.AggregateOperator == v3.AggregateOperatorHistQuant99 {
|
||||
found := false
|
||||
for _, tag := range mq.GroupBy {
|
||||
if tag.Key == "le" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
mq.GroupBy = append(
|
||||
mq.GroupBy,
|
||||
v3.AttributeKey{
|
||||
Key: "le",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
IsColumn: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
var query string
|
||||
var err error
|
||||
if mq.Temporality == v3.Delta {
|
||||
|
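A minimal sketch of the safeguard added above, assuming only that the aggregation has already been identified as a histogram quantile; the AttributeKey struct below is a stand-in for the v3 model used in the diff, not the real type.

package main

import "fmt"

// AttributeKey is a stand-in for v3.AttributeKey from the diff.
type AttributeKey struct {
	Key, DataType, Type string
	IsColumn            bool
}

// ensureLeGroupBy mirrors the added logic: if the user did not group by the
// "le" bucket-boundary tag, append it so the quantile can still be computed.
func ensureLeGroupBy(groupBy []AttributeKey) []AttributeKey {
	for _, tag := range groupBy {
		if tag.Key == "le" {
			return groupBy
		}
	}
	return append(groupBy, AttributeKey{Key: "le", DataType: "string", Type: "tag"})
}

func main() {
	gb := []AttributeKey{{Key: "service_name", DataType: "string", Type: "tag"}}
	fmt.Println(len(ensureLeGroupBy(gb))) // 2: "le" was appended exactly once
}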
@ -24,6 +24,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
|
||||
)
|
||||
@ -1007,7 +1008,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
|
||||
// Formula query
|
||||
// Check if the queries used in the expression can be joined
|
||||
if query.QueryName != query.Expression {
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs())
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, postprocess.EvalFuncs())
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
@ -1065,34 +1066,6 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
|
||||
}
|
||||
query.ShiftBy = timeShiftBy
|
||||
|
||||
// for metrics v3
|
||||
// if the aggregate operator is a histogram quantile, and user has not forgotten
|
||||
// the le tag in the group by then add the le tag to the group by
|
||||
if query.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
|
||||
query.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
|
||||
query.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
|
||||
query.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
|
||||
query.AggregateOperator == v3.AggregateOperatorHistQuant99 {
|
||||
found := false
|
||||
for _, tag := range query.GroupBy {
|
||||
if tag.Key == "le" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
query.GroupBy = append(
|
||||
query.GroupBy,
|
||||
v3.AttributeKey{
|
||||
Key: "le",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
IsColumn: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if query.Filters == nil || len(query.Filters.Items) == 0 {
|
||||
continue
|
||||
}
|
||||
|
@ -697,6 +697,7 @@ func makeRulesManager(
|
||||
Logger: nil,
|
||||
DisableRules: disableRules,
|
||||
FeatureFlags: fm,
|
||||
Reader: ch,
|
||||
}
|
||||
|
||||
// create Manager
|
||||
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -162,7 +162,7 @@ func processResults(results []*v3.Result, expression *govaluate.EvaluableExpress
|
||||
|
||||
var SupportedFunctions = []string{"exp", "log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians", "now", "toUnixTimestamp"}
|
||||
|
||||
func evalFuncs() map[string]govaluate.ExpressionFunction {
|
||||
func EvalFuncs() map[string]govaluate.ExpressionFunction {
|
||||
GoValuateFuncs := make(map[string]govaluate.ExpressionFunction)
|
||||
// Returns e to the power of the given argument.
|
||||
GoValuateFuncs["exp"] = func(args ...interface{}) (interface{}, error) {
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"math"
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"strings"
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"testing"
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"math"
|
||||
@ -9,8 +9,8 @@ import (
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
// applyMetricLimit applies limit to the metrics query results
|
||||
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
|
||||
// ApplyMetricLimit applies limit to the metrics query results
|
||||
func ApplyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
|
||||
// apply limit if any for metrics
|
||||
// use the grouping set points to apply the limit
|
||||
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@ -594,7 +594,7 @@ func TestApplyLimitOnMetricResult(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
result := c.inputResult
|
||||
applyMetricLimit(result, c.params)
|
||||
ApplyMetricLimit(result, c.params)
|
||||
if len(result) != len(c.expectedResult) {
|
||||
t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result))
|
||||
}
|
110  pkg/query-service/postprocess/query_range.go  (normal file)
@ -0,0 +1,110 @@
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/govaluate"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// PostProcessResult applies having clause, metric limit, reduce function to the result
|
||||
// This function is effective for metrics data source for now, but it can be extended to other data sources
|
||||
// if needed
|
||||
// Much of this work can be done in the ClickHouse query, but we decided to do it here because:
|
||||
// 1. Effective use of caching
|
||||
// 2. Easier to add new functions
|
||||
func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error) {
|
||||
// Having clause is not part of the clickhouse query, so we need to apply it here
|
||||
// It's not included in the query because it doesn't work nicely with caching
|
||||
// With this change, if you have a query with a having clause, and then you change the having clause
|
||||
// to something else, the query will still be cached.
|
||||
applyHavingClause(result, queryRangeParams)
|
||||
// We apply the metric limit here because it's not part of the clickhouse query
|
||||
// The limit in the context of the time series query is the number of time series
|
||||
// So for the limit to work, we need to know what series to keep and what to discard
|
||||
// For us to know that, we need to execute the query first, and then apply the limit
|
||||
// which we found expensive, because we are executing the query twice on the same data
|
||||
// So we decided to apply the limit here, after the query is executed
|
||||
// The function is named ApplyMetricLimit because it only applies to the metrics data source
|
||||
// In traces and logs, the limit is achieved using subqueries
|
||||
ApplyMetricLimit(result, queryRangeParams)
|
||||
// Each series in the result produces N number of points, where N is (end - start) / step
|
||||
// For the panel type table, we need to show one point for each series in the row
|
||||
// We do that by applying a reduce function to each series
|
||||
applyReduceTo(result, queryRangeParams)
|
||||
// We apply the functions here because it's easier to add new functions
|
||||
ApplyFunctions(result, queryRangeParams)
|
||||
|
||||
// expressions are executed at query service so the value of time.now in the individual
|
||||
// queries will be different, so for the table panel we make it the same.
|
||||
if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable {
|
||||
tablePanelResultProcessor(result)
|
||||
}
|
||||
|
||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||
// The way we distinguish between a formula and a query is by checking if the expression
|
||||
// is the same as the query name
|
||||
// TODO(srikanthccv): Update the UI to send a flag to distinguish between a formula and a query
|
||||
if query.Expression != query.QueryName {
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs())
|
||||
// This shouldn't happen here, because it should have been caught earlier in validation
|
||||
if err != nil {
|
||||
zap.L().Error("error in expression", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
formulaResult, err := processResults(result, expression)
|
||||
if err != nil {
|
||||
zap.L().Error("error in expression", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
formulaResult.QueryName = query.QueryName
|
||||
result = append(result, formulaResult)
|
||||
}
|
||||
}
|
||||
// we are done with the formula calculations, only send the results for enabled queries
|
||||
removeDisabledQueries := func(result []*v3.Result) []*v3.Result {
|
||||
var newResult []*v3.Result
|
||||
for _, res := range result {
|
||||
if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled {
|
||||
continue
|
||||
}
|
||||
newResult = append(newResult, res)
|
||||
}
|
||||
return newResult
|
||||
}
|
||||
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
result = removeDisabledQueries(result)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ApplyFunctions applies functions for each query in the composite query
|
||||
// The functions can be more than one, and they are applied in the order they are defined
|
||||
func ApplyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
|
||||
for idx, result := range results {
|
||||
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
|
||||
|
||||
if builderQueries != nil {
|
||||
functions := builderQueries[result.QueryName].Functions
|
||||
|
||||
for _, function := range functions {
|
||||
results[idx] = queryBuilder.ApplyFunction(function, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tablePanelResultProcessor(results []*v3.Result) {
|
||||
var ts int64
|
||||
for ridx := range results {
|
||||
for sidx := range results[ridx].Series {
|
||||
for pidx := range results[ridx].Series[sidx].Points {
|
||||
if ts == 0 {
|
||||
ts = results[ridx].Series[sidx].Points[pidx].Timestamp
|
||||
} else {
|
||||
results[ridx].Series[sidx].Points[pidx].Timestamp = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
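A hedged sketch of how the alert path is expected to consume this new package after the change: run the composite query through the querier, then post-process builder results. Only the calls visible in the diff are assumed; the helper name and its wiring are illustrative and not part of the commit.

package rules

import (
	"context"
	"fmt"

	"go.signoz.io/signoz/pkg/query-service/interfaces"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/postprocess"
)

// runCompositeQuery is an illustrative helper, not part of the commit: it shows
// the querier + postprocess flow that buildAndRunQuery now follows.
func runCompositeQuery(ctx context.Context, q interfaces.Querier, params *v3.QueryRangeParamsV3) ([]*v3.Result, error) {
	results, err, errsByQuery := q.QueryRange(ctx, params, map[string]v3.AttributeKey{})
	if err != nil {
		return nil, fmt.Errorf("querier failed: %w (per-query errors: %v)", err, errsByQuery)
	}
	if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
		// having clause, metric limit, reduce-to and functions are applied here,
		// exactly as PostProcessResult above documents.
		results, err = postprocess.PostProcessResult(results, params)
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}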
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
@ -1,4 +1,4 @@
|
||||
package app
|
||||
package postprocess
|
||||
|
||||
import (
|
||||
"testing"
|
@ -61,6 +61,7 @@ type ManagerOptions struct {
|
||||
ResendDelay time.Duration
|
||||
DisableRules bool
|
||||
FeatureFlags interfaces.FeatureLookup
|
||||
Reader interfaces.Reader
|
||||
}
|
||||
|
||||
// The Manager manages recording and alerting rules.
|
||||
@ -79,6 +80,7 @@ type Manager struct {
|
||||
logger log.Logger
|
||||
|
||||
featureFlags interfaces.FeatureLookup
|
||||
reader interfaces.Reader
|
||||
}
|
||||
|
||||
func defaultOptions(o *ManagerOptions) *ManagerOptions {
|
||||
@ -119,6 +121,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
|
||||
block: make(chan struct{}),
|
||||
logger: o.Logger,
|
||||
featureFlags: o.FeatureFlags,
|
||||
reader: o.Reader,
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
@ -516,6 +519,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
|
||||
r,
|
||||
ThresholdRuleOpts{},
|
||||
m.featureFlags,
|
||||
m.reader,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@ -879,6 +883,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
|
||||
SendAlways: true,
|
||||
},
|
||||
m.featureFlags,
|
||||
m.reader,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -1,17 +1,14 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"sync"
|
||||
"text/template"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
@ -21,20 +18,19 @@ import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/converter"
|
||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/querier"
|
||||
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/times"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/timestamp"
|
||||
|
||||
logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
|
||||
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/formatter"
|
||||
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
@ -62,9 +58,7 @@ type ThresholdRule struct {
|
||||
// map of active alerts
|
||||
active map[uint64]*Alert
|
||||
|
||||
queryBuilder *queryBuilder.QueryBuilder
|
||||
version string
|
||||
queryBuilderV4 *queryBuilder.QueryBuilder
|
||||
version string
|
||||
// temporalityMap is a map of metric name to temporality
|
||||
// to avoid fetching temporality for the same metric multiple times
|
||||
// querying the v4 table on low cardinal temporality column
|
||||
@ -75,6 +69,9 @@ type ThresholdRule struct {
|
||||
|
||||
lastTimestampWithDatapoints time.Time
|
||||
typ string
|
||||
|
||||
querier interfaces.Querier
|
||||
querierV2 interfaces.Querier
|
||||
}
|
||||
|
||||
type ThresholdRuleOpts struct {
|
||||
@ -93,6 +90,7 @@ func NewThresholdRule(
|
||||
p *PostableRule,
|
||||
opts ThresholdRuleOpts,
|
||||
featureFlags interfaces.FeatureLookup,
|
||||
reader interfaces.Reader,
|
||||
) (*ThresholdRule, error) {
|
||||
|
||||
if p.RuleCondition == nil {
|
||||
@ -122,19 +120,22 @@ func NewThresholdRule(
|
||||
t.evalWindow = 5 * time.Minute
|
||||
}
|
||||
|
||||
builderOpts := queryBuilder.QueryBuilderOptions{
|
||||
BuildMetricQuery: metricsv3.PrepareMetricQuery,
|
||||
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
||||
BuildLogQuery: logsv3.PrepareLogsQuery,
|
||||
querierOption := querier.QuerierOptions{
|
||||
Reader: reader,
|
||||
Cache: nil,
|
||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||
FeatureLookup: featureFlags,
|
||||
}
|
||||
t.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, featureFlags)
|
||||
|
||||
builderOptsV4 := queryBuilder.QueryBuilderOptions{
|
||||
BuildMetricQuery: metricsV4.PrepareMetricQuery,
|
||||
BuildTraceQuery: tracesV3.PrepareTracesQuery,
|
||||
BuildLogQuery: logsv3.PrepareLogsQuery,
|
||||
querierOptsV2 := querierV2.QuerierOptions{
|
||||
Reader: reader,
|
||||
Cache: nil,
|
||||
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||
FeatureLookup: featureFlags,
|
||||
}
|
||||
t.queryBuilderV4 = queryBuilder.NewQueryBuilder(builderOptsV4, featureFlags)
|
||||
|
||||
t.querier = querier.NewQuerier(querierOption)
|
||||
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
|
||||
|
||||
zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id))
|
||||
|
||||
@ -166,7 +167,9 @@ func (r *ThresholdRule) targetVal() float64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
return *r.ruleCondition.Target
|
||||
unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit))
|
||||
value := unitConverter.Convert(converter.Value{F: *r.ruleCondition.Target, U: converter.Unit(r.ruleCondition.TargetUnit)}, converter.Unit(r.Unit()))
|
||||
return value.F
|
||||
}
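A small illustration of the unit-aware target introduced above: the configured target is converted from the alert's TargetUnit into the unit of the evaluated query before any comparison. The concrete units are illustrative, assuming the duration converter recognises "s" and "ms".

package main

import (
	"fmt"

	"go.signoz.io/signoz/pkg/query-service/converter"
)

func main() {
	// A rule whose target is 2 with TargetUnit "s", evaluated against a
	// query whose result unit is "ms": the comparison should happen in ms.
	unitConverter := converter.FromUnit(converter.Unit("s"))
	value := unitConverter.Convert(converter.Value{F: 2, U: converter.Unit("s")}, converter.Unit("ms"))
	fmt.Println(value.F) // expected: 2000
}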
|
||||
|
||||
func (r *ThresholdRule) matchType() MatchType {
|
||||
@ -414,39 +417,7 @@ func (r *ThresholdRule) Unit() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) CheckCondition(v float64) bool {
|
||||
|
||||
if math.IsNaN(v) {
|
||||
zap.L().Debug("found NaN in rule condition", zap.String("rule", r.Name()))
|
||||
return false
|
||||
}
|
||||
|
||||
if r.ruleCondition.Target == nil {
|
||||
zap.L().Debug("found null target in rule condition", zap.String("rule", r.Name()))
|
||||
return false
|
||||
}
|
||||
|
||||
unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit))
|
||||
|
||||
value := unitConverter.Convert(converter.Value{F: *r.ruleCondition.Target, U: converter.Unit(r.ruleCondition.TargetUnit)}, converter.Unit(r.Unit()))
|
||||
|
||||
zap.L().Info("Checking condition for rule", zap.String("rule", r.Name()), zap.String("converter", unitConverter.Name()), zap.Float64("value", v), zap.Float64("target", value.F), zap.String("compareOp", string(r.ruleCondition.CompareOp)))
|
||||
switch r.ruleCondition.CompareOp {
|
||||
case ValueIsEq:
|
||||
return v == value.F
|
||||
case ValueIsNotEq:
|
||||
return v != value.F
|
||||
case ValueIsBelow:
|
||||
return v < value.F
|
||||
case ValueIsAbove:
|
||||
return v > value.F
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
|
||||
// todo(amol): add 30 seconds to evalWindow for rate calc
|
||||
|
||||
// todo(srikanthccv): make this configurable
|
||||
// 2 minutes is reasonable time to wait for data to be available
|
||||
@ -462,9 +433,10 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
|
||||
return &v3.QueryRangeParamsV3{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: 60,
|
||||
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
|
||||
CompositeQuery: r.ruleCondition.CompositeQuery,
|
||||
Variables: make(map[string]interface{}, 0),
|
||||
NoCache: true,
|
||||
}
|
||||
}
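The Step change above can be read as a clamp: use the minimum step the window allows, but never go below 60 seconds. A standalone sketch, where minAllowed stands in for common.MinAllowedStepInterval (whose exact policy is not shown in this diff):

package main

import (
	"fmt"
	"math"
)

// clampStep mirrors the Step computation in prepareQueryRange.
func clampStep(minAllowed int64) int64 {
	return int64(math.Max(float64(minAllowed), 60))
}

func main() {
	fmt.Println(clampStep(30))  // 60: short windows are raised to a 60s step
	fmt.Println(clampStep(120)) // 120: larger windows keep their minimum step
}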
|
||||
|
||||
@ -478,297 +450,13 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 {
|
||||
return &v3.QueryRangeParamsV3{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: 60,
|
||||
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
|
||||
CompositeQuery: r.ruleCondition.CompositeQuery,
|
||||
Variables: make(map[string]interface{}, 0),
|
||||
NoCache: true,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) shouldSkipFirstRecord() bool {
|
||||
shouldSkip := false
|
||||
for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries {
|
||||
if q.DataSource == v3.DataSourceMetrics && q.AggregateOperator.IsRateOperator() {
|
||||
shouldSkip = true
|
||||
}
|
||||
}
|
||||
return shouldSkip
|
||||
}
|
||||
|
||||
// queryClickhouse runs actual query against clickhouse
|
||||
func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, query string) (Vector, error) {
|
||||
rows, err := db.Query(ctx, query)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
columnTypes := rows.ColumnTypes()
|
||||
columnNames := rows.Columns()
|
||||
vars := make([]interface{}, len(columnTypes))
|
||||
|
||||
for i := range columnTypes {
|
||||
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
|
||||
}
|
||||
|
||||
// []sample list
|
||||
var result Vector
|
||||
|
||||
// map[fingerprint]sample
|
||||
resultMap := make(map[uint64]Sample, 0)
|
||||
|
||||
// for rates we want to skip the first record
|
||||
// but we dont know when the rates are being used
|
||||
// so we always pick timeframe - 30 seconds interval
|
||||
// and skip the first record for a given label combo
|
||||
// NOTE: this is not applicable for raw queries
|
||||
skipFirstRecord := make(map[uint64]bool, 0)
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
|
||||
if err := rows.Scan(vars...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.lastTimestampWithDatapoints = time.Now()
|
||||
|
||||
sample := Sample{}
|
||||
// Why do we maintain two labels sets? Alertmanager requires
|
||||
// label keys to follow the model https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
|
||||
// However, our traces and logs explorers support label keys with dot and other namespace characters
|
||||
// Using normalized label keys results in invalid filter criteria.
|
||||
// The original labels are used to prepare the related{logs, traces} link in alert notification
|
||||
lbls := labels.NewBuilder(labels.Labels{})
|
||||
lblsOrig := labels.NewBuilder(labels.Labels{})
|
||||
|
||||
for i, v := range vars {
|
||||
|
||||
colName := normalizeLabelName(columnNames[i])
|
||||
|
||||
switch v := v.(type) {
|
||||
case *string:
|
||||
lbls.Set(colName, *v)
|
||||
lblsOrig.Set(columnNames[i], *v)
|
||||
case *time.Time:
|
||||
timval := *v
|
||||
|
||||
if colName == "ts" || colName == "interval" {
|
||||
sample.Point.T = timval.Unix()
|
||||
} else {
|
||||
lbls.Set(colName, timval.Format(constants.AlertTimeFormat))
|
||||
lblsOrig.Set(columnNames[i], timval.Format(constants.AlertTimeFormat))
|
||||
}
|
||||
|
||||
case *float64:
|
||||
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
|
||||
sample.Point.V = *v
|
||||
} else {
|
||||
lbls.Set(colName, fmt.Sprintf("%f", *v))
|
||||
lblsOrig.Set(columnNames[i], fmt.Sprintf("%f", *v))
|
||||
}
|
||||
case **float64:
|
||||
// ch seems to return this type when column is derived from
|
||||
// SELECT count(*)/ SELECT count(*)
|
||||
floatVal := *v
|
||||
if floatVal != nil {
|
||||
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
|
||||
sample.Point.V = *floatVal
|
||||
} else {
|
||||
lbls.Set(colName, fmt.Sprintf("%f", *floatVal))
|
||||
lblsOrig.Set(columnNames[i], fmt.Sprintf("%f", *floatVal))
|
||||
}
|
||||
}
|
||||
case *float32:
|
||||
float32Val := float32(*v)
|
||||
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
|
||||
sample.Point.V = float64(float32Val)
|
||||
} else {
|
||||
lbls.Set(colName, fmt.Sprintf("%f", float32Val))
|
||||
lblsOrig.Set(columnNames[i], fmt.Sprintf("%f", float32Val))
|
||||
}
|
||||
case *uint8, *uint64, *uint16, *uint32:
|
||||
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
|
||||
sample.Point.V = float64(reflect.ValueOf(v).Elem().Uint())
|
||||
} else {
|
||||
lbls.Set(colName, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()))
|
||||
lblsOrig.Set(columnNames[i], fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()))
|
||||
}
|
||||
case *int8, *int16, *int32, *int64:
|
||||
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
|
||||
sample.Point.V = float64(reflect.ValueOf(v).Elem().Int())
|
||||
} else {
|
||||
lbls.Set(colName, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()))
|
||||
lblsOrig.Set(columnNames[i], fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()))
|
||||
}
|
||||
default:
|
||||
zap.L().Error("invalid var found in query result", zap.String("ruleId", r.ID()), zap.Any("value", v), zap.Any("column", columnNames[i]))
|
||||
}
|
||||
}
|
||||
|
||||
if math.IsNaN(sample.Point.V) {
|
||||
continue
|
||||
}
|
||||
sample.Point.Vs = append(sample.Point.Vs, sample.Point.V)
|
||||
|
||||
// capture lables in result
|
||||
sample.Metric = lbls.Labels()
|
||||
sample.MetricOrig = lblsOrig.Labels()
|
||||
|
||||
labelHash := lbls.Labels().Hash()
|
||||
|
||||
// here we walk through values of time series
|
||||
// and calculate the final value used to compare
|
||||
// with rule target
|
||||
if existing, ok := resultMap[labelHash]; ok {
|
||||
|
||||
switch r.matchType() {
|
||||
case AllTheTimes:
|
||||
if r.compareOp() == ValueIsAbove {
|
||||
sample.Point.V = math.Min(existing.Point.V, sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
} else if r.compareOp() == ValueIsBelow {
|
||||
sample.Point.V = math.Max(existing.Point.V, sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
} else {
|
||||
sample.Point.Vs = append(existing.Point.Vs, sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
}
|
||||
case AtleastOnce:
|
||||
if r.compareOp() == ValueIsAbove {
|
||||
sample.Point.V = math.Max(existing.Point.V, sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
} else if r.compareOp() == ValueIsBelow {
|
||||
sample.Point.V = math.Min(existing.Point.V, sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
} else {
|
||||
sample.Point.Vs = append(existing.Point.Vs, sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
}
|
||||
case OnAverage:
|
||||
sample.Point.Vs = append(existing.Point.Vs, sample.Point.V)
|
||||
sample.Point.V = (existing.Point.V + sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
case InTotal:
|
||||
sample.Point.V = (existing.Point.V + sample.Point.V)
|
||||
resultMap[labelHash] = sample
|
||||
}
|
||||
|
||||
} else {
|
||||
if r.Condition().QueryType() == v3.QueryTypeBuilder {
|
||||
// for query builder, time series data
|
||||
// we skip the first record to support rate cases correctly
|
||||
// improvement(amol): explore approaches to limit this only for
|
||||
// rate uses cases
|
||||
if exists := skipFirstRecord[labelHash]; exists || !r.shouldSkipFirstRecord() {
|
||||
resultMap[labelHash] = sample
|
||||
} else {
|
||||
// looks like the first record for this label combo, skip it
|
||||
skipFirstRecord[labelHash] = true
|
||||
}
|
||||
} else {
|
||||
// for clickhouse raw queries, all records are considered
|
||||
// improvement(amol): think about supporting rate queries
|
||||
// written by user. may have to skip a record, similar to qb case(above)
|
||||
resultMap[labelHash] = sample
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if r.matchType() == OnAverage {
|
||||
for hash, s := range resultMap {
|
||||
s.Point.V = s.Point.V / float64(len(s.Point.Vs))
|
||||
resultMap[hash] = s
|
||||
}
|
||||
}
|
||||
|
||||
for hash, s := range resultMap {
|
||||
if r.matchType() == AllTheTimes && r.compareOp() == ValueIsEq {
|
||||
for _, v := range s.Point.Vs {
|
||||
if v != r.targetVal() { // if any of the values is not equal to target, alert shouldn't be sent
|
||||
s.Point.V = v
|
||||
}
|
||||
}
|
||||
resultMap[hash] = s
|
||||
} else if r.matchType() == AllTheTimes && r.compareOp() == ValueIsNotEq {
|
||||
for _, v := range s.Point.Vs {
|
||||
if v == r.targetVal() { // if any of the values is equal to target, alert shouldn't be sent
|
||||
s.Point.V = v
|
||||
}
|
||||
}
|
||||
resultMap[hash] = s
|
||||
} else if r.matchType() == AtleastOnce && r.compareOp() == ValueIsEq {
|
||||
for _, v := range s.Point.Vs {
|
||||
if v == r.targetVal() { // if any of the values is equal to target, alert should be sent
|
||||
s.Point.V = v
|
||||
}
|
||||
}
|
||||
resultMap[hash] = s
|
||||
} else if r.matchType() == AtleastOnce && r.compareOp() == ValueIsNotEq {
|
||||
for _, v := range s.Point.Vs {
|
||||
if v != r.targetVal() { // if any of the values is not equal to target, alert should be sent
|
||||
s.Point.V = v
|
||||
}
|
||||
}
|
||||
resultMap[hash] = s
|
||||
}
|
||||
}
|
||||
|
||||
zap.L().Debug("resultmap(potential alerts)", zap.String("ruleid", r.ID()), zap.Int("count", len(resultMap)))
|
||||
|
||||
// if the data is missing for `For` duration then we should send alert
|
||||
if r.ruleCondition.AlertOnAbsent && r.lastTimestampWithDatapoints.Add(time.Duration(r.Condition().AbsentFor)*time.Minute).Before(time.Now()) {
|
||||
zap.L().Info("no data found for rule condition", zap.String("ruleid", r.ID()))
|
||||
lbls := labels.NewBuilder(labels.Labels{})
|
||||
if !r.lastTimestampWithDatapoints.IsZero() {
|
||||
lbls.Set("lastSeen", r.lastTimestampWithDatapoints.Format(constants.AlertTimeFormat))
|
||||
}
|
||||
result = append(result, Sample{
|
||||
Metric: lbls.Labels(),
|
||||
IsMissing: true,
|
||||
})
|
||||
return result, nil
|
||||
}
|
||||
|
||||
for _, sample := range resultMap {
|
||||
// check alert rule condition before dumping results, if sendUnmatchedResults
|
||||
// is set then add results irrespective of condition
|
||||
if r.opts.SendUnmatched || r.CheckCondition(sample.Point.V) {
|
||||
result = append(result, sample)
|
||||
}
|
||||
}
|
||||
if len(result) != 0 {
|
||||
zap.L().Info("found alerts", zap.String("ruleid", r.ID()), zap.String("query", query), zap.Int("count", len(result)))
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map[string]string, error) {
|
||||
params := r.prepareQueryRange(ts)
|
||||
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
// check if any enrichment is required for logs; if yes, enrich them
|
||||
if logsv3.EnrichmentRequired(params) {
|
||||
// Note: Sending empty fields key because enrichment is only needed for json
|
||||
// TODO: Add support for attribute enrichment later
|
||||
logsv3.Enrich(params, map[string]v3.AttributeKey{})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var runQueries map[string]string
|
||||
var err error
|
||||
|
||||
if r.version == "v4" {
|
||||
if ch != nil {
|
||||
r.populateTemporality(context.Background(), params, ch)
|
||||
}
|
||||
runQueries, err = r.queryBuilderV4.PrepareQueries(params)
|
||||
} else {
|
||||
runQueries, err = r.queryBuilder.PrepareQueries(params)
|
||||
}
|
||||
|
||||
return runQueries, err
|
||||
}
|
||||
|
||||
// The following function is used to prepare the where clause for the query
|
||||
// `lbls` contains the key value pairs of the labels from the result of the query
|
||||
// We iterate over the where clause and replace the labels with the actual values
|
||||
@ -974,73 +662,26 @@ func (r *ThresholdRule) hostFromSource() string {
|
||||
return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname())
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) prepareClickhouseQueries(ts time.Time) (map[string]string, error) {
|
||||
queries := make(map[string]string)
|
||||
|
||||
if r.ruleCondition == nil {
|
||||
return nil, fmt.Errorf("rule condition is empty")
|
||||
}
|
||||
|
||||
if r.ruleCondition.QueryType() != v3.QueryTypeClickHouseSQL {
|
||||
zap.L().Error("unsupported query type in prepareClickhouseQueries", zap.String("ruleid", r.ID()))
|
||||
return nil, fmt.Errorf("failed to prepare clickhouse queries")
|
||||
}
|
||||
|
||||
params := r.prepareQueryRange(ts)
|
||||
|
||||
// replace reserved go template variables
|
||||
querytemplate.AssignReservedVarsV3(params)
|
||||
|
||||
for name, chQuery := range r.ruleCondition.CompositeQuery.ClickHouseQueries {
|
||||
if chQuery.Disabled {
|
||||
continue
|
||||
}
|
||||
tmpl := template.New("clickhouse-query")
|
||||
tmpl, err := tmpl.Parse(chQuery.Query)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to parse clickhouse query to populate vars", zap.String("ruleid", r.ID()), zap.Error(err))
|
||||
r.SetHealth(HealthBad)
|
||||
return nil, err
|
||||
}
|
||||
var query bytes.Buffer
|
||||
err = tmpl.Execute(&query, params.Variables)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to populate clickhouse query", zap.String("ruleid", r.ID()), zap.Error(err))
|
||||
r.SetHealth(HealthBad)
|
||||
return nil, err
|
||||
}
|
||||
queries[name] = query.String()
|
||||
}
|
||||
return queries, nil
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) GetSelectedQuery() string {
|
||||
|
||||
// The actual query string is not relevant here
|
||||
// we just need to know the selected query
|
||||
|
||||
var queries map[string]string
|
||||
var err error
|
||||
|
||||
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
|
||||
queries, err = r.prepareBuilderQueries(time.Now(), nil)
|
||||
if err != nil {
|
||||
zap.L().Error("failed to prepare metric queries", zap.String("ruleid", r.ID()), zap.Error(err))
|
||||
return ""
|
||||
}
|
||||
} else if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL {
|
||||
queries, err = r.prepareClickhouseQueries(time.Now())
|
||||
if err != nil {
|
||||
zap.L().Error("failed to prepare clickhouse queries", zap.String("ruleid", r.ID()), zap.Error(err))
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
if r.ruleCondition != nil {
|
||||
if r.ruleCondition.SelectedQuery != "" {
|
||||
return r.ruleCondition.SelectedQuery
|
||||
}
|
||||
|
||||
queryNames := map[string]struct{}{}
|
||||
|
||||
if r.ruleCondition.CompositeQuery != nil {
|
||||
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
|
||||
for name := range r.ruleCondition.CompositeQuery.BuilderQueries {
|
||||
queryNames[name] = struct{}{}
|
||||
}
|
||||
} else if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL {
|
||||
for name := range r.ruleCondition.CompositeQuery.ClickHouseQueries {
|
||||
queryNames[name] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The following logic exists for backward compatibility
|
||||
// If there is no selected query, then
|
||||
// - check if F1 is present, if yes, return F1
|
||||
@ -1048,11 +689,11 @@ func (r *ThresholdRule) GetSelectedQuery() string {
|
||||
// this logic is not really correct. we should be considering
|
||||
// whether the query is enabled or not. but this is a temporary
|
||||
// fix to support backward compatibility
|
||||
if _, ok := queries["F1"]; ok {
|
||||
if _, ok := queryNames["F1"]; ok {
|
||||
return "F1"
|
||||
}
|
||||
keys := make([]string, 0, len(queries))
|
||||
for k := range queries {
|
||||
keys := make([]string, 0, len(queryNames))
|
||||
for k := range queryNames {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
@ -1062,56 +703,91 @@ func (r *ThresholdRule) GetSelectedQuery() string {
|
||||
return ""
|
||||
}
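A sketch of the fallback order the rewritten GetSelectedQuery applies when SelectedQuery is empty. Only the "F1" preference is explicit in the visible lines; the pick from the sorted names after that point is elided by the hunk, so the final return below is an assumption.

package main

import (
	"fmt"
	"sort"
)

// pickQuery is illustrative only: an explicit SelectedQuery wins (not shown
// here), then "F1" if present, then a deterministic choice from sorted names.
func pickQuery(queryNames map[string]struct{}) string {
	if _, ok := queryNames["F1"]; ok {
		return "F1"
	}
	keys := make([]string, 0, len(queryNames))
	for k := range queryNames {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	if len(keys) == 0 {
		return ""
	}
	return keys[len(keys)-1] // assumption: last sorted name is used
}

func main() {
	fmt.Println(pickQuery(map[string]struct{}{"A": {}, "B": {}, "F1": {}})) // F1
	fmt.Println(pickQuery(map[string]struct{}{"A": {}, "B": {}}))          // B (assumed)
}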
|
||||
|
||||
// buildAndRunQuery checks whether the alert condition is being
|
||||
// satisfied and returns the resulting signals
|
||||
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) {
|
||||
if r.ruleCondition == nil || r.ruleCondition.CompositeQuery == nil {
|
||||
r.SetHealth(HealthBad)
|
||||
r.SetLastError(fmt.Errorf("no rule condition"))
|
||||
return nil, fmt.Errorf("invalid rule condition")
|
||||
}
|
||||
|
||||
// var to hold target query to be executed
|
||||
var queries map[string]string
|
||||
var err error
|
||||
params := r.prepareQueryRange(ts)
|
||||
err := r.populateTemporality(ctx, params, ch)
|
||||
if err != nil {
|
||||
r.SetHealth(HealthBad)
|
||||
zap.L().Error("failed to set temporality", zap.String("rule", r.Name()), zap.Error(err))
|
||||
return nil, fmt.Errorf("internal error while setting temporality")
|
||||
}
|
||||
|
||||
// fetch the target query based on query type
|
||||
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
|
||||
|
||||
queries, err = r.prepareBuilderQueries(ts, ch)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("failed to prepare metric queries", zap.String("ruleid", r.ID()), zap.Error(err))
|
||||
return nil, fmt.Errorf("failed to prepare metric queries")
|
||||
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
// check if any enrichment is required for logs; if yes, enrich them
|
||||
if logsv3.EnrichmentRequired(params) {
|
||||
// Note: Sending empty fields key because enrichment is only needed for json
|
||||
// TODO: Add support for attribute enrichment later
|
||||
logsv3.Enrich(params, map[string]v3.AttributeKey{})
|
||||
}
|
||||
}
|
||||
|
||||
} else if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL {
|
||||
|
||||
queries, err = r.prepareClickhouseQueries(ts)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("failed to prepare clickhouse queries", zap.String("ruleid", r.ID()), zap.Error(err))
|
||||
return nil, fmt.Errorf("failed to prepare clickhouse queries")
|
||||
}
|
||||
var results []*v3.Result
|
||||
var errQuriesByName map[string]error
|
||||
|
||||
if r.version == "v4" {
|
||||
results, err, errQuriesByName = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{})
|
||||
} else {
|
||||
return nil, fmt.Errorf("unexpected rule condition - query type is empty")
|
||||
results, err, errQuriesByName = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{})
|
||||
}
|
||||
|
||||
if len(queries) == 0 {
|
||||
return nil, fmt.Errorf("no queries could be built with the rule config")
|
||||
if err != nil {
|
||||
zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("queries", errQuriesByName))
|
||||
r.SetHealth(HealthBad)
|
||||
return nil, fmt.Errorf("internal error while querying")
|
||||
}
|
||||
|
||||
zap.L().Info("prepared queries", zap.String("ruleid", r.ID()), zap.Any("queries", queries))
|
||||
|
||||
queryLabel := r.GetSelectedQuery()
|
||||
zap.L().Debug("Selected query lable for rule", zap.String("ruleid", r.ID()), zap.String("label", queryLabel))
|
||||
|
||||
if queryString, ok := queries[queryLabel]; ok {
|
||||
return r.runChQuery(ctx, ch, queryString)
|
||||
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
results, err = postprocess.PostProcessResult(results, params)
|
||||
if err != nil {
|
||||
r.SetHealth(HealthBad)
|
||||
zap.L().Error("failed to post process result", zap.String("rule", r.Name()), zap.Error(err))
|
||||
return nil, fmt.Errorf("internal error while post processing")
|
||||
}
|
||||
}
|
||||
|
||||
zap.L().Error("invalid query label", zap.String("ruleid", r.ID()), zap.String("label", queryLabel), zap.Any("queries", queries))
|
||||
return nil, fmt.Errorf("this is unexpected, invalid query label")
|
||||
selectedQuery := r.GetSelectedQuery()
|
||||
|
||||
var queryResult *v3.Result
|
||||
for _, res := range results {
|
||||
if res.QueryName == selectedQuery {
|
||||
queryResult = res
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if queryResult != nil && len(queryResult.Series) > 0 {
|
||||
r.lastTimestampWithDatapoints = time.Now()
|
||||
}
|
||||
|
||||
var resultVector Vector
|
||||
|
||||
// if the data is missing for `For` duration then we should send alert
|
||||
if r.ruleCondition.AlertOnAbsent && r.lastTimestampWithDatapoints.Add(time.Duration(r.Condition().AbsentFor)*time.Minute).Before(time.Now()) {
|
||||
zap.L().Info("no data found for rule condition", zap.String("ruleid", r.ID()))
|
||||
lbls := labels.NewBuilder(labels.Labels{})
|
||||
if !r.lastTimestampWithDatapoints.IsZero() {
|
||||
lbls.Set("lastSeen", r.lastTimestampWithDatapoints.Format(constants.AlertTimeFormat))
|
||||
}
|
||||
resultVector = append(resultVector, Sample{
|
||||
Metric: lbls.Labels(),
|
||||
IsMissing: true,
|
||||
})
|
||||
return resultVector, nil
|
||||
}
|
||||
|
||||
for _, series := range queryResult.Series {
|
||||
smpl, shouldAlert := r.shouldAlert(*series)
|
||||
if shouldAlert {
|
||||
resultVector = append(resultVector, smpl)
|
||||
}
|
||||
}
|
||||
return resultVector, nil
|
||||
}
|
||||
|
||||
func normalizeLabelName(name string) string {
|
||||
@ -1307,3 +983,154 @@ func (r *ThresholdRule) String() string {
|
||||
|
||||
return string(byt)
|
||||
}
|
||||
|
||||
func removeGroupinSetPoints(series v3.Series) []v3.Point {
|
||||
var result []v3.Point
|
||||
for _, s := range series.Points {
|
||||
if s.Timestamp > 0 {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) {
|
||||
var alertSmpl Sample
|
||||
var shouldAlert bool
|
||||
var lbls labels.Labels
|
||||
|
||||
for name, value := range series.Labels {
|
||||
lbls = append(lbls, labels.Label{Name: name, Value: value})
|
||||
}
|
||||
|
||||
series.Points = removeGroupinSetPoints(series)
|
||||
|
||||
switch r.matchType() {
|
||||
case AtleastOnce:
|
||||
// If any sample matches the condition, the rule is firing.
|
||||
if r.compareOp() == ValueIsAbove {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value > r.targetVal() {
|
||||
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
|
||||
shouldAlert = true
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if r.compareOp() == ValueIsBelow {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value < r.targetVal() {
|
||||
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
|
||||
shouldAlert = true
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if r.compareOp() == ValueIsEq {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value == r.targetVal() {
|
||||
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
|
||||
shouldAlert = true
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if r.compareOp() == ValueIsNotEq {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value != r.targetVal() {
|
||||
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls}
|
||||
shouldAlert = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
case AllTheTimes:
|
||||
// If all samples match the condition, the rule is firing.
|
||||
shouldAlert = true
|
||||
alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lbls, MetricOrig: lbls}
|
||||
if r.compareOp() == ValueIsAbove {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value <= r.targetVal() {
|
||||
shouldAlert = false
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if r.compareOp() == ValueIsBelow {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value >= r.targetVal() {
|
||||
shouldAlert = false
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if r.compareOp() == ValueIsEq {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value != r.targetVal() {
|
||||
shouldAlert = false
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if r.compareOp() == ValueIsNotEq {
|
||||
for _, smpl := range series.Points {
|
||||
if smpl.Value == r.targetVal() {
|
||||
shouldAlert = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
case OnAverage:
|
||||
// If the average of all samples matches the condition, the rule is firing.
|
||||
var sum, count float64
|
||||
for _, smpl := range series.Points {
|
||||
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
|
||||
continue
|
||||
}
|
||||
sum += smpl.Value
|
||||
count++
|
||||
}
|
||||
avg := sum / count
|
||||
alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls, MetricOrig: lbls}
|
||||
if r.compareOp() == ValueIsAbove {
|
||||
if avg > r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
} else if r.compareOp() == ValueIsBelow {
|
||||
if avg < r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
} else if r.compareOp() == ValueIsEq {
|
||||
if avg == r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
} else if r.compareOp() == ValueIsNotEq {
|
||||
if avg != r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
}
|
||||
case InTotal:
|
||||
// If the sum of all samples matches the condition, the rule is firing.
|
||||
var sum float64
|
||||
|
||||
for _, smpl := range series.Points {
|
||||
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
|
||||
continue
|
||||
}
|
||||
sum += smpl.Value
|
||||
}
|
||||
alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls, MetricOrig: lbls}
|
||||
if r.compareOp() == ValueIsAbove {
|
||||
if sum > r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
} else if r.compareOp() == ValueIsBelow {
|
||||
if sum < r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
} else if r.compareOp() == ValueIsEq {
|
||||
if sum == r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
} else if r.compareOp() == ValueIsNotEq {
|
||||
if sum != r.targetVal() {
|
||||
shouldAlert = true
|
||||
}
|
||||
}
|
||||
}
|
||||
return alertSmpl, shouldAlert
|
||||
}
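One detail of shouldAlert worth calling out: for the OnAverage (and InTotal) match types the reduction skips NaN and Inf points, so a single bad sample does not distort the value compared against the target. A self-contained sketch of that reduction, with an extra empty-input guard added for the example:

package main

import (
	"fmt"
	"math"
)

// averageIgnoringBad mirrors the OnAverage reduction in shouldAlert:
// NaN/Inf samples are dropped before averaging.
func averageIgnoringBad(values []float64) float64 {
	var sum, count float64
	for _, v := range values {
		if math.IsNaN(v) || math.IsInf(v, 0) {
			continue
		}
		sum += v
		count++
	}
	if count == 0 {
		return math.NaN()
	}
	return sum / count
}

func main() {
	fmt.Println(averageIgnoringBad([]float64{2, 4, math.NaN(), 6})) // 4
}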
|
||||
|
@ -1,18 +1,16 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.signoz.io/signoz/pkg/query-service/featureManager"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
)
|
||||
|
||||
func TestThresholdRuleCombinations(t *testing.T) {
|
||||
func TestThresholdRuleShouldAlert(t *testing.T) {
|
||||
postableRule := PostableRule{
|
||||
AlertName: "Tricky Condition Tests",
|
||||
AlertType: "METRIC_BASED_ALERT",
|
||||
@ -37,18 +35,9 @@ func TestThresholdRuleCombinations(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
fm := featureManager.StartManager()
|
||||
mock, err := cmock.NewClickHouseNative(nil)
|
||||
if err != nil {
|
||||
t.Errorf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
}
|
||||
|
||||
cols := make([]cmock.ColumnType, 0)
|
||||
cols = append(cols, cmock.ColumnType{Name: "value", Type: "Int32"})
|
||||
cols = append(cols, cmock.ColumnType{Name: "endpoint", Type: "String"})
|
||||
|
||||
cases := []struct {
|
||||
values [][]interface{}
|
||||
values v3.Series
|
||||
expectAlert bool
|
||||
compareOp string
|
||||
matchType string
|
||||
@ -56,12 +45,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
|
||||
}{
|
||||
// Test cases for Equals Always
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
},
},
expectAlert: true,
compareOp: "3", // Equals
@ -69,12 +60,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 1.0},
},
},
expectAlert: false,
compareOp: "3", // Equals
@ -82,12 +75,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: false,
compareOp: "3", // Equals
@ -95,12 +90,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: false,
compareOp: "3", // Equals
@ -109,12 +106,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
},
// Test cases for Equals Once
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
},
},
expectAlert: true,
compareOp: "3", // Equals
@ -122,12 +121,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 1.0},
},
},
expectAlert: true,
compareOp: "3", // Equals
@ -135,12 +136,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: true,
compareOp: "3", // Equals
@ -148,26 +151,92 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: false,
compareOp: "3", // Equals
matchType: "1", // Once
target: 0.0,
},
// Test cases for Greater Than Always
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "1", // Greater Than
matchType: "2", // Always
target: 1.5,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: false,
compareOp: "1", // Greater Than
matchType: "2", // Always
target: 4.5,
},
// Test cases for Greater Than Once
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "1", // Greater Than
matchType: "1", // Once
target: 4.5,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 4.0},
{Value: 4.0},
{Value: 4.0},
{Value: 4.0},
{Value: 4.0},
},
},
expectAlert: false,
compareOp: "1", // Greater Than
matchType: "1", // Once
target: 4.5,
},
// Test cases for Not Equals Always
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
},
},
expectAlert: false,
compareOp: "4", // Not Equals
@ -175,12 +244,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 0.0},
},
},
expectAlert: false,
compareOp: "4", // Not Equals
@ -188,12 +259,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: true,
compareOp: "4", // Not Equals
@ -201,12 +274,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(1), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.0},
{Value: 0.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: false,
compareOp: "4", // Not Equals
@ -215,12 +290,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
},
// Test cases for Not Equals Once
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
},
},
expectAlert: true,
compareOp: "4", // Not Equals
@ -228,12 +305,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(0), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
{Value: 0.0},
},
},
expectAlert: false,
compareOp: "4", // Not Equals
@ -241,12 +320,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(0), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
{int32(0), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 0.0},
{Value: 0.0},
{Value: 1.0},
{Value: 0.0},
{Value: 1.0},
},
},
expectAlert: true,
compareOp: "4", // Not Equals
@ -254,85 +335,294 @@ func TestThresholdRuleCombinations(t *testing.T) {
target: 0.0,
},
{
values: [][]interface{}{
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
{int32(1), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
{Value: 1.0},
},
},
expectAlert: true,
compareOp: "4", // Not Equals
matchType: "1", // Once
target: 0.0,
},
// Test cases for Less Than Always
{
values: [][]interface{}{
{int32(2), "endpoint"},
{int32(3), "endpoint"},
{int32(2), "endpoint"},
{int32(4), "endpoint"},
{int32(2), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 1.5},
{Value: 1.5},
{Value: 1.5},
{Value: 1.5},
{Value: 1.5},
},
},
expectAlert: true,
compareOp: "2", // Below
matchType: "3", // On Average
target: 3.0,
compareOp: "2", // Less Than
matchType: "2", // Always
target: 4,
},
{
values: [][]interface{}{
{int32(4), "endpoint"},
{int32(7), "endpoint"},
{int32(5), "endpoint"},
{int32(2), "endpoint"},
{int32(9), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
},
},
expectAlert: false,
compareOp: "2", // Below
matchType: "3", // On Average
target: 3.0,
compareOp: "2", // Less Than
matchType: "2", // Always
target: 4,
},
// Test cases for Less Than Once
{
values: [][]interface{}{
{int32(4), "endpoint"},
{int32(7), "endpoint"},
{int32(5), "endpoint"},
{int32(2), "endpoint"},
{int32(9), "endpoint"},
values: v3.Series{
Points: []v3.Point{
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
{Value: 2.5},
},
},
expectAlert: true,
compareOp: "2", // Below
matchType: "3", // On Average
compareOp: "2", // Less Than
matchType: "1", // Once
target: 4,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
{Value: 4.5},
},
},
expectAlert: false,
compareOp: "2", // Less Than
matchType: "1", // Once
target: 4,
},
// Test cases for OnAverage
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "3", // Equals
matchType: "3", // OnAverage
target: 6.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: false,
compareOp: "3", // Equals
matchType: "3", // OnAverage
target: 4.5,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "4", // Not Equals
matchType: "3", // OnAverage
target: 4.5,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: false,
compareOp: "4", // Not Equals
matchType: "3", // OnAverage
target: 6.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "1", // Greater Than
matchType: "3", // OnAverage
target: 4.5,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "2", // Less Than
matchType: "3", // OnAverage
target: 12.0,
},
// Test cases for InTotal
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: true,
compareOp: "3", // Equals
matchType: "4", // InTotal
target: 30.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 4.0},
{Value: 6.0},
{Value: 8.0},
{Value: 2.0},
},
},
expectAlert: false,
compareOp: "3", // Equals
matchType: "4", // InTotal
target: 20.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
},
},
expectAlert: true,
compareOp: "4", // Not Equals
matchType: "4", // InTotal
target: 9.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
},
},
expectAlert: false,
compareOp: "4", // Not Equals
matchType: "4", // InTotal
target: 10.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 10.0},
},
},
expectAlert: true,
compareOp: "1", // Greater Than
matchType: "4", // InTotal
target: 10.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 10.0},
},
},
expectAlert: false,
compareOp: "1", // Greater Than
matchType: "4", // InTotal
target: 20.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 10.0},
},
},
expectAlert: true,
compareOp: "2", // Less Than
matchType: "4", // InTotal
target: 30.0,
},
{
values: v3.Series{
Points: []v3.Point{
{Value: 10.0},
{Value: 10.0},
},
},
expectAlert: false,
compareOp: "2", // Less Than
matchType: "4", // InTotal
target: 20.0,
},
}

fm := featureManager.StartManager()
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT value, endpoint FROM table"
mock.
ExpectQuery(queryString).
WillReturnRows(rows)
postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target

rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
if err != nil {
assert.NoError(t, err)
}

result, err := rule.runChQuery(context.Background(), mock, queryString)
if err != nil {
assert.NoError(t, err)
}
if c.expectAlert {
assert.Equal(t, 1, len(result), "case %d", idx)
} else {
assert.Equal(t, 0, len(result), "case %d", idx)
values := c.values
for i := range values.Points {
values.Points[i].Timestamp = time.Now().UnixMilli()
}

_, shoulAlert := rule.shouldAlert(c.values)
assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx)
}
}

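For readers scanning the table above: a minimal, self-contained sketch of the compareOp/matchType semantics that the expectAlert column encodes. The numeric codes and the Series/Points shape are taken from the diff itself; the trimmed Point/Series types and the evalMatch helper are illustrative stand-ins, not the repo's shouldAlert implementation.

    package main

    import "fmt"

    // Point and Series mirror only the fields of v3.Point/v3.Series that the
    // test cases above exercise.
    type Point struct{ Value float64 }
    type Series struct{ Points []Point }

    // evalMatch is a hypothetical helper restating what the test table expects:
    // compareOp "1"/"2"/"3"/"4" read as Greater Than / Less Than / Equals /
    // Not Equals, and matchType "1"/"2"/"3"/"4" read as Once / Always /
    // OnAverage / InTotal. Exact float comparison is fine here only because the
    // test values are exact.
    func evalMatch(s Series, compareOp, matchType string, target float64) bool {
        cmp := func(v float64) bool {
            switch compareOp {
            case "1":
                return v > target
            case "2":
                return v < target
            case "3":
                return v == target
            case "4":
                return v != target
            }
            return false
        }
        switch matchType {
        case "1": // Once: a single matching point is enough
            for _, p := range s.Points {
                if cmp(p.Value) {
                    return true
                }
            }
            return false
        case "2": // Always: every point must match
            for _, p := range s.Points {
                if !cmp(p.Value) {
                    return false
                }
            }
            return len(s.Points) > 0
        case "3", "4": // OnAverage / InTotal: compare the mean or the sum
            var sum float64
            for _, p := range s.Points {
                sum += p.Value
            }
            if matchType == "3" && len(s.Points) > 0 {
                return cmp(sum / float64(len(s.Points)))
            }
            return cmp(sum)
        }
        return false
    }

    func main() {
        s := Series{Points: []Point{{10}, {4}, {6}, {8}, {2}}}
        fmt.Println(evalMatch(s, "3", "3", 6.0))  // Equals, OnAverage: mean is 6.0 -> true
        fmt.Println(evalMatch(s, "3", "4", 30.0)) // Equals, InTotal: sum is 30.0 -> true
    }

Under these semantics the OnAverage cases above compare the mean of 10, 4, 6, 8, 2 (which is 6.0) against the target, and the InTotal cases compare the sum (30.0), which is consistent with every expectAlert value in the table.
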
@ -407,7 +697,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
}
fm := featureManager.StartManager()

rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
if err != nil {
assert.NoError(t, err)
}
@ -449,7 +739,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
}
fm := featureManager.StartManager()

rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
if err != nil {
assert.NoError(t, err)
}
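One mechanical change repeats across all three tests: NewThresholdRule now takes a fifth argument, which the tests pass as nil. A minimal sketch of the updated construction as the tests use it; the name and purpose of the new parameter are not visible in these hunks, so treating it as the reader/querier dependency the rule only needs for live queries is an assumption.

    // Before: four arguments.
    // rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
    // After: a fifth argument, nil in these tests (assumed to be the rule's
    // reader/querier dependency, which these eval- and link-focused tests never exercise).
    rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
    if err != nil {
        assert.NoError(t, err)
    }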