mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-15 15:45:56 +08:00
Add support for Delta temporality (#2505)
* Revert "upgraded some deprecated packages (#2423)" This reverts commit c4b052c51ec3a22e25923aa92fb66665e6ca589b. * chore: delta working with QB * chore: use enum * Revert "Revert "upgraded some deprecated packages (#2423)"" This reverts commit d88f3594a62582a16fa04528a56ff075f628bf4e. * chore: add test * chore: add delta for query range v3 * chore: explicit check for temporality * chore: fix tests * chore: conditionally fetch temporality when --prefer-delta is set
This commit is contained in:
parent
6b77165d09
commit
216499051d
@ -17,6 +17,7 @@ import (
|
|||||||
type APIHandlerOptions struct {
|
type APIHandlerOptions struct {
|
||||||
DataConnector interfaces.DataConnector
|
DataConnector interfaces.DataConnector
|
||||||
SkipConfig *basemodel.SkipConfig
|
SkipConfig *basemodel.SkipConfig
|
||||||
|
PreferDelta bool
|
||||||
AppDao dao.ModelDao
|
AppDao dao.ModelDao
|
||||||
RulesManager *rules.Manager
|
RulesManager *rules.Manager
|
||||||
FeatureFlags baseint.FeatureLookup
|
FeatureFlags baseint.FeatureLookup
|
||||||
@ -34,6 +35,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
|||||||
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
|
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
|
||||||
Reader: opts.DataConnector,
|
Reader: opts.DataConnector,
|
||||||
SkipConfig: opts.SkipConfig,
|
SkipConfig: opts.SkipConfig,
|
||||||
|
PerferDelta: opts.PreferDelta,
|
||||||
AppDao: opts.AppDao,
|
AppDao: opts.AppDao,
|
||||||
RuleManager: opts.RulesManager,
|
RuleManager: opts.RulesManager,
|
||||||
FeatureFlags: opts.FeatureFlags})
|
FeatureFlags: opts.FeatureFlags})
|
||||||
|
@ -56,6 +56,7 @@ type ServerOptions struct {
|
|||||||
// alert specific params
|
// alert specific params
|
||||||
DisableRules bool
|
DisableRules bool
|
||||||
RuleRepoURL string
|
RuleRepoURL string
|
||||||
|
PreferDelta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server runs HTTP api service
|
// Server runs HTTP api service
|
||||||
@ -170,6 +171,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
apiOpts := api.APIHandlerOptions{
|
apiOpts := api.APIHandlerOptions{
|
||||||
DataConnector: reader,
|
DataConnector: reader,
|
||||||
SkipConfig: skipConfig,
|
SkipConfig: skipConfig,
|
||||||
|
PreferDelta: serverOptions.PreferDelta,
|
||||||
AppDao: modelDao,
|
AppDao: modelDao,
|
||||||
RulesManager: rm,
|
RulesManager: rm,
|
||||||
FeatureFlags: lm,
|
FeatureFlags: lm,
|
||||||
|
@ -83,10 +83,12 @@ func main() {
|
|||||||
var ruleRepoURL string
|
var ruleRepoURL string
|
||||||
|
|
||||||
var enableQueryServiceLogOTLPExport bool
|
var enableQueryServiceLogOTLPExport bool
|
||||||
|
var preferDelta bool
|
||||||
|
|
||||||
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
||||||
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
||||||
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
||||||
|
flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over raw metrics)")
|
||||||
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
||||||
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
|
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
@ -102,6 +104,7 @@ func main() {
|
|||||||
HTTPHostPort: baseconst.HTTPHostPort,
|
HTTPHostPort: baseconst.HTTPHostPort,
|
||||||
PromConfigPath: promConfigPath,
|
PromConfigPath: promConfigPath,
|
||||||
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
||||||
|
PreferDelta: preferDelta,
|
||||||
PrivateHostPort: baseconst.PrivateHostPort,
|
PrivateHostPort: baseconst.PrivateHostPort,
|
||||||
DisableRules: disableRules,
|
DisableRules: disableRules,
|
||||||
RuleRepoURL: ruleRepoURL,
|
RuleRepoURL: ruleRepoURL,
|
||||||
|
@ -3249,6 +3249,32 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context)
|
|||||||
return spansInLastHeartBeatInterval, nil
|
return spansInLastHeartBeatInterval, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
|
||||||
|
|
||||||
|
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN [$1]`, signozMetricDBName, signozTSTableName)
|
||||||
|
|
||||||
|
rows, err := r.db.Query(ctx, query, metricNames)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var metricName, temporality string
|
||||||
|
err := rows.Scan(&metricName, &temporality)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, ok := metricNameToTemporality[metricName]; !ok {
|
||||||
|
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
|
||||||
|
}
|
||||||
|
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
|
||||||
|
}
|
||||||
|
return metricNameToTemporality, nil
|
||||||
|
}
|
||||||
|
|
||||||
// func sum(array []tsByMetricName) uint64 {
|
// func sum(array []tsByMetricName) uint64 {
|
||||||
// var result uint64
|
// var result uint64
|
||||||
// result = 0
|
// result = 0
|
||||||
|
@ -71,6 +71,7 @@ type APIHandler struct {
|
|||||||
featureFlags interfaces.FeatureLookup
|
featureFlags interfaces.FeatureLookup
|
||||||
ready func(http.HandlerFunc) http.HandlerFunc
|
ready func(http.HandlerFunc) http.HandlerFunc
|
||||||
queryBuilder *queryBuilder.QueryBuilder
|
queryBuilder *queryBuilder.QueryBuilder
|
||||||
|
preferDelta bool
|
||||||
|
|
||||||
// SetupCompleted indicates if SigNoz is ready for general use.
|
// SetupCompleted indicates if SigNoz is ready for general use.
|
||||||
// at the moment, we mark the app ready when the first user
|
// at the moment, we mark the app ready when the first user
|
||||||
@ -84,6 +85,8 @@ type APIHandlerOpts struct {
|
|||||||
Reader interfaces.Reader
|
Reader interfaces.Reader
|
||||||
|
|
||||||
SkipConfig *model.SkipConfig
|
SkipConfig *model.SkipConfig
|
||||||
|
|
||||||
|
PerferDelta bool
|
||||||
// dao layer to perform crud on app objects like dashboard, alerts etc
|
// dao layer to perform crud on app objects like dashboard, alerts etc
|
||||||
AppDao dao.ModelDao
|
AppDao dao.ModelDao
|
||||||
|
|
||||||
@ -106,6 +109,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
reader: opts.Reader,
|
reader: opts.Reader,
|
||||||
appDao: opts.AppDao,
|
appDao: opts.AppDao,
|
||||||
skipConfig: opts.SkipConfig,
|
skipConfig: opts.SkipConfig,
|
||||||
|
preferDelta: opts.PerferDelta,
|
||||||
alertManager: alertManager,
|
alertManager: alertManager,
|
||||||
ruleManager: opts.RuleManager,
|
ruleManager: opts.RuleManager,
|
||||||
featureFlags: opts.FeatureFlags,
|
featureFlags: opts.FeatureFlags,
|
||||||
@ -452,6 +456,48 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
|
|||||||
aH.Respond(w, tagValueList)
|
aH.Respond(w, tagValueList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
|
||||||
|
|
||||||
|
metricNames := make([]string, 0)
|
||||||
|
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
||||||
|
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
|
||||||
|
for _, query := range qp.CompositeQuery.BuilderQueries {
|
||||||
|
if query.DataSource == v3.DataSourceMetrics {
|
||||||
|
metricNames = append(metricNames, query.AggregateAttribute.Key)
|
||||||
|
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
|
||||||
|
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if aH.preferDelta {
|
||||||
|
zap.S().Debug("fetching metric temporality")
|
||||||
|
metricNameToTemporality, err = aH.reader.FetchTemporality(ctx, metricNames)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
|
||||||
|
for name := range qp.CompositeQuery.BuilderQueries {
|
||||||
|
query := qp.CompositeQuery.BuilderQueries[name]
|
||||||
|
if query.DataSource == v3.DataSourceMetrics {
|
||||||
|
if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
|
||||||
|
query.Temporality = v3.Delta
|
||||||
|
} else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
|
||||||
|
query.Temporality = v3.Cumulative
|
||||||
|
} else {
|
||||||
|
query.Temporality = v3.Unspecified
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) {
|
||||||
metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r)
|
metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r)
|
||||||
|
|
||||||
@ -2776,6 +2822,15 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add temporality for each metric
|
||||||
|
|
||||||
|
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
|
||||||
|
if temporalityErr != nil {
|
||||||
|
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
|
aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
174
pkg/query-service/app/metrics/v3/delta.go
Normal file
174
pkg/query-service/app/metrics/v3/delta.go
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
package v3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {
|
||||||
|
|
||||||
|
metricQueryGroupBy := mq.GroupBy
|
||||||
|
|
||||||
|
// if the aggregate operator is a histogram quantile, and user has not forgotten
|
||||||
|
// the le tag in the group by then add the le tag to the group by
|
||||||
|
if mq.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
|
||||||
|
mq.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
|
||||||
|
mq.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
|
||||||
|
mq.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
|
||||||
|
mq.AggregateOperator == v3.AggregateOperatorHistQuant99 {
|
||||||
|
found := false
|
||||||
|
for _, tag := range mq.GroupBy {
|
||||||
|
if tag.Key == "le" {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
metricQueryGroupBy = append(
|
||||||
|
metricQueryGroupBy,
|
||||||
|
v3.AttributeKey{
|
||||||
|
Key: "le",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
IsColumn: false,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if mq.Filters != nil {
|
||||||
|
mq.Filters.Items = append(mq.Filters.Items, v3.FilterItem{
|
||||||
|
Key: v3.AttributeKey{Key: "__temporality__"},
|
||||||
|
Operator: v3.FilterOperatorEqual,
|
||||||
|
Value: "Delta",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||||
|
|
||||||
|
// Select the aggregate value for interval
|
||||||
|
queryTmpl :=
|
||||||
|
"SELECT %s" +
|
||||||
|
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||||
|
" %s as value" +
|
||||||
|
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||||
|
" GLOBAL INNER JOIN" +
|
||||||
|
" (%s) as filtered_time_series" +
|
||||||
|
" USING fingerprint" +
|
||||||
|
" WHERE " + samplesTableTimeFilter +
|
||||||
|
" GROUP BY %s" +
|
||||||
|
" ORDER BY %s ts"
|
||||||
|
|
||||||
|
// tagsWithoutLe is used to group by all tags except le
|
||||||
|
// This is done because we want to group by le only when we are calculating quantile
|
||||||
|
// Otherwise, we want to group by all tags except le
|
||||||
|
tagsWithoutLe := []string{}
|
||||||
|
for _, tag := range mq.GroupBy {
|
||||||
|
if tag.Key != "le" {
|
||||||
|
tagsWithoutLe = append(tagsWithoutLe, tag.Key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
groupByWithoutLe := groupBy(tagsWithoutLe...)
|
||||||
|
groupTagsWithoutLe := groupSelect(tagsWithoutLe...)
|
||||||
|
orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe)
|
||||||
|
|
||||||
|
groupBy := groupByAttributeKeyTags(metricQueryGroupBy...)
|
||||||
|
groupTags := groupSelectAttributeKeyTags(metricQueryGroupBy...)
|
||||||
|
orderBy := orderByAttributeKeyTags(mq.OrderBy, metricQueryGroupBy)
|
||||||
|
|
||||||
|
if len(orderBy) != 0 {
|
||||||
|
orderBy += ","
|
||||||
|
}
|
||||||
|
if len(orderWithoutLe) != 0 {
|
||||||
|
orderWithoutLe += ","
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mq.AggregateOperator {
|
||||||
|
case v3.AggregateOperatorRate:
|
||||||
|
// Calculate rate of change of metric for each unique time series
|
||||||
|
groupBy = "fingerprint, ts"
|
||||||
|
orderBy = "fingerprint, "
|
||||||
|
groupTags = "fingerprint,"
|
||||||
|
op := fmt.Sprintf("sum(value)/%d", step)
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
queryTmpl, "any(labels) as fullLabels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy,
|
||||||
|
) // labels will be same so any should be fine
|
||||||
|
|
||||||
|
return query, nil
|
||||||
|
case v3.AggregateOperatorSumRate, v3.AggregateOperatorAvgRate, v3.AggregateOperatorMaxRate, v3.AggregateOperatorMinRate:
|
||||||
|
op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step)
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy,
|
||||||
|
)
|
||||||
|
return query, nil
|
||||||
|
case
|
||||||
|
v3.AggregateOperatorRateSum,
|
||||||
|
v3.AggregateOperatorRateMax,
|
||||||
|
v3.AggregateOperatorRateAvg,
|
||||||
|
v3.AggregateOperatorRateMin:
|
||||||
|
op := fmt.Sprintf("%s(value)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], step)
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy,
|
||||||
|
)
|
||||||
|
return query, nil
|
||||||
|
case
|
||||||
|
v3.AggregateOperatorP05,
|
||||||
|
v3.AggregateOperatorP10,
|
||||||
|
v3.AggregateOperatorP20,
|
||||||
|
v3.AggregateOperatorP25,
|
||||||
|
v3.AggregateOperatorP50,
|
||||||
|
v3.AggregateOperatorP75,
|
||||||
|
v3.AggregateOperatorP90,
|
||||||
|
v3.AggregateOperatorP95,
|
||||||
|
v3.AggregateOperatorP99:
|
||||||
|
op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator])
|
||||||
|
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
|
||||||
|
return query, nil
|
||||||
|
case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99:
|
||||||
|
op := fmt.Sprintf("sum(value)/%d", step)
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy,
|
||||||
|
) // labels will be same so any should be fine
|
||||||
|
value := aggregateOperatorToPercentile[mq.AggregateOperator]
|
||||||
|
|
||||||
|
query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
|
||||||
|
return query, nil
|
||||||
|
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
|
||||||
|
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
|
||||||
|
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
|
||||||
|
return query, nil
|
||||||
|
case v3.AggregateOperatorCount:
|
||||||
|
op := "toFloat64(count(*))"
|
||||||
|
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
|
||||||
|
return query, nil
|
||||||
|
case v3.AggregateOperatorCountDistinct:
|
||||||
|
op := "toFloat64(count(distinct(value)))"
|
||||||
|
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
|
||||||
|
return query, nil
|
||||||
|
case v3.AggregateOperatorNoOp:
|
||||||
|
queryTmpl :=
|
||||||
|
"SELECT fingerprint, labels as fullLabels," +
|
||||||
|
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||||
|
" any(value) as value" +
|
||||||
|
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||||
|
" GLOBAL INNER JOIN" +
|
||||||
|
" (%s) as filtered_time_series" +
|
||||||
|
" USING fingerprint" +
|
||||||
|
" WHERE " + samplesTableTimeFilter +
|
||||||
|
" GROUP BY fingerprint, labels, ts" +
|
||||||
|
" ORDER BY fingerprint, labels, ts"
|
||||||
|
query := fmt.Sprintf(queryTmpl, step, filterSubQuery)
|
||||||
|
return query, nil
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("unsupported aggregate operator")
|
||||||
|
}
|
||||||
|
}
|
@ -44,13 +44,19 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
|
|||||||
}
|
}
|
||||||
|
|
||||||
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
|
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
|
||||||
var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))`
|
var rateWithoutNegative = `if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) `
|
||||||
|
|
||||||
// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
|
// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
|
||||||
// timeseries based on search criteria
|
// timeseries based on search criteria
|
||||||
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, metricName string, aggregateOperator v3.AggregateOperator) (string, error) {
|
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
metricName := mq.AggregateAttribute.Key
|
||||||
|
aggregateOperator := mq.AggregateOperator
|
||||||
var conditions []string
|
var conditions []string
|
||||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName)))
|
if mq.Temporality == v3.Delta {
|
||||||
|
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta))
|
||||||
|
} else {
|
||||||
|
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality IN ['%s', '%s']", utils.ClickHouseFormattedValue(metricName), v3.Cumulative, v3.Unspecified))
|
||||||
|
}
|
||||||
|
|
||||||
if fs != nil && len(fs.Items) != 0 {
|
if fs != nil && len(fs.Items) != 0 {
|
||||||
for _, item := range fs.Items {
|
for _, item := range fs.Items {
|
||||||
@ -157,7 +163,7 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq.AggregateAttribute.Key, mq.AggregateOperator)
|
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -413,7 +419,13 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v
|
|||||||
}
|
}
|
||||||
|
|
||||||
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) {
|
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) {
|
||||||
query, err := buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
var query string
|
||||||
|
var err error
|
||||||
|
if mq.Temporality == v3.Delta {
|
||||||
|
query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||||
|
} else {
|
||||||
|
query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ func TestBuildQueryWithFilters(t *testing.T) {
|
|||||||
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
|
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'a') != 'b'")
|
require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'a') != 'b'")
|
||||||
require.Contains(t, query, rateWithoutNegative)
|
require.Contains(t, query, rateWithoutNegative)
|
||||||
require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')")
|
require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')")
|
||||||
})
|
})
|
||||||
@ -93,7 +93,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
|
|||||||
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
|
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||||
require.Contains(t, query, rateWithoutNegative)
|
require.Contains(t, query, rateWithoutNegative)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -228,7 +228,12 @@ func TestBuildQueryOperators(t *testing.T) {
|
|||||||
|
|
||||||
for i, tc := range testCases {
|
for i, tc := range testCases {
|
||||||
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
|
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
|
||||||
whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, "signoz_calls_total", "sum")
|
mq := v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"},
|
||||||
|
AggregateOperator: v3.AggregateOperatorSum,
|
||||||
|
}
|
||||||
|
whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, &mq)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Contains(t, whereClause, tc.expectedWhereClause)
|
require.Contains(t, whereClause, tc.expectedWhereClause)
|
||||||
})
|
})
|
||||||
@ -238,7 +243,7 @@ func TestBuildQueryOperators(t *testing.T) {
|
|||||||
func TestBuildQueryXRate(t *testing.T) {
|
func TestBuildQueryXRate(t *testing.T) {
|
||||||
t.Run("TestBuildQueryXRate", func(t *testing.T) {
|
t.Run("TestBuildQueryXRate", func(t *testing.T) {
|
||||||
|
|
||||||
tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name') as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts`
|
tmpl := `SELECT ts, %s(value) as value FROM (SELECT ts, if(runningDifference(ts) <= 0, nan, if(runningDifference(value) < 0, (value) / runningDifference(ts), runningDifference(value) / runningDifference(ts))) as value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 0 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(value) = 0) GROUP BY GROUPING SETS ( (ts), () ) ORDER BY ts`
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
aggregateOperator v3.AggregateOperator
|
aggregateOperator v3.AggregateOperator
|
||||||
|
@ -51,7 +51,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Contains(t, queries["C"], "SELECT A.ts as ts, A.value / B.value")
|
require.Contains(t, queries["C"], "SELECT A.ts as ts, A.value / B.value")
|
||||||
require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||||
require.Contains(t, queries["C"], "runningDifference(value) / runningDifference(ts)")
|
require.Contains(t, queries["C"], "runningDifference(value) / runningDifference(ts)")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -205,3 +205,164 @@ func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) {
|
|||||||
// So(strings.Count(queries["F5"], " ON "), ShouldEqual, 1)
|
// So(strings.Count(queries["F5"], " ON "), ShouldEqual, 1)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeltaQueryBuilder(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
query *v3.QueryRangeParamsV3
|
||||||
|
queryToTest string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "TestQueryWithName - Request rate",
|
||||||
|
query: &v3.QueryRangeParamsV3{
|
||||||
|
Start: 1650991982000,
|
||||||
|
End: 1651078382000,
|
||||||
|
Step: 60,
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeBuilder,
|
||||||
|
PanelType: v3.PanelTypeGraph,
|
||||||
|
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||||
|
"A": {
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
QueryName: "A",
|
||||||
|
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_count"},
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateOperator: v3.AggregateOperatorSumRate,
|
||||||
|
Expression: "A",
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "service_name"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"frontend"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "operation"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"HTTP GET /dispatch"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryToTest: "A",
|
||||||
|
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY ts ORDER BY ts",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "TestQueryWithExpression - Error rate",
|
||||||
|
query: &v3.QueryRangeParamsV3{
|
||||||
|
Start: 1650991982000,
|
||||||
|
End: 1651078382000,
|
||||||
|
Step: 60,
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeBuilder,
|
||||||
|
PanelType: v3.PanelTypeGraph,
|
||||||
|
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||||
|
"A": {
|
||||||
|
QueryName: "A",
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_count"},
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateOperator: v3.AggregateOperatorSumRate,
|
||||||
|
Expression: "A",
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "service_name"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"frontend"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "operation"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"HTTP GET /dispatch"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "status_code"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"STATUS_CODE_ERROR"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"B": {
|
||||||
|
QueryName: "B",
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_count"},
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateOperator: v3.AggregateOperatorSumRate,
|
||||||
|
Expression: "B",
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "service_name"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"frontend"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{Key: "operation"},
|
||||||
|
Operator: v3.FilterOperatorIn,
|
||||||
|
Value: []interface{}{"HTTP GET /dispatch"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"C": {
|
||||||
|
QueryName: "C",
|
||||||
|
Expression: "A*100/B",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryToTest: "C",
|
||||||
|
expected: "SELECT A.ts as ts, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY ts ORDER BY ts) as A GLOBAL INNER JOIN(SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY ts ORDER BY ts) as B ON A.ts = B.ts",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "TestQuery - Quantile",
|
||||||
|
query: &v3.QueryRangeParamsV3{
|
||||||
|
Start: 1650991982000,
|
||||||
|
End: 1651078382000,
|
||||||
|
Step: 60,
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeBuilder,
|
||||||
|
PanelType: v3.PanelTypeGraph,
|
||||||
|
BuilderQueries: map[string]*v3.BuilderQuery{
|
||||||
|
"A": {
|
||||||
|
QueryName: "A",
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{Key: "signoz_latency_bucket"},
|
||||||
|
StepInterval: 60,
|
||||||
|
AggregateOperator: v3.AggregateOperatorHistQuant95,
|
||||||
|
Expression: "A",
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
GroupBy: []v3.AttributeKey{
|
||||||
|
{Key: "service_name"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
queryToTest: "A",
|
||||||
|
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 GLOBAL INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.distributed_time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991982000 AND timestamp_ms <= 1651078382000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
qbOptions := QueryBuilderOptions{
|
||||||
|
BuildMetricQuery: metricsv3.PrepareMetricQuery,
|
||||||
|
}
|
||||||
|
qb := NewQueryBuilder(qbOptions)
|
||||||
|
queries, err := qb.PrepareQueries(c.query)
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, c.expected, queries[c.queryToTest])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -48,6 +48,7 @@ type ServerOptions struct {
|
|||||||
// alert specific params
|
// alert specific params
|
||||||
DisableRules bool
|
DisableRules bool
|
||||||
RuleRepoURL string
|
RuleRepoURL string
|
||||||
|
PreferDelta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server runs HTTP, Mux and a grpc server
|
// Server runs HTTP, Mux and a grpc server
|
||||||
@ -125,6 +126,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
|||||||
apiHandler, err := NewAPIHandler(APIHandlerOpts{
|
apiHandler, err := NewAPIHandler(APIHandlerOpts{
|
||||||
Reader: reader,
|
Reader: reader,
|
||||||
SkipConfig: skipConfig,
|
SkipConfig: skipConfig,
|
||||||
|
PerferDelta: serverOptions.PreferDelta,
|
||||||
AppDao: dao.DB(),
|
AppDao: dao.DB(),
|
||||||
RuleManager: rm,
|
RuleManager: rm,
|
||||||
FeatureFlags: fm,
|
FeatureFlags: fm,
|
||||||
|
@ -56,6 +56,7 @@ type Reader interface {
|
|||||||
// Setter Interfaces
|
// Setter Interfaces
|
||||||
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||||
|
|
||||||
|
FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
||||||
GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError)
|
GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError)
|
||||||
GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
||||||
GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
||||||
|
@ -34,9 +34,12 @@ func main() {
|
|||||||
// the url used to build link in the alert messages in slack and other systems
|
// the url used to build link in the alert messages in slack and other systems
|
||||||
var ruleRepoURL string
|
var ruleRepoURL string
|
||||||
|
|
||||||
|
var preferDelta bool
|
||||||
|
|
||||||
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
||||||
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
||||||
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
||||||
|
flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over gauge)")
|
||||||
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
@ -51,6 +54,7 @@ func main() {
|
|||||||
HTTPHostPort: constants.HTTPHostPort,
|
HTTPHostPort: constants.HTTPHostPort,
|
||||||
PromConfigPath: promConfigPath,
|
PromConfigPath: promConfigPath,
|
||||||
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
||||||
|
PreferDelta: preferDelta,
|
||||||
PrivateHostPort: constants.PrivateHostPort,
|
PrivateHostPort: constants.PrivateHostPort,
|
||||||
DisableRules: disableRules,
|
DisableRules: disableRules,
|
||||||
RuleRepoURL: ruleRepoURL,
|
RuleRepoURL: ruleRepoURL,
|
||||||
|
@ -417,12 +417,21 @@ func (c *CompositeQuery) Validate() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Temporality string
|
||||||
|
|
||||||
|
const (
|
||||||
|
Unspecified Temporality = "Unspecified"
|
||||||
|
Delta Temporality = "Delta"
|
||||||
|
Cumulative Temporality = "Cumulative"
|
||||||
|
)
|
||||||
|
|
||||||
type BuilderQuery struct {
|
type BuilderQuery struct {
|
||||||
QueryName string `json:"queryName"`
|
QueryName string `json:"queryName"`
|
||||||
StepInterval int64 `json:"stepInterval"`
|
StepInterval int64 `json:"stepInterval"`
|
||||||
DataSource DataSource `json:"dataSource"`
|
DataSource DataSource `json:"dataSource"`
|
||||||
AggregateOperator AggregateOperator `json:"aggregateOperator"`
|
AggregateOperator AggregateOperator `json:"aggregateOperator"`
|
||||||
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
|
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
|
||||||
|
Temporality Temporality `json:"temporality,omitempty"`
|
||||||
Filters *FilterSet `json:"filters,omitempty"`
|
Filters *FilterSet `json:"filters,omitempty"`
|
||||||
GroupBy []AttributeKey `json:"groupBy,omitempty"`
|
GroupBy []AttributeKey `json:"groupBy,omitempty"`
|
||||||
Expression string `json:"expression"`
|
Expression string `json:"expression"`
|
||||||
|
Loading…
x
Reference in New Issue
Block a user