From 00863e54deda1d0fa81a63725f1afc3967a9c45b Mon Sep 17 00:00:00 2001 From: Amol Umbark Date: Wed, 23 Nov 2022 18:49:03 +0530 Subject: [PATCH] feat: added ch query support (#1735) * feat: added ch query support * fix: added new vars to resolve alert query format issue * fix: replaced timestamp vars in metric query range Co-authored-by: Pranay Prateek Co-authored-by: Srikanth Chekuri --- pkg/query-service/app/http_handler.go | 6 + pkg/query-service/rules/apiParams.go | 3 +- pkg/query-service/rules/thresholdRule.go | 144 +++++++++++++++--- pkg/query-service/utils/queryTemplate/vars.go | 24 +++ 4 files changed, 151 insertions(+), 26 deletions(-) create mode 100644 pkg/query-service/utils/queryTemplate/vars.go diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 18d2743924..fcf25b93f0 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -24,6 +24,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/parser" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/constants" + querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" "go.signoz.io/signoz/pkg/query-service/dao" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" @@ -652,11 +653,16 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request return } var query bytes.Buffer + + // replace go template variables + querytemplate.AssignReservedVars(metricsQueryRangeParams) + err = tmpl.Execute(&query, metricsQueryRangeParams.Variables) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) return } + queries[name] = query.String() } seriesList, err, errQuriesByName = execClickHouseQueries(queries) diff --git a/pkg/query-service/rules/apiParams.go b/pkg/query-service/rules/apiParams.go index bf4c41a17a..aad672c99a 100644 --- a/pkg/query-service/rules/apiParams.go +++ b/pkg/query-service/rules/apiParams.go @@ -32,6 +32,7 @@ func newApiErrorBadData(err error) *model.ApiError { // PostableRule is used to create alerting rule from HTTP api type PostableRule struct { Alert string `yaml:"alert,omitempty" json:"alert,omitempty"` + AlertType string `yaml:"alertType,omitempty" json:"alertType,omitempty"` Description string `yaml:"description,omitempty" json:"description,omitempty"` RuleType RuleType `yaml:"ruleType,omitempty" json:"ruleType,omitempty"` EvalWindow Duration `yaml:"evalWindow,omitempty" json:"evalWindow,omitempty"` @@ -92,7 +93,7 @@ func parseIntoRule(initRule PostableRule, content []byte, kind string) (*Postabl CompositeMetricQuery: &model.CompositeMetricQuery{ QueryType: model.PROM, PromQueries: map[string]*model.PromQuery{ - "A": &model.PromQuery{ + "A": { Query: rule.Expr, }, }, diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index 0ce8d9317b..3d08f70d35 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -1,12 +1,14 @@ package rules import ( + "bytes" "context" "fmt" "math" "reflect" "sort" "sync" + "text/template" "time" "go.uber.org/zap" @@ -16,6 +18,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" qsmodel "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/utils/labels" + querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" "go.signoz.io/signoz/pkg/query-service/utils/times" "go.signoz.io/signoz/pkg/query-service/utils/timestamp" "go.signoz.io/signoz/pkg/query-service/utils/value" @@ -320,6 +323,7 @@ func (r *ThresholdRule) CheckCondition(v float64) bool { return false } + zap.S().Debugf("target:", v, *r.ruleCondition.Target) switch r.ruleCondition.CompareOp { case ValueIsEq: return v == *r.ruleCondition.Target @@ -336,17 +340,21 @@ func (r *ThresholdRule) CheckCondition(v float64) bool { func (r *ThresholdRule) prepareQueryRange(ts time.Time) *qsmodel.QueryRangeParamsV2 { // todo(amol): add 30 seconds to evalWindow for rate calc - tsEnd := ts.UnixNano() / int64(time.Millisecond) - tsStart := ts.Add(-time.Duration(r.evalWindow)).UnixNano() / int64(time.Millisecond) - // for k, v := range r.ruleCondition.CompositeMetricQuery.BuilderQueries { - // v.ReduceTo = qsmodel.RMAX - // r.ruleCondition.CompositeMetricQuery.BuilderQueries[k] = v - // } + if r.ruleCondition.QueryType() == qsmodel.CLICKHOUSE { + return &qsmodel.QueryRangeParamsV2{ + Start: ts.UnixMilli(), + End: ts.Add(-time.Duration(r.evalWindow)).UnixMilli(), + Step: 30, + CompositeMetricQuery: r.ruleCondition.CompositeMetricQuery, + Variables: make(map[string]interface{}, 0), + } + } + // default mode return &qsmodel.QueryRangeParamsV2{ - Start: tsStart, - End: tsEnd, + Start: ts.UnixMilli(), + End: ts.Add(-time.Duration(r.evalWindow)).UnixMilli(), Step: 30, CompositeMetricQuery: r.ruleCondition.CompositeMetricQuery, } @@ -384,6 +392,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer // but we dont know when the rates are being used // so we always pick timeframe - 30 seconds interval // and skip the first record for a given label combo + // NOTE: this is not applicable for raw queries skipFirstRecord := make(map[uint64]bool, 0) defer rows.Close() @@ -406,7 +415,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer case *time.Time: timval := *v - if colName == "ts" { + if colName == "ts" || colName == "interval" { sample.Point.T = timval.Unix() } else { lbls.Set(colName, timval.Format("2006-01-02 15:04:05")) @@ -478,12 +487,24 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer } } else { - if exists, _ := skipFirstRecord[labelHash]; exists { - resultMap[labelHash] = sample + if r.Condition().QueryType() == qsmodel.QUERY_BUILDER { + // for query builder, time series data + // we skip the first record to support rate cases correctly + // improvement(amol): explore approaches to limit this only for + // rate uses cases + if exists, _ := skipFirstRecord[labelHash]; exists { + resultMap[labelHash] = sample + } else { + // looks like the first record for this label combo, skip it + skipFirstRecord[labelHash] = true + } } else { - // looks like the first record for this label combo, skip it - skipFirstRecord[labelHash] = true + // for clickhouse raw queries, all records are considered + // improvement(amol): think about supporting rate queries + // written by user. may have to skip a record, similar to qb case(above) + resultMap[labelHash] = sample } + } } zap.S().Debugf("ruleid:", r.ID(), "\t resultmap(potential alerts):", len(resultMap)) @@ -499,32 +520,105 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer return result, nil } +func (r *ThresholdRule) prepareBuilderQueries(ts time.Time) (map[string]string, error) { + params := r.prepareQueryRange(ts) + runQueries := metrics.PrepareBuilderMetricQueries(params, constants.SIGNOZ_TIMESERIES_TABLENAME) + + return runQueries.Queries, runQueries.Err +} + +func (r *ThresholdRule) prepareClickhouseQueries(ts time.Time) (map[string]string, error) { + queries := make(map[string]string) + + if r.ruleCondition == nil { + return nil, fmt.Errorf("rule condition is empty") + } + + if r.ruleCondition.QueryType() != qsmodel.CLICKHOUSE { + zap.S().Debugf("ruleid:", r.ID(), "\t msg: unsupported query type in prepareClickhouseQueries()") + return nil, fmt.Errorf("failed to prepare clickhouse queries") + } + + params := r.prepareQueryRange(ts) + + // replace reserved go template variables + querytemplate.AssignReservedVars(params) + + for name, chQuery := range r.ruleCondition.CompositeMetricQuery.ClickHouseQueries { + if chQuery.Disabled { + continue + } + tmpl := template.New("clickhouse-query") + tmpl, err := tmpl.Parse(chQuery.Query) + if err != nil { + zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to parse clickhouse query to populate vars", err) + r.SetHealth(HealthBad) + return nil, err + } + var query bytes.Buffer + err = tmpl.Execute(&query, params.Variables) + if err != nil { + zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to populate clickhouse query", err) + r.SetHealth(HealthBad) + return nil, err + } + zap.S().Debugf("ruleid:", r.ID(), "\t query:", query.String()) + queries[name] = query.String() + } + return queries, nil +} + // query looks if alert condition is being // satisfied and returns the signals func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) { - params := r.prepareQueryRange(ts) - - runQueries := metrics.PrepareBuilderMetricQueries(params, constants.SIGNOZ_TIMESERIES_TABLENAME) - if runQueries.Err != nil { - return nil, fmt.Errorf("failed to prepare metric queries: %v", runQueries.Err) + if r.ruleCondition == nil || r.ruleCondition.CompositeMetricQuery == nil { + r.SetHealth(HealthBad) + return nil, fmt.Errorf("invalid rule condition") } - if len(runQueries.Queries) == 0 { + // var to hold target query to be executed + queries := make(map[string]string) + var err error + + // fetch the target query based on query type + if r.ruleCondition.QueryType() == qsmodel.QUERY_BUILDER { + + queries, err = r.prepareBuilderQueries(ts) + + if err != nil { + zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare metric queries", zap.Error(err)) + return nil, fmt.Errorf("failed to prepare metric queries") + } + + } else if r.ruleCondition.QueryType() == qsmodel.CLICKHOUSE { + + queries, err = r.prepareClickhouseQueries(ts) + + if err != nil { + zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare clickhouse queries", zap.Error(err)) + return nil, fmt.Errorf("failed to prepare clickhouse queries") + } + + } else { + return nil, fmt.Errorf("unexpected rule condition - query type is empty") + } + + if len(queries) == 0 { return nil, fmt.Errorf("no queries could be built with the rule config") } - zap.S().Debugf("ruleid:", r.ID(), "\t runQueries:", runQueries.Queries) + zap.S().Debugf("ruleid:", r.ID(), "\t runQueries:", queries) // find target query label - if query, ok := runQueries.Queries["F1"]; ok { + if query, ok := queries["F1"]; ok { // found a formula query, run with it return r.runChQuery(ctx, ch, query) } // no formula in rule condition, now look for // query label with max ascii val - keys := make([]string, 0, len(runQueries.Queries)) - for k := range runQueries.Queries { + keys := make([]string, 0, len(queries)) + for k := range queries { keys = append(keys, k) } sort.Strings(keys) @@ -533,11 +627,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c zap.S().Debugf("ruleId: ", r.ID(), "\t result query label:", queryLabel) - if queryString, ok := runQueries.Queries[queryLabel]; ok { + if queryString, ok := queries[queryLabel]; ok { return r.runChQuery(ctx, ch, queryString) } - zap.S().Errorf("ruleId: ", r.ID(), "\t invalid query label:", queryLabel, "\t queries:", runQueries.Queries) + zap.S().Errorf("ruleId: ", r.ID(), "\t invalid query label:", queryLabel, "\t queries:", queries) return nil, fmt.Errorf("this is unexpected, invalid query label") } diff --git a/pkg/query-service/utils/queryTemplate/vars.go b/pkg/query-service/utils/queryTemplate/vars.go new file mode 100644 index 0000000000..093977aa01 --- /dev/null +++ b/pkg/query-service/utils/queryTemplate/vars.go @@ -0,0 +1,24 @@ +package querytemplate + +import ( + "fmt" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +// AssignReservedVars assigns values for go template vars. assumes that +// model.QueryRangeParamsV2.Start and End are Unix Nano timestamps +func AssignReservedVars(metricsQueryRangeParams *model.QueryRangeParamsV2) { + metricsQueryRangeParams.Variables["start_timestamp"] = metricsQueryRangeParams.Start / 1000 + metricsQueryRangeParams.Variables["end_timestamp"] = metricsQueryRangeParams.End / 1000 + + metricsQueryRangeParams.Variables["start_timestamp_ms"] = metricsQueryRangeParams.Start + metricsQueryRangeParams.Variables["end_timestamp_ms"] = metricsQueryRangeParams.End + + metricsQueryRangeParams.Variables["start_timestamp_nano"] = metricsQueryRangeParams.Start * 1e6 + metricsQueryRangeParams.Variables["end_timestamp_nano"] = metricsQueryRangeParams.End * 1e6 + + metricsQueryRangeParams.Variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.Start/1000) + metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000) + +}