diff --git a/.gitignore b/.gitignore index 3f1834e9fa..46915dccb8 100644 --- a/.gitignore +++ b/.gitignore @@ -47,6 +47,7 @@ ee/query-service/signoz.db ee/query-service/tests/test-deploy/data/ # local data +*.backup *.db /deploy/docker/clickhouse-setup/data/ /deploy/docker-swarm/clickhouse-setup/data/ diff --git a/ee/query-service/app/api/dashboard.go b/ee/query-service/app/api/dashboard.go index 83c82a1477..0628ae18f6 100644 --- a/ee/query-service/app/api/dashboard.go +++ b/ee/query-service/app/api/dashboard.go @@ -1,12 +1,13 @@ package api import ( + "net/http" + "github.com/gorilla/mux" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/model" - "net/http" ) func (ah *APIHandler) lockDashboard(w http.ResponseWriter, r *http.Request) { diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 53b9a27314..8dd3ed56e0 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -710,6 +710,7 @@ func makeRulesManager( Logger: nil, DisableRules: disableRules, FeatureFlags: fm, + Reader: ch, } // create Manager diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e3f7c5a165..4d63c887fd 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -16,7 +16,6 @@ import ( "text/template" "time" - "github.com/SigNoz/govaluate" "github.com/gorilla/mux" jsoniter "github.com/json-iterator/go" _ "github.com/mattn/go-sqlite3" @@ -38,6 +37,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" "go.uber.org/multierr" "go.uber.org/zap" @@ -3160,13 +3160,13 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que return } - applyMetricLimit(result, queryRangeParams) + postprocess.ApplyMetricLimit(result, queryRangeParams) sendQueryResultEvents(r, result, queryRangeParams) // only adding applyFunctions instead of postProcess since experssion are // are executed in clickhouse directly and we wanted to add support for timeshift if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { - applyFunctions(result, queryRangeParams) + postprocess.ApplyFunctions(result, queryRangeParams) } resp := v3.QueryRangeResponse{ @@ -3418,7 +3418,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { - result, err = postProcessResult(result, queryRangeParams) + result, err = postprocess.PostProcessResult(result, queryRangeParams) } if err != nil { @@ -3453,105 +3453,3 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) { aH.queryRangeV4(r.Context(), queryRangeParams, w, r) } - -// postProcessResult applies having clause, metric limit, reduce function to the result -// This function is effective for metrics data source for now, but it can be extended to other data sources -// if needed -// Much of this work can be done in the ClickHouse query, but we decided to do it here because: -// 1. Effective use of caching -// 2. 
Easier to add new functions -func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error) { - // Having clause is not part of the clickhouse query, so we need to apply it here - // It's not included in the query because it doesn't work nicely with caching - // With this change, if you have a query with a having clause, and then you change the having clause - // to something else, the query will still be cached. - applyHavingClause(result, queryRangeParams) - // We apply the metric limit here because it's not part of the clickhouse query - // The limit in the context of the time series query is the number of time series - // So for the limit to work, we need to know what series to keep and what to discard - // For us to know that, we need to execute the query first, and then apply the limit - // which we found expensive, because we are executing the query twice on the same data - // So we decided to apply the limit here, after the query is executed - // The function is named applyMetricLimit because it only applies to metrics data source - // In traces and logs, the limit is achieved using subqueries - applyMetricLimit(result, queryRangeParams) - // Each series in the result produces N number of points, where N is (end - start) / step - // For the panel type table, we need to show one point for each series in the row - // We do that by applying a reduce function to each series - applyReduceTo(result, queryRangeParams) - // We apply the functions here it's easier to add new functions - applyFunctions(result, queryRangeParams) - - // expressions are executed at query serivce so the value of time.now in the invdividual - // queries will be different so for table panel we are making it same. - if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable { - tablePanelResultProcessor(result) - } - - for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { - // The way we distinguish between a formula and a query is by checking if the expression - // is the same as the query name - // TODO(srikanthccv): Update the UI to send a flag to distinguish between a formula and a query - if query.Expression != query.QueryName { - expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs()) - // This shouldn't happen here, because it should have been caught earlier in validation - if err != nil { - zap.L().Error("error in expression", zap.Error(err)) - return nil, err - } - formulaResult, err := processResults(result, expression) - if err != nil { - zap.L().Error("error in expression", zap.Error(err)) - return nil, err - } - formulaResult.QueryName = query.QueryName - result = append(result, formulaResult) - } - } - // we are done with the formula calculations, only send the results for enabled queries - removeDisabledQueries := func(result []*v3.Result) []*v3.Result { - var newResult []*v3.Result - for _, res := range result { - if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled { - continue - } - newResult = append(newResult, res) - } - return newResult - } - if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { - result = removeDisabledQueries(result) - } - return result, nil -} - -// applyFunctions applies functions for each query in the composite query -// The functions can be more than one, and they are applied in the order they are defined -func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) { - for idx, result := range results { - 
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries - - if builderQueries != nil { - functions := builderQueries[result.QueryName].Functions - - for _, function := range functions { - results[idx] = queryBuilder.ApplyFunction(function, result) - } - } - } -} - -func tablePanelResultProcessor(results []*v3.Result) { - var ts int64 - for ridx := range results { - for sidx := range results[ridx].Series { - for pidx := range results[ridx].Series[sidx].Points { - if ts == 0 { - ts = results[ridx].Series[sidx].Points[pidx].Timestamp - } else { - results[ridx].Series[sidx].Points[pidx].Timestamp = ts - } - } - } - } -} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index b5453e97b4..5e842a346a 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -345,6 +345,33 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P adjustStep := int64(math.Min(float64(mq.StepInterval), 60)) end = end - (end % (adjustStep * 1000)) + // if the aggregate operator is a histogram quantile, and user has not forgotten + // the le tag in the group by then add the le tag to the group by + if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant75 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant90 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant95 || + mq.AggregateOperator == v3.AggregateOperatorHistQuant99 { + found := false + for _, tag := range mq.GroupBy { + if tag.Key == "le" { + found = true + break + } + } + if !found { + mq.GroupBy = append( + mq.GroupBy, + v3.AttributeKey{ + Key: "le", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + }, + ) + } + } + var query string var err error if mq.Temporality == v3.Delta { diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 773156cc0d..0e5177bea8 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -24,6 +24,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" "go.signoz.io/signoz/pkg/query-service/utils" querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" ) @@ -1007,7 +1008,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE // Formula query // Check if the queries used in the expression can be joined if query.QueryName != query.Expression { - expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs()) + expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, postprocess.EvalFuncs()) if err != nil { return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} } @@ -1065,34 +1066,6 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE } query.ShiftBy = timeShiftBy - // for metrics v3 - // if the aggregate operator is a histogram quantile, and user has not forgotten - // the le tag in the group by then add the le tag to the group by - if query.AggregateOperator == v3.AggregateOperatorHistQuant50 || - query.AggregateOperator == v3.AggregateOperatorHistQuant75 || - query.AggregateOperator == v3.AggregateOperatorHistQuant90 || - query.AggregateOperator == v3.AggregateOperatorHistQuant95 || - 
query.AggregateOperator == v3.AggregateOperatorHistQuant99 { - found := false - for _, tag := range query.GroupBy { - if tag.Key == "le" { - found = true - break - } - } - if !found { - query.GroupBy = append( - query.GroupBy, - v3.AttributeKey{ - Key: "le", - DataType: v3.AttributeKeyDataTypeString, - Type: v3.AttributeKeyTypeTag, - IsColumn: false, - }, - ) - } - } - if query.Filters == nil || len(query.Filters.Items) == 0 { continue } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index d28fd62666..773cd7218d 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -697,6 +697,7 @@ func makeRulesManager( Logger: nil, DisableRules: disableRules, FeatureFlags: fm, + Reader: ch, } // create Manager diff --git a/pkg/query-service/app/formula.go b/pkg/query-service/postprocess/formula.go similarity index 99% rename from pkg/query-service/app/formula.go rename to pkg/query-service/postprocess/formula.go index f1f10e4499..c6a3aa7bb2 100644 --- a/pkg/query-service/app/formula.go +++ b/pkg/query-service/postprocess/formula.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "fmt" @@ -162,7 +162,7 @@ func processResults(results []*v3.Result, expression *govaluate.EvaluableExpress var SupportedFunctions = []string{"exp", "log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians", "now", "toUnixTimestamp"} -func evalFuncs() map[string]govaluate.ExpressionFunction { +func EvalFuncs() map[string]govaluate.ExpressionFunction { GoValuateFuncs := make(map[string]govaluate.ExpressionFunction) // Returns e to the power of the given argument. GoValuateFuncs["exp"] = func(args ...interface{}) (interface{}, error) { diff --git a/pkg/query-service/app/formula_test.go b/pkg/query-service/postprocess/formula_test.go similarity index 99% rename from pkg/query-service/app/formula_test.go rename to pkg/query-service/postprocess/formula_test.go index 365d794836..315344636f 100644 --- a/pkg/query-service/app/formula_test.go +++ b/pkg/query-service/postprocess/formula_test.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "math" diff --git a/pkg/query-service/app/having.go b/pkg/query-service/postprocess/having.go similarity index 99% rename from pkg/query-service/app/having.go rename to pkg/query-service/postprocess/having.go index b99471ef4e..a61aac3c9d 100644 --- a/pkg/query-service/app/having.go +++ b/pkg/query-service/postprocess/having.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "strings" diff --git a/pkg/query-service/app/having_test.go b/pkg/query-service/postprocess/having_test.go similarity index 99% rename from pkg/query-service/app/having_test.go rename to pkg/query-service/postprocess/having_test.go index 2eeafa1b65..e7e37095fb 100644 --- a/pkg/query-service/app/having_test.go +++ b/pkg/query-service/postprocess/having_test.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "testing" diff --git a/pkg/query-service/app/limit.go b/pkg/query-service/postprocess/limit.go similarity index 95% rename from pkg/query-service/app/limit.go rename to pkg/query-service/postprocess/limit.go index 3ace3c687c..491c91da54 100644 --- a/pkg/query-service/app/limit.go +++ b/pkg/query-service/postprocess/limit.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "math" @@ -9,8 +9,8 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -// applyMetricLimit applies limit to the metrics query 
results -func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) { +// ApplyMetricLimit applies limit to the metrics query results +func ApplyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) { // apply limit if any for metrics // use the grouping set points to apply the limit diff --git a/pkg/query-service/app/limit_test.go b/pkg/query-service/postprocess/limit_test.go similarity index 99% rename from pkg/query-service/app/limit_test.go rename to pkg/query-service/postprocess/limit_test.go index d90d7b9417..e946e17b60 100644 --- a/pkg/query-service/app/limit_test.go +++ b/pkg/query-service/postprocess/limit_test.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "testing" @@ -594,7 +594,7 @@ func TestApplyLimitOnMetricResult(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { result := c.inputResult - applyMetricLimit(result, c.params) + ApplyMetricLimit(result, c.params) if len(result) != len(c.expectedResult) { t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result)) } diff --git a/pkg/query-service/postprocess/query_range.go b/pkg/query-service/postprocess/query_range.go new file mode 100644 index 0000000000..7477ac11c3 --- /dev/null +++ b/pkg/query-service/postprocess/query_range.go @@ -0,0 +1,110 @@ +package postprocess + +import ( + "github.com/SigNoz/govaluate" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" +) + +// postProcessResult applies having clause, metric limit, reduce function to the result +// This function is effective for metrics data source for now, but it can be extended to other data sources +// if needed +// Much of this work can be done in the ClickHouse query, but we decided to do it here because: +// 1. Effective use of caching +// 2. Easier to add new functions +func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error) { + // Having clause is not part of the clickhouse query, so we need to apply it here + // It's not included in the query because it doesn't work nicely with caching + // With this change, if you have a query with a having clause, and then you change the having clause + // to something else, the query will still be cached. 
+ applyHavingClause(result, queryRangeParams) + // We apply the metric limit here because it's not part of the clickhouse query + // The limit in the context of the time series query is the number of time series + // So for the limit to work, we need to know what series to keep and what to discard + // For us to know that, we need to execute the query first, and then apply the limit + // which we found expensive, because we are executing the query twice on the same data + // So we decided to apply the limit here, after the query is executed + // The function is named applyMetricLimit because it only applies to metrics data source + // In traces and logs, the limit is achieved using subqueries + ApplyMetricLimit(result, queryRangeParams) + // Each series in the result produces N number of points, where N is (end - start) / step + // For the panel type table, we need to show one point for each series in the row + // We do that by applying a reduce function to each series + applyReduceTo(result, queryRangeParams) + // We apply the functions here it's easier to add new functions + ApplyFunctions(result, queryRangeParams) + + // expressions are executed at query serivce so the value of time.now in the invdividual + // queries will be different so for table panel we are making it same. + if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable { + tablePanelResultProcessor(result) + } + + for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { + // The way we distinguish between a formula and a query is by checking if the expression + // is the same as the query name + // TODO(srikanthccv): Update the UI to send a flag to distinguish between a formula and a query + if query.Expression != query.QueryName { + expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs()) + // This shouldn't happen here, because it should have been caught earlier in validation + if err != nil { + zap.L().Error("error in expression", zap.Error(err)) + return nil, err + } + formulaResult, err := processResults(result, expression) + if err != nil { + zap.L().Error("error in expression", zap.Error(err)) + return nil, err + } + formulaResult.QueryName = query.QueryName + result = append(result, formulaResult) + } + } + // we are done with the formula calculations, only send the results for enabled queries + removeDisabledQueries := func(result []*v3.Result) []*v3.Result { + var newResult []*v3.Result + for _, res := range result { + if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled { + continue + } + newResult = append(newResult, res) + } + return newResult + } + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + result = removeDisabledQueries(result) + } + return result, nil +} + +// ApplyFunctions applies functions for each query in the composite query +// The functions can be more than one, and they are applied in the order they are defined +func ApplyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) { + for idx, result := range results { + builderQueries := queryRangeParams.CompositeQuery.BuilderQueries + + if builderQueries != nil { + functions := builderQueries[result.QueryName].Functions + + for _, function := range functions { + results[idx] = queryBuilder.ApplyFunction(function, result) + } + } + } +} + +func tablePanelResultProcessor(results []*v3.Result) { + var ts int64 + for ridx := range results { + for sidx := range results[ridx].Series { + for pidx := range 
results[ridx].Series[sidx].Points { + if ts == 0 { + ts = results[ridx].Series[sidx].Points[pidx].Timestamp + } else { + results[ridx].Series[sidx].Points[pidx].Timestamp = ts + } + } + } + } +} diff --git a/pkg/query-service/app/reduce_to.go b/pkg/query-service/postprocess/reduce_to.go similarity index 99% rename from pkg/query-service/app/reduce_to.go rename to pkg/query-service/postprocess/reduce_to.go index 26c60d07c6..2e9275eea8 100644 --- a/pkg/query-service/app/reduce_to.go +++ b/pkg/query-service/postprocess/reduce_to.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" diff --git a/pkg/query-service/app/reduce_to_test.go b/pkg/query-service/postprocess/reduce_to_test.go similarity index 98% rename from pkg/query-service/app/reduce_to_test.go rename to pkg/query-service/postprocess/reduce_to_test.go index 1f5a16d65b..8709766850 100644 --- a/pkg/query-service/app/reduce_to_test.go +++ b/pkg/query-service/postprocess/reduce_to_test.go @@ -1,4 +1,4 @@ -package app +package postprocess import ( "testing" diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index d649b565fd..e940bd9b13 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -61,6 +61,7 @@ type ManagerOptions struct { ResendDelay time.Duration DisableRules bool FeatureFlags interfaces.FeatureLookup + Reader interfaces.Reader } // The Manager manages recording and alerting rules. @@ -79,6 +80,7 @@ type Manager struct { logger log.Logger featureFlags interfaces.FeatureLookup + reader interfaces.Reader } func defaultOptions(o *ManagerOptions) *ManagerOptions { @@ -119,6 +121,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) { block: make(chan struct{}), logger: o.Logger, featureFlags: o.FeatureFlags, + reader: o.Reader, } return m, nil } @@ -516,6 +519,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string r, ThresholdRuleOpts{}, m.featureFlags, + m.reader, ) if err != nil { @@ -879,6 +883,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m SendAlways: true, }, m.featureFlags, + m.reader, ) if err != nil { diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index feb16bda49..1884563305 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -1,17 +1,14 @@ package rules import ( - "bytes" "context" "encoding/json" "fmt" "math" "net/url" - "reflect" "regexp" "sort" "sync" - "text/template" "time" "unicode" @@ -21,20 +18,19 @@ import ( "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/converter" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "go.signoz.io/signoz/pkg/query-service/app/querier" + querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/interfaces" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" - querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" "go.signoz.io/signoz/pkg/query-service/utils/times" "go.signoz.io/signoz/pkg/query-service/utils/timestamp" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" - metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" - metricsV4 
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4" - tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/formatter" yaml "gopkg.in/yaml.v2" @@ -62,9 +58,7 @@ type ThresholdRule struct { // map of active alerts active map[uint64]*Alert - queryBuilder *queryBuilder.QueryBuilder - version string - queryBuilderV4 *queryBuilder.QueryBuilder + version string // temporalityMap is a map of metric name to temporality // to avoid fetching temporality for the same metric multiple times // querying the v4 table on low cardinal temporality column @@ -75,6 +69,9 @@ type ThresholdRule struct { lastTimestampWithDatapoints time.Time typ string + + querier interfaces.Querier + querierV2 interfaces.Querier } type ThresholdRuleOpts struct { @@ -93,6 +90,7 @@ func NewThresholdRule( p *PostableRule, opts ThresholdRuleOpts, featureFlags interfaces.FeatureLookup, + reader interfaces.Reader, ) (*ThresholdRule, error) { if p.RuleCondition == nil { @@ -122,19 +120,22 @@ func NewThresholdRule( t.evalWindow = 5 * time.Minute } - builderOpts := queryBuilder.QueryBuilderOptions{ - BuildMetricQuery: metricsv3.PrepareMetricQuery, - BuildTraceQuery: tracesV3.PrepareTracesQuery, - BuildLogQuery: logsv3.PrepareLogsQuery, + querierOption := querier.QuerierOptions{ + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, } - t.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, featureFlags) - builderOptsV4 := queryBuilder.QueryBuilderOptions{ - BuildMetricQuery: metricsV4.PrepareMetricQuery, - BuildTraceQuery: tracesV3.PrepareTracesQuery, - BuildLogQuery: logsv3.PrepareLogsQuery, + querierOptsV2 := querierV2.QuerierOptions{ + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, } - t.queryBuilderV4 = queryBuilder.NewQueryBuilder(builderOptsV4, featureFlags) + + t.querier = querier.NewQuerier(querierOption) + t.querierV2 = querierV2.NewQuerier(querierOptsV2) zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id)) @@ -166,7 +167,9 @@ func (r *ThresholdRule) targetVal() float64 { return 0 } - return *r.ruleCondition.Target + unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) + value := unitConverter.Convert(converter.Value{F: *r.ruleCondition.Target, U: converter.Unit(r.ruleCondition.TargetUnit)}, converter.Unit(r.Unit())) + return value.F } func (r *ThresholdRule) matchType() MatchType { @@ -414,39 +417,7 @@ func (r *ThresholdRule) Unit() string { return "" } -func (r *ThresholdRule) CheckCondition(v float64) bool { - - if math.IsNaN(v) { - zap.L().Debug("found NaN in rule condition", zap.String("rule", r.Name())) - return false - } - - if r.ruleCondition.Target == nil { - zap.L().Debug("found null target in rule condition", zap.String("rule", r.Name())) - return false - } - - unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) - - value := unitConverter.Convert(converter.Value{F: *r.ruleCondition.Target, U: converter.Unit(r.ruleCondition.TargetUnit)}, converter.Unit(r.Unit())) - - zap.L().Info("Checking condition for rule", zap.String("rule", r.Name()), zap.String("converter", unitConverter.Name()), zap.Float64("value", v), zap.Float64("target", value.F), zap.String("compareOp", string(r.ruleCondition.CompareOp))) - switch r.ruleCondition.CompareOp { - case ValueIsEq: - return v == value.F - case ValueIsNotEq: - return v != value.F - case ValueIsBelow: - return v < 
value.F - case ValueIsAbove: - return v > value.F - default: - return false - } -} - func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { - // todo(amol): add 30 seconds to evalWindow for rate calc // todo(srikanthccv): make this configurable // 2 minutes is reasonable time to wait for data to be available @@ -462,9 +433,10 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { return &v3.QueryRangeParamsV3{ Start: start, End: end, - Step: 60, + Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)), CompositeQuery: r.ruleCondition.CompositeQuery, Variables: make(map[string]interface{}, 0), + NoCache: true, } } @@ -478,297 +450,13 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { return &v3.QueryRangeParamsV3{ Start: start, End: end, - Step: 60, + Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)), CompositeQuery: r.ruleCondition.CompositeQuery, + Variables: make(map[string]interface{}, 0), + NoCache: true, } } -func (r *ThresholdRule) shouldSkipFirstRecord() bool { - shouldSkip := false - for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries { - if q.DataSource == v3.DataSourceMetrics && q.AggregateOperator.IsRateOperator() { - shouldSkip = true - } - } - return shouldSkip -} - -// queryClickhouse runs actual query against clickhouse -func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, query string) (Vector, error) { - rows, err := db.Query(ctx, query) - if err != nil { - zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err)) - return nil, err - } - - columnTypes := rows.ColumnTypes() - columnNames := rows.Columns() - vars := make([]interface{}, len(columnTypes)) - - for i := range columnTypes { - vars[i] = reflect.New(columnTypes[i].ScanType()).Interface() - } - - // []sample list - var result Vector - - // map[fingerprint]sample - resultMap := make(map[uint64]Sample, 0) - - // for rates we want to skip the first record - // but we dont know when the rates are being used - // so we always pick timeframe - 30 seconds interval - // and skip the first record for a given label combo - // NOTE: this is not applicable for raw queries - skipFirstRecord := make(map[uint64]bool, 0) - - defer rows.Close() - for rows.Next() { - - if err := rows.Scan(vars...); err != nil { - return nil, err - } - r.lastTimestampWithDatapoints = time.Now() - - sample := Sample{} - // Why do we maintain two labels sets? Alertmanager requires - // label keys to follow the model https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels - // However, our traces and logs explorers support label keys with dot and other namespace characters - // Using normalized label keys results in invalid filter criteria. 
- // The original labels are used to prepare the related{logs, traces} link in alert notification - lbls := labels.NewBuilder(labels.Labels{}) - lblsOrig := labels.NewBuilder(labels.Labels{}) - - for i, v := range vars { - - colName := normalizeLabelName(columnNames[i]) - - switch v := v.(type) { - case *string: - lbls.Set(colName, *v) - lblsOrig.Set(columnNames[i], *v) - case *time.Time: - timval := *v - - if colName == "ts" || colName == "interval" { - sample.Point.T = timval.Unix() - } else { - lbls.Set(colName, timval.Format(constants.AlertTimeFormat)) - lblsOrig.Set(columnNames[i], timval.Format(constants.AlertTimeFormat)) - } - - case *float64: - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - sample.Point.V = *v - } else { - lbls.Set(colName, fmt.Sprintf("%f", *v)) - lblsOrig.Set(columnNames[i], fmt.Sprintf("%f", *v)) - } - case **float64: - // ch seems to return this type when column is derived from - // SELECT count(*)/ SELECT count(*) - floatVal := *v - if floatVal != nil { - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - sample.Point.V = *floatVal - } else { - lbls.Set(colName, fmt.Sprintf("%f", *floatVal)) - lblsOrig.Set(columnNames[i], fmt.Sprintf("%f", *floatVal)) - } - } - case *float32: - float32Val := float32(*v) - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - sample.Point.V = float64(float32Val) - } else { - lbls.Set(colName, fmt.Sprintf("%f", float32Val)) - lblsOrig.Set(columnNames[i], fmt.Sprintf("%f", float32Val)) - } - case *uint8, *uint64, *uint16, *uint32: - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - sample.Point.V = float64(reflect.ValueOf(v).Elem().Uint()) - } else { - lbls.Set(colName, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())) - lblsOrig.Set(columnNames[i], fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())) - } - case *int8, *int16, *int32, *int64: - if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { - sample.Point.V = float64(reflect.ValueOf(v).Elem().Int()) - } else { - lbls.Set(colName, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())) - lblsOrig.Set(columnNames[i], fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())) - } - default: - zap.L().Error("invalid var found in query result", zap.String("ruleId", r.ID()), zap.Any("value", v), zap.Any("column", columnNames[i])) - } - } - - if math.IsNaN(sample.Point.V) { - continue - } - sample.Point.Vs = append(sample.Point.Vs, sample.Point.V) - - // capture lables in result - sample.Metric = lbls.Labels() - sample.MetricOrig = lblsOrig.Labels() - - labelHash := lbls.Labels().Hash() - - // here we walk through values of time series - // and calculate the final value used to compare - // with rule target - if existing, ok := resultMap[labelHash]; ok { - - switch r.matchType() { - case AllTheTimes: - if r.compareOp() == ValueIsAbove { - sample.Point.V = math.Min(existing.Point.V, sample.Point.V) - resultMap[labelHash] = sample - } else if r.compareOp() == ValueIsBelow { - sample.Point.V = math.Max(existing.Point.V, sample.Point.V) - resultMap[labelHash] = sample - } else { - sample.Point.Vs = append(existing.Point.Vs, sample.Point.V) - resultMap[labelHash] = sample - } - case AtleastOnce: - if r.compareOp() == ValueIsAbove { - sample.Point.V = math.Max(existing.Point.V, sample.Point.V) - resultMap[labelHash] = sample - } else if r.compareOp() == ValueIsBelow { - sample.Point.V = math.Min(existing.Point.V, sample.Point.V) - resultMap[labelHash] = sample - } else { - sample.Point.Vs = append(existing.Point.Vs, 
sample.Point.V) - resultMap[labelHash] = sample - } - case OnAverage: - sample.Point.Vs = append(existing.Point.Vs, sample.Point.V) - sample.Point.V = (existing.Point.V + sample.Point.V) - resultMap[labelHash] = sample - case InTotal: - sample.Point.V = (existing.Point.V + sample.Point.V) - resultMap[labelHash] = sample - } - - } else { - if r.Condition().QueryType() == v3.QueryTypeBuilder { - // for query builder, time series data - // we skip the first record to support rate cases correctly - // improvement(amol): explore approaches to limit this only for - // rate uses cases - if exists := skipFirstRecord[labelHash]; exists || !r.shouldSkipFirstRecord() { - resultMap[labelHash] = sample - } else { - // looks like the first record for this label combo, skip it - skipFirstRecord[labelHash] = true - } - } else { - // for clickhouse raw queries, all records are considered - // improvement(amol): think about supporting rate queries - // written by user. may have to skip a record, similar to qb case(above) - resultMap[labelHash] = sample - } - - } - - } - - if r.matchType() == OnAverage { - for hash, s := range resultMap { - s.Point.V = s.Point.V / float64(len(s.Point.Vs)) - resultMap[hash] = s - } - } - - for hash, s := range resultMap { - if r.matchType() == AllTheTimes && r.compareOp() == ValueIsEq { - for _, v := range s.Point.Vs { - if v != r.targetVal() { // if any of the values is not equal to target, alert shouldn't be sent - s.Point.V = v - } - } - resultMap[hash] = s - } else if r.matchType() == AllTheTimes && r.compareOp() == ValueIsNotEq { - for _, v := range s.Point.Vs { - if v == r.targetVal() { // if any of the values is equal to target, alert shouldn't be sent - s.Point.V = v - } - } - resultMap[hash] = s - } else if r.matchType() == AtleastOnce && r.compareOp() == ValueIsEq { - for _, v := range s.Point.Vs { - if v == r.targetVal() { // if any of the values is equal to target, alert should be sent - s.Point.V = v - } - } - resultMap[hash] = s - } else if r.matchType() == AtleastOnce && r.compareOp() == ValueIsNotEq { - for _, v := range s.Point.Vs { - if v != r.targetVal() { // if any of the values is not equal to target, alert should be sent - s.Point.V = v - } - } - resultMap[hash] = s - } - } - - zap.L().Debug("resultmap(potential alerts)", zap.String("ruleid", r.ID()), zap.Int("count", len(resultMap))) - - // if the data is missing for `For` duration then we should send alert - if r.ruleCondition.AlertOnAbsent && r.lastTimestampWithDatapoints.Add(time.Duration(r.Condition().AbsentFor)*time.Minute).Before(time.Now()) { - zap.L().Info("no data found for rule condition", zap.String("ruleid", r.ID())) - lbls := labels.NewBuilder(labels.Labels{}) - if !r.lastTimestampWithDatapoints.IsZero() { - lbls.Set("lastSeen", r.lastTimestampWithDatapoints.Format(constants.AlertTimeFormat)) - } - result = append(result, Sample{ - Metric: lbls.Labels(), - IsMissing: true, - }) - return result, nil - } - - for _, sample := range resultMap { - // check alert rule condition before dumping results, if sendUnmatchedResults - // is set then add results irrespective of condition - if r.opts.SendUnmatched || r.CheckCondition(sample.Point.V) { - result = append(result, sample) - } - } - if len(result) != 0 { - zap.L().Info("found alerts", zap.String("ruleid", r.ID()), zap.String("query", query), zap.Int("count", len(result))) - } - return result, nil -} - -func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map[string]string, error) { - params := r.prepareQueryRange(ts) - 
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { - // check if any enrichment is required for logs if yes then enrich them - if logsv3.EnrichmentRequired(params) { - // Note: Sending empty fields key because enrichment is only needed for json - // TODO: Add support for attribute enrichment later - logsv3.Enrich(params, map[string]v3.AttributeKey{}) - } - - } - - var runQueries map[string]string - var err error - - if r.version == "v4" { - if ch != nil { - r.populateTemporality(context.Background(), params, ch) - } - runQueries, err = r.queryBuilderV4.PrepareQueries(params) - } else { - runQueries, err = r.queryBuilder.PrepareQueries(params) - } - - return runQueries, err -} - // The following function is used to prepare the where clause for the query // `lbls` contains the key value pairs of the labels from the result of the query // We iterate over the where clause and replace the labels with the actual values @@ -974,73 +662,26 @@ func (r *ThresholdRule) hostFromSource() string { return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname()) } -func (r *ThresholdRule) prepareClickhouseQueries(ts time.Time) (map[string]string, error) { - queries := make(map[string]string) - - if r.ruleCondition == nil { - return nil, fmt.Errorf("rule condition is empty") - } - - if r.ruleCondition.QueryType() != v3.QueryTypeClickHouseSQL { - zap.L().Error("unsupported query type in prepareClickhouseQueries", zap.String("ruleid", r.ID())) - return nil, fmt.Errorf("failed to prepare clickhouse queries") - } - - params := r.prepareQueryRange(ts) - - // replace reserved go template variables - querytemplate.AssignReservedVarsV3(params) - - for name, chQuery := range r.ruleCondition.CompositeQuery.ClickHouseQueries { - if chQuery.Disabled { - continue - } - tmpl := template.New("clickhouse-query") - tmpl, err := tmpl.Parse(chQuery.Query) - if err != nil { - zap.L().Error("failed to parse clickhouse query to populate vars", zap.String("ruleid", r.ID()), zap.Error(err)) - r.SetHealth(HealthBad) - return nil, err - } - var query bytes.Buffer - err = tmpl.Execute(&query, params.Variables) - if err != nil { - zap.L().Error("failed to populate clickhouse query", zap.String("ruleid", r.ID()), zap.Error(err)) - r.SetHealth(HealthBad) - return nil, err - } - queries[name] = query.String() - } - return queries, nil -} - func (r *ThresholdRule) GetSelectedQuery() string { - - // The actual query string is not relevant here - // we just need to know the selected query - - var queries map[string]string - var err error - - if r.ruleCondition.QueryType() == v3.QueryTypeBuilder { - queries, err = r.prepareBuilderQueries(time.Now(), nil) - if err != nil { - zap.L().Error("failed to prepare metric queries", zap.String("ruleid", r.ID()), zap.Error(err)) - return "" - } - } else if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL { - queries, err = r.prepareClickhouseQueries(time.Now()) - if err != nil { - zap.L().Error("failed to prepare clickhouse queries", zap.String("ruleid", r.ID()), zap.Error(err)) - return "" - } - } - if r.ruleCondition != nil { if r.ruleCondition.SelectedQuery != "" { return r.ruleCondition.SelectedQuery } + queryNames := map[string]struct{}{} + + if r.ruleCondition.CompositeQuery != nil { + if r.ruleCondition.QueryType() == v3.QueryTypeBuilder { + for name := range r.ruleCondition.CompositeQuery.BuilderQueries { + queryNames[name] = struct{}{} + } + } else if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL { + for name := range 
r.ruleCondition.CompositeQuery.ClickHouseQueries { + queryNames[name] = struct{}{} + } + } + } + // The following logic exists for backward compatibility // If there is no selected query, then // - check if F1 is present, if yes, return F1 @@ -1048,11 +689,11 @@ func (r *ThresholdRule) GetSelectedQuery() string { // this logic is not really correct. we should be considering // whether the query is enabled or not. but this is a temporary // fix to support backward compatibility - if _, ok := queries["F1"]; ok { + if _, ok := queryNames["F1"]; ok { return "F1" } - keys := make([]string, 0, len(queries)) - for k := range queries { + keys := make([]string, 0, len(queryNames)) + for k := range queryNames { keys = append(keys, k) } sort.Strings(keys) @@ -1062,56 +703,91 @@ func (r *ThresholdRule) GetSelectedQuery() string { return "" } -// query looks if alert condition is being -// satisfied and returns the signals func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) { if r.ruleCondition == nil || r.ruleCondition.CompositeQuery == nil { r.SetHealth(HealthBad) + r.SetLastError(fmt.Errorf("no rule condition")) return nil, fmt.Errorf("invalid rule condition") } - // var to hold target query to be executed - var queries map[string]string - var err error + params := r.prepareQueryRange(ts) + err := r.populateTemporality(ctx, params, ch) + if err != nil { + r.SetHealth(HealthBad) + zap.L().Error("failed to set temporality", zap.String("rule", r.Name()), zap.Error(err)) + return nil, fmt.Errorf("internal error while setting temporality") + } - // fetch the target query based on query type - if r.ruleCondition.QueryType() == v3.QueryTypeBuilder { - - queries, err = r.prepareBuilderQueries(ts, ch) - - if err != nil { - zap.L().Error("failed to prepare metric queries", zap.String("ruleid", r.ID()), zap.Error(err)) - return nil, fmt.Errorf("failed to prepare metric queries") + if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { + // check if any enrichment is required for logs if yes then enrich them + if logsv3.EnrichmentRequired(params) { + // Note: Sending empty fields key because enrichment is only needed for json + // TODO: Add support for attribute enrichment later + logsv3.Enrich(params, map[string]v3.AttributeKey{}) } + } - } else if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL { - - queries, err = r.prepareClickhouseQueries(ts) - - if err != nil { - zap.L().Error("failed to prepare clickhouse queries", zap.String("ruleid", r.ID()), zap.Error(err)) - return nil, fmt.Errorf("failed to prepare clickhouse queries") - } + var results []*v3.Result + var errQuriesByName map[string]error + if r.version == "v4" { + results, err, errQuriesByName = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{}) } else { - return nil, fmt.Errorf("unexpected rule condition - query type is empty") + results, err, errQuriesByName = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{}) } - if len(queries) == 0 { - return nil, fmt.Errorf("no queries could be built with the rule config") + if err != nil { + zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("queries", errQuriesByName)) + r.SetHealth(HealthBad) + return nil, fmt.Errorf("internal error while querying") } - zap.L().Info("prepared queries", zap.String("ruleid", r.ID()), zap.Any("queries", queries)) - - queryLabel := r.GetSelectedQuery() - zap.L().Debug("Selected query lable for rule", zap.String("ruleid", 
r.ID()), zap.String("label", queryLabel)) - - if queryString, ok := queries[queryLabel]; ok { - return r.runChQuery(ctx, ch, queryString) + if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { + results, err = postprocess.PostProcessResult(results, params) + if err != nil { + r.SetHealth(HealthBad) + zap.L().Error("failed to post process result", zap.String("rule", r.Name()), zap.Error(err)) + return nil, fmt.Errorf("internal error while post processing") + } } - zap.L().Error("invalid query label", zap.String("ruleid", r.ID()), zap.String("label", queryLabel), zap.Any("queries", queries)) - return nil, fmt.Errorf("this is unexpected, invalid query label") + selectedQuery := r.GetSelectedQuery() + + var queryResult *v3.Result + for _, res := range results { + if res.QueryName == selectedQuery { + queryResult = res + break + } + } + + if queryResult != nil && len(queryResult.Series) > 0 { + r.lastTimestampWithDatapoints = time.Now() + } + + var resultVector Vector + + // if the data is missing for `For` duration then we should send alert + if r.ruleCondition.AlertOnAbsent && r.lastTimestampWithDatapoints.Add(time.Duration(r.Condition().AbsentFor)*time.Minute).Before(time.Now()) { + zap.L().Info("no data found for rule condition", zap.String("ruleid", r.ID())) + lbls := labels.NewBuilder(labels.Labels{}) + if !r.lastTimestampWithDatapoints.IsZero() { + lbls.Set("lastSeen", r.lastTimestampWithDatapoints.Format(constants.AlertTimeFormat)) + } + resultVector = append(resultVector, Sample{ + Metric: lbls.Labels(), + IsMissing: true, + }) + return resultVector, nil + } + + for _, series := range queryResult.Series { + smpl, shouldAlert := r.shouldAlert(*series) + if shouldAlert { + resultVector = append(resultVector, smpl) + } + } + return resultVector, nil } func normalizeLabelName(name string) string { @@ -1307,3 +983,154 @@ func (r *ThresholdRule) String() string { return string(byt) } + +func removeGroupinSetPoints(series v3.Series) []v3.Point { + var result []v3.Point + for _, s := range series.Points { + if s.Timestamp > 0 { + result = append(result, s) + } + } + return result +} + +func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) { + var alertSmpl Sample + var shouldAlert bool + var lbls labels.Labels + + for name, value := range series.Labels { + lbls = append(lbls, labels.Label{Name: name, Value: value}) + } + + series.Points = removeGroupinSetPoints(series) + + switch r.matchType() { + case AtleastOnce: + // If any sample matches the condition, the rule is firing. 
+ if r.compareOp() == ValueIsAbove { + for _, smpl := range series.Points { + if smpl.Value > r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsBelow { + for _, smpl := range series.Points { + if smpl.Value < r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsEq { + for _, smpl := range series.Points { + if smpl.Value == r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsNotEq { + for _, smpl := range series.Points { + if smpl.Value != r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls, MetricOrig: lbls} + shouldAlert = true + break + } + } + } + case AllTheTimes: + // If all samples match the condition, the rule is firing. + shouldAlert = true + alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lbls, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + for _, smpl := range series.Points { + if smpl.Value <= r.targetVal() { + shouldAlert = false + break + } + } + } else if r.compareOp() == ValueIsBelow { + for _, smpl := range series.Points { + if smpl.Value >= r.targetVal() { + shouldAlert = false + break + } + } + } else if r.compareOp() == ValueIsEq { + for _, smpl := range series.Points { + if smpl.Value != r.targetVal() { + shouldAlert = false + break + } + } + } else if r.compareOp() == ValueIsNotEq { + for _, smpl := range series.Points { + if smpl.Value == r.targetVal() { + shouldAlert = false + break + } + } + } + case OnAverage: + // If the average of all samples matches the condition, the rule is firing. + var sum, count float64 + for _, smpl := range series.Points { + if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { + continue + } + sum += smpl.Value + count++ + } + avg := sum / count + alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + if avg > r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsBelow { + if avg < r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsEq { + if avg == r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsNotEq { + if avg != r.targetVal() { + shouldAlert = true + } + } + case InTotal: + // If the sum of all samples matches the condition, the rule is firing. 
+ var sum float64 + + for _, smpl := range series.Points { + if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { + continue + } + sum += smpl.Value + } + alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + if sum > r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsBelow { + if sum < r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsEq { + if sum == r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsNotEq { + if sum != r.targetVal() { + shouldAlert = true + } + } + } + return alertSmpl, shouldAlert +} diff --git a/pkg/query-service/rules/thresholdRule_test.go b/pkg/query-service/rules/thresholdRule_test.go index 3e51e652b9..faf5803bc6 100644 --- a/pkg/query-service/rules/thresholdRule_test.go +++ b/pkg/query-service/rules/thresholdRule_test.go @@ -1,18 +1,16 @@ package rules import ( - "context" "testing" "time" - cmock "github.com/srikanthccv/ClickHouse-go-mock" "github.com/stretchr/testify/assert" "go.signoz.io/signoz/pkg/query-service/featureManager" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) -func TestThresholdRuleCombinations(t *testing.T) { +func TestThresholdRuleShouldAlert(t *testing.T) { postableRule := PostableRule{ AlertName: "Tricky Condition Tests", AlertType: "METRIC_BASED_ALERT", @@ -37,18 +35,9 @@ func TestThresholdRuleCombinations(t *testing.T) { }, }, } - fm := featureManager.StartManager() - mock, err := cmock.NewClickHouseNative(nil) - if err != nil { - t.Errorf("an error '%s' was not expected when opening a stub database connection", err) - } - - cols := make([]cmock.ColumnType, 0) - cols = append(cols, cmock.ColumnType{Name: "value", Type: "Int32"}) - cols = append(cols, cmock.ColumnType{Name: "endpoint", Type: "String"}) cases := []struct { - values [][]interface{} + values v3.Series expectAlert bool compareOp string matchType string @@ -56,12 +45,14 @@ func TestThresholdRuleCombinations(t *testing.T) { }{ // Test cases for Equals Always { - values: [][]interface{}{ - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, + values: v3.Series{ + Points: []v3.Point{ + {Value: 0.0}, + {Value: 0.0}, + {Value: 0.0}, + {Value: 0.0}, + {Value: 0.0}, + }, }, expectAlert: true, compareOp: "3", // Equals @@ -69,12 +60,14 @@ func TestThresholdRuleCombinations(t *testing.T) { target: 0.0, }, { - values: [][]interface{}{ - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, - {int32(0), "endpoint"}, - {int32(1), "endpoint"}, + values: v3.Series{ + Points: []v3.Point{ + {Value: 0.0}, + {Value: 0.0}, + {Value: 0.0}, + {Value: 0.0}, + {Value: 1.0}, + }, }, expectAlert: false, compareOp: "3", // Equals @@ -82,12 +75,14 @@ func TestThresholdRuleCombinations(t *testing.T) { target: 0.0, }, { - values: [][]interface{}{ - {int32(0), "endpoint"}, - {int32(1), "endpoint"}, - {int32(0), "endpoint"}, - {int32(1), "endpoint"}, - {int32(1), "endpoint"}, + values: v3.Series{ + Points: []v3.Point{ + {Value: 0.0}, + {Value: 1.0}, + {Value: 0.0}, + {Value: 1.0}, + {Value: 1.0}, + }, }, expectAlert: false, compareOp: "3", // Equals @@ -95,12 +90,14 @@ func TestThresholdRuleCombinations(t *testing.T) { target: 0.0, }, { - values: [][]interface{}{ - {int32(1), "endpoint"}, - {int32(1), "endpoint"}, - {int32(1), "endpoint"}, - {int32(1), "endpoint"}, - {int32(1), "endpoint"}, + values: v3.Series{ + 
+                Points: []v3.Point{
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: false,
             compareOp: "3", // Equals
@@ -109,12 +106,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
         },
         // Test cases for Equals Once
         {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                },
             },
             expectAlert: true,
             compareOp: "3", // Equals
@@ -122,12 +121,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: true,
             compareOp: "3", // Equals
@@ -135,12 +136,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: true,
             compareOp: "3", // Equals
@@ -148,26 +151,92 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: false,
             compareOp: "3", // Equals
             matchType: "1", // Once
             target: 0.0,
         },
+        // Test cases for Greater Than Always
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "1", // Greater Than
+            matchType: "2", // Always
+            target: 1.5,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "1", // Greater Than
+            matchType: "2", // Always
+            target: 4.5,
+        },
+        // Test cases for Greater Than Once
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "1", // Greater Than
+            matchType: "1", // Once
+            target: 4.5,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 4.0},
+                    {Value: 4.0},
+                    {Value: 4.0},
+                    {Value: 4.0},
+                    {Value: 4.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "1", // Greater Than
+            matchType: "1", // Once
+            target: 4.5,
+        },
         // Test cases for Not Equals Always
         {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                },
             },
             expectAlert: false,
             compareOp: "4", // Not Equals
@@ -175,12 +244,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                },
             },
             expectAlert: false,
             compareOp: "4", // Not Equals
@@ -188,12 +259,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: true,
             compareOp: "4", // Not Equals
@@ -201,12 +274,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 1.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: false,
             compareOp: "4", // Not Equals
@@ -215,12 +290,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
         },
         // Test cases for Not Equals Once
        {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                },
             },
             expectAlert: true,
             compareOp: "4", // Not Equals
@@ -228,12 +305,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 0.0},
+                },
             },
             expectAlert: false,
             compareOp: "4", // Not Equals
@@ -241,12 +320,14 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(0), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(0), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 0.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                    {Value: 0.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: true,
             compareOp: "4", // Not Equals
@@ -254,85 +335,294 @@ func TestThresholdRuleCombinations(t *testing.T) {
             target: 0.0,
         },
         {
-            values: [][]interface{}{
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
-                {int32(1), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                    {Value: 1.0},
+                },
             },
             expectAlert: true,
             compareOp: "4", // Not Equals
             matchType: "1", // Once
             target: 0.0,
         },
+        // Test cases for Less Than Always
         {
-            values: [][]interface{}{
-                {int32(2), "endpoint"},
-                {int32(3), "endpoint"},
-                {int32(2), "endpoint"},
-                {int32(4), "endpoint"},
-                {int32(2), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 1.5},
+                    {Value: 1.5},
+                    {Value: 1.5},
+                    {Value: 1.5},
+                    {Value: 1.5},
+                },
             },
             expectAlert: true,
-            compareOp: "2", // Below
-            matchType: "3", // On Average
-            target: 3.0,
+            compareOp: "2", // Less Than
+            matchType: "2", // Always
+            target: 4,
         },
         {
-            values: [][]interface{}{
-                {int32(4), "endpoint"},
-                {int32(7), "endpoint"},
-                {int32(5), "endpoint"},
-                {int32(2), "endpoint"},
-                {int32(9), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                },
             },
             expectAlert: false,
-            compareOp: "2", // Below
-            matchType: "3", // On Average
-            target: 3.0,
+            compareOp: "2", // Less Than
+            matchType: "2", // Always
+            target: 4,
         },
+        // Test cases for Less Than Once
         {
-            values: [][]interface{}{
-                {int32(4), "endpoint"},
-                {int32(7), "endpoint"},
-                {int32(5), "endpoint"},
-                {int32(2), "endpoint"},
-                {int32(9), "endpoint"},
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 2.5},
+                },
             },
             expectAlert: true,
-            compareOp: "2", // Below
-            matchType: "3", // On Average
+            compareOp: "2", // Less Than
+            matchType: "1", // Once
+            target: 4,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                    {Value: 4.5},
+                },
+            },
+            expectAlert: false,
+            compareOp: "2", // Less Than
+            matchType: "1", // Once
+            target: 4,
+        },
+        // Test cases for OnAverage
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "3", // Equals
+            matchType: "3", // OnAverage
             target: 6.0,
         },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "3", // Equals
+            matchType: "3", // OnAverage
+            target: 4.5,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "4", // Not Equals
+            matchType: "3", // OnAverage
+            target: 4.5,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "4", // Not Equals
+            matchType: "3", // OnAverage
+            target: 6.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "1", // Greater Than
+            matchType: "3", // OnAverage
+            target: 4.5,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "2", // Less Than
+            matchType: "3", // OnAverage
+            target: 12.0,
+        },
+        // Test cases for InTotal
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "3", // Equals
+            matchType: "4", // InTotal
+            target: 30.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 4.0},
+                    {Value: 6.0},
+                    {Value: 8.0},
+                    {Value: 2.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "3", // Equals
+            matchType: "4", // InTotal
+            target: 20.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "4", // Not Equals
+            matchType: "4", // InTotal
+            target: 9.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "4", // Not Equals
+            matchType: "4", // InTotal
+            target: 10.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 10.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "1", // Greater Than
+            matchType: "4", // InTotal
+            target: 10.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 10.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "1", // Greater Than
+            matchType: "4", // InTotal
+            target: 20.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 10.0},
+                },
+            },
+            expectAlert: true,
+            compareOp: "2", // Less Than
+            matchType: "4", // InTotal
+            target: 30.0,
+        },
+        {
+            values: v3.Series{
+                Points: []v3.Point{
+                    {Value: 10.0},
+                    {Value: 10.0},
+                },
+            },
+            expectAlert: false,
+            compareOp: "2", // Less Than
+            matchType: "4", // InTotal
+            target: 20.0,
+        },
     }
+    fm := featureManager.StartManager()
     for idx, c := range cases {
-        rows := cmock.NewRows(cols, c.values)
-        // We are testing the eval logic after the query is run
-        // so we don't care about the query string here
-        queryString := "SELECT value, endpoint FROM table"
-        mock.
-            ExpectQuery(queryString).
-            WillReturnRows(rows)
         postableRule.RuleCondition.CompareOp = CompareOp(c.compareOp)
         postableRule.RuleCondition.MatchType = MatchType(c.matchType)
         postableRule.RuleCondition.Target = &c.target
-        rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
+        rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
         if err != nil {
             assert.NoError(t, err)
         }
-        result, err := rule.runChQuery(context.Background(), mock, queryString)
-        if err != nil {
-            assert.NoError(t, err)
-        }
-        if c.expectAlert {
-            assert.Equal(t, 1, len(result), "case %d", idx)
-        } else {
-            assert.Equal(t, 0, len(result), "case %d", idx)
+        values := c.values
+        for i := range values.Points {
+            values.Points[i].Timestamp = time.Now().UnixMilli()
         }
+
+        _, shouldAlert := rule.shouldAlert(values)
+        assert.Equal(t, c.expectAlert, shouldAlert, "Test case %d", idx)
     }
 }
@@ -407,7 +697,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
     }

     fm := featureManager.StartManager()
-    rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
+    rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
     if err != nil {
         assert.NoError(t, err)
     }
@@ -449,7 +739,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
     }

     fm := featureManager.StartManager()
-    rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm)
+    rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil)
     if err != nil {
         assert.NoError(t, err)
     }
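For readers scanning the reworked table above, the coded fields map (per the inline comments) to compareOp "1" = Greater Than, "2" = Less Than, "3" = Equals, "4" = Not Equals, and matchType "1" = at least once, "2" = always, "3" = on average, "4" = in total. Below is a minimal, self-contained Go sketch of the evaluation semantics those expectations appear to encode; evalSeries is a hypothetical helper written only to illustrate the table, not the rule.shouldAlert implementation touched by this patch.

package main

import "fmt"

// evalSeries is an illustrative reading of the test table: it reports whether
// a series of values should alert against target, given the compareOp codes
// ("1" >, "2" <, "3" ==, "4" !=) and matchType codes ("1" at least once,
// "2" always, "3" on average, "4" in total). It is a sketch for explanation
// only; it is not the actual threshold rule code.
func evalSeries(points []float64, compareOp, matchType string, target float64) bool {
	cmp := func(v float64) bool {
		switch compareOp {
		case "1":
			return v > target
		case "2":
			return v < target
		case "3":
			return v == target
		case "4":
			return v != target
		}
		return false
	}

	switch matchType {
	case "1": // Once: a single matching point is enough
		for _, v := range points {
			if cmp(v) {
				return true
			}
		}
		return false
	case "2": // Always: every point must match
		for _, v := range points {
			if !cmp(v) {
				return false
			}
		}
		return len(points) > 0
	case "3", "4": // OnAverage / InTotal: compare one aggregate value
		var sum float64
		for _, v := range points {
			sum += v
		}
		if matchType == "3" && len(points) > 0 {
			sum /= float64(len(points))
		}
		return cmp(sum)
	}
	return false
}

func main() {
	// Two rows from the table above: Greater Than + Always with target 1.5
	// alerts, and Equals + OnAverage with target 6.0 alerts.
	fmt.Println(evalSeries([]float64{10, 4, 6, 8, 2}, "1", "2", 1.5)) // true
	fmt.Println(evalSeries([]float64{10, 4, 6, 8, 2}, "3", "3", 6.0)) // true
}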