mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-10 03:09:02 +08:00
feat: added ch query support (#1735)
* feat: added ch query support * fix: added new vars to resolve alert query format issue * fix: replaced timestamp vars in metric query range Co-authored-by: Pranay Prateek <pranay@signoz.io> Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
parent
e9c47a6a73
commit
00863e54de
@ -24,6 +24,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/parser"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
@ -652,11 +653,16 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
var query bytes.Buffer
|
||||
|
||||
// replace go template variables
|
||||
querytemplate.AssignReservedVars(metricsQueryRangeParams)
|
||||
|
||||
err = tmpl.Execute(&query, metricsQueryRangeParams.Variables)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
queries[name] = query.String()
|
||||
}
|
||||
seriesList, err, errQuriesByName = execClickHouseQueries(queries)
|
||||
|
@ -32,6 +32,7 @@ func newApiErrorBadData(err error) *model.ApiError {
|
||||
// PostableRule is used to create alerting rule from HTTP api
|
||||
type PostableRule struct {
|
||||
Alert string `yaml:"alert,omitempty" json:"alert,omitempty"`
|
||||
AlertType string `yaml:"alertType,omitempty" json:"alertType,omitempty"`
|
||||
Description string `yaml:"description,omitempty" json:"description,omitempty"`
|
||||
RuleType RuleType `yaml:"ruleType,omitempty" json:"ruleType,omitempty"`
|
||||
EvalWindow Duration `yaml:"evalWindow,omitempty" json:"evalWindow,omitempty"`
|
||||
@ -92,7 +93,7 @@ func parseIntoRule(initRule PostableRule, content []byte, kind string) (*Postabl
|
||||
CompositeMetricQuery: &model.CompositeMetricQuery{
|
||||
QueryType: model.PROM,
|
||||
PromQueries: map[string]*model.PromQuery{
|
||||
"A": &model.PromQuery{
|
||||
"A": {
|
||||
Query: rule.Expr,
|
||||
},
|
||||
},
|
||||
|
@ -1,12 +1,14 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -16,6 +18,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
qsmodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/times"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/timestamp"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/value"
|
||||
@ -320,6 +323,7 @@ func (r *ThresholdRule) CheckCondition(v float64) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
zap.S().Debugf("target:", v, *r.ruleCondition.Target)
|
||||
switch r.ruleCondition.CompareOp {
|
||||
case ValueIsEq:
|
||||
return v == *r.ruleCondition.Target
|
||||
@ -336,17 +340,21 @@ func (r *ThresholdRule) CheckCondition(v float64) bool {
|
||||
|
||||
func (r *ThresholdRule) prepareQueryRange(ts time.Time) *qsmodel.QueryRangeParamsV2 {
|
||||
// todo(amol): add 30 seconds to evalWindow for rate calc
|
||||
tsEnd := ts.UnixNano() / int64(time.Millisecond)
|
||||
tsStart := ts.Add(-time.Duration(r.evalWindow)).UnixNano() / int64(time.Millisecond)
|
||||
|
||||
// for k, v := range r.ruleCondition.CompositeMetricQuery.BuilderQueries {
|
||||
// v.ReduceTo = qsmodel.RMAX
|
||||
// r.ruleCondition.CompositeMetricQuery.BuilderQueries[k] = v
|
||||
// }
|
||||
if r.ruleCondition.QueryType() == qsmodel.CLICKHOUSE {
|
||||
return &qsmodel.QueryRangeParamsV2{
|
||||
Start: ts.UnixMilli(),
|
||||
End: ts.Add(-time.Duration(r.evalWindow)).UnixMilli(),
|
||||
Step: 30,
|
||||
CompositeMetricQuery: r.ruleCondition.CompositeMetricQuery,
|
||||
Variables: make(map[string]interface{}, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// default mode
|
||||
return &qsmodel.QueryRangeParamsV2{
|
||||
Start: tsStart,
|
||||
End: tsEnd,
|
||||
Start: ts.UnixMilli(),
|
||||
End: ts.Add(-time.Duration(r.evalWindow)).UnixMilli(),
|
||||
Step: 30,
|
||||
CompositeMetricQuery: r.ruleCondition.CompositeMetricQuery,
|
||||
}
|
||||
@ -384,6 +392,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
|
||||
// but we dont know when the rates are being used
|
||||
// so we always pick timeframe - 30 seconds interval
|
||||
// and skip the first record for a given label combo
|
||||
// NOTE: this is not applicable for raw queries
|
||||
skipFirstRecord := make(map[uint64]bool, 0)
|
||||
|
||||
defer rows.Close()
|
||||
@ -406,7 +415,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
|
||||
case *time.Time:
|
||||
timval := *v
|
||||
|
||||
if colName == "ts" {
|
||||
if colName == "ts" || colName == "interval" {
|
||||
sample.Point.T = timval.Unix()
|
||||
} else {
|
||||
lbls.Set(colName, timval.Format("2006-01-02 15:04:05"))
|
||||
@ -478,12 +487,24 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
|
||||
}
|
||||
|
||||
} else {
|
||||
if exists, _ := skipFirstRecord[labelHash]; exists {
|
||||
resultMap[labelHash] = sample
|
||||
if r.Condition().QueryType() == qsmodel.QUERY_BUILDER {
|
||||
// for query builder, time series data
|
||||
// we skip the first record to support rate cases correctly
|
||||
// improvement(amol): explore approaches to limit this only for
|
||||
// rate uses cases
|
||||
if exists, _ := skipFirstRecord[labelHash]; exists {
|
||||
resultMap[labelHash] = sample
|
||||
} else {
|
||||
// looks like the first record for this label combo, skip it
|
||||
skipFirstRecord[labelHash] = true
|
||||
}
|
||||
} else {
|
||||
// looks like the first record for this label combo, skip it
|
||||
skipFirstRecord[labelHash] = true
|
||||
// for clickhouse raw queries, all records are considered
|
||||
// improvement(amol): think about supporting rate queries
|
||||
// written by user. may have to skip a record, similar to qb case(above)
|
||||
resultMap[labelHash] = sample
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
zap.S().Debugf("ruleid:", r.ID(), "\t resultmap(potential alerts):", len(resultMap))
|
||||
@ -499,32 +520,105 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) prepareBuilderQueries(ts time.Time) (map[string]string, error) {
|
||||
params := r.prepareQueryRange(ts)
|
||||
runQueries := metrics.PrepareBuilderMetricQueries(params, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
|
||||
return runQueries.Queries, runQueries.Err
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) prepareClickhouseQueries(ts time.Time) (map[string]string, error) {
|
||||
queries := make(map[string]string)
|
||||
|
||||
if r.ruleCondition == nil {
|
||||
return nil, fmt.Errorf("rule condition is empty")
|
||||
}
|
||||
|
||||
if r.ruleCondition.QueryType() != qsmodel.CLICKHOUSE {
|
||||
zap.S().Debugf("ruleid:", r.ID(), "\t msg: unsupported query type in prepareClickhouseQueries()")
|
||||
return nil, fmt.Errorf("failed to prepare clickhouse queries")
|
||||
}
|
||||
|
||||
params := r.prepareQueryRange(ts)
|
||||
|
||||
// replace reserved go template variables
|
||||
querytemplate.AssignReservedVars(params)
|
||||
|
||||
for name, chQuery := range r.ruleCondition.CompositeMetricQuery.ClickHouseQueries {
|
||||
if chQuery.Disabled {
|
||||
continue
|
||||
}
|
||||
tmpl := template.New("clickhouse-query")
|
||||
tmpl, err := tmpl.Parse(chQuery.Query)
|
||||
if err != nil {
|
||||
zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to parse clickhouse query to populate vars", err)
|
||||
r.SetHealth(HealthBad)
|
||||
return nil, err
|
||||
}
|
||||
var query bytes.Buffer
|
||||
err = tmpl.Execute(&query, params.Variables)
|
||||
if err != nil {
|
||||
zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to populate clickhouse query", err)
|
||||
r.SetHealth(HealthBad)
|
||||
return nil, err
|
||||
}
|
||||
zap.S().Debugf("ruleid:", r.ID(), "\t query:", query.String())
|
||||
queries[name] = query.String()
|
||||
}
|
||||
return queries, nil
|
||||
}
|
||||
|
||||
// buildAndRunQuery checks whether the alert condition is being
|
||||
// satisfied and returns the signals
|
||||
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) {
|
||||
params := r.prepareQueryRange(ts)
|
||||
|
||||
runQueries := metrics.PrepareBuilderMetricQueries(params, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
if runQueries.Err != nil {
|
||||
return nil, fmt.Errorf("failed to prepare metric queries: %v", runQueries.Err)
|
||||
if r.ruleCondition == nil || r.ruleCondition.CompositeMetricQuery == nil {
|
||||
r.SetHealth(HealthBad)
|
||||
return nil, fmt.Errorf("invalid rule condition")
|
||||
}
|
||||
|
||||
if len(runQueries.Queries) == 0 {
|
||||
// var to hold target query to be executed
|
||||
queries := make(map[string]string)
|
||||
var err error
|
||||
|
||||
// fetch the target query based on query type
|
||||
if r.ruleCondition.QueryType() == qsmodel.QUERY_BUILDER {
|
||||
|
||||
queries, err = r.prepareBuilderQueries(ts)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare metric queries", zap.Error(err))
|
||||
return nil, fmt.Errorf("failed to prepare metric queries")
|
||||
}
|
||||
|
||||
} else if r.ruleCondition.QueryType() == qsmodel.CLICKHOUSE {
|
||||
|
||||
queries, err = r.prepareClickhouseQueries(ts)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare clickhouse queries", zap.Error(err))
|
||||
return nil, fmt.Errorf("failed to prepare clickhouse queries")
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, fmt.Errorf("unexpected rule condition - query type is empty")
|
||||
}
|
||||
|
||||
if len(queries) == 0 {
|
||||
return nil, fmt.Errorf("no queries could be built with the rule config")
|
||||
}
|
||||
|
||||
zap.S().Debugf("ruleid:", r.ID(), "\t runQueries:", runQueries.Queries)
|
||||
zap.S().Debugf("ruleid:", r.ID(), "\t runQueries:", queries)
|
||||
|
||||
// find target query label
|
||||
if query, ok := runQueries.Queries["F1"]; ok {
|
||||
if query, ok := queries["F1"]; ok {
|
||||
// found a formula query, run with it
|
||||
return r.runChQuery(ctx, ch, query)
|
||||
}
|
||||
|
||||
// no formula in rule condition, now look for
|
||||
// query label with max ascii val
|
||||
keys := make([]string, 0, len(runQueries.Queries))
|
||||
for k := range runQueries.Queries {
|
||||
keys := make([]string, 0, len(queries))
|
||||
for k := range queries {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
@ -533,11 +627,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
|
||||
|
||||
zap.S().Debugf("ruleId: ", r.ID(), "\t result query label:", queryLabel)
|
||||
|
||||
if queryString, ok := runQueries.Queries[queryLabel]; ok {
|
||||
if queryString, ok := queries[queryLabel]; ok {
|
||||
return r.runChQuery(ctx, ch, queryString)
|
||||
}
|
||||
|
||||
zap.S().Errorf("ruleId: ", r.ID(), "\t invalid query label:", queryLabel, "\t queries:", runQueries.Queries)
|
||||
zap.S().Errorf("ruleId: ", r.ID(), "\t invalid query label:", queryLabel, "\t queries:", queries)
|
||||
return nil, fmt.Errorf("this is unexpected, invalid query label")
|
||||
}
|
||||
|
||||
|
24
pkg/query-service/utils/queryTemplate/vars.go
Normal file
24
pkg/query-service/utils/queryTemplate/vars.go
Normal file
@ -0,0 +1,24 @@
|
||||
package querytemplate
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
// AssignReservedVars assigns values for go template vars. assumes that
|
||||
// model.QueryRangeParamsV2.Start and End are Unix Nano timestamps
|
||||
func AssignReservedVars(metricsQueryRangeParams *model.QueryRangeParamsV2) {
|
||||
metricsQueryRangeParams.Variables["start_timestamp"] = metricsQueryRangeParams.Start / 1000
|
||||
metricsQueryRangeParams.Variables["end_timestamp"] = metricsQueryRangeParams.End / 1000
|
||||
|
||||
metricsQueryRangeParams.Variables["start_timestamp_ms"] = metricsQueryRangeParams.Start
|
||||
metricsQueryRangeParams.Variables["end_timestamp_ms"] = metricsQueryRangeParams.End
|
||||
|
||||
metricsQueryRangeParams.Variables["start_timestamp_nano"] = metricsQueryRangeParams.Start * 1e6
|
||||
metricsQueryRangeParams.Variables["end_timestamp_nano"] = metricsQueryRangeParams.End * 1e6
|
||||
|
||||
metricsQueryRangeParams.Variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.Start/1000)
|
||||
metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000)
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user