feat: metrics query range v3 (#2265)

This commit is contained in:
Srikanth Chekuri 2023-03-23 19:45:15 +05:30 committed by GitHub
parent c3763032df
commit 17a5bc8cc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1843 additions and 17 deletions

View File

@ -3767,6 +3767,145 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
return &attributeValues, nil
}
func readRow(vars []interface{}, columnNames []string) ([]string, map[string]string, v3.Point) {
// Each row will have a value and a timestamp, and an optional list of label values
// example: {Timestamp: ..., Value: ...}
// The timestamp may also not present in some cases where the time series is reduced to single value
var point v3.Point
// groupBy is a container to hold label values for the current point
// example: ["frontend", "/fetch"]
var groupBy []string
// groupAttributes is a container to hold the key-value pairs for the current
// metric point.
// example: {"serviceName": "frontend", "operation": "/fetch"}
groupAttributes := make(map[string]string)
for idx, v := range vars {
colName := columnNames[idx]
switch v := v.(type) {
case *string:
// special case for returning all labels in metrics datasource
if colName == "fullLabels" {
var metric map[string]string
err := json.Unmarshal([]byte(*v), &metric)
if err != nil {
zap.S().Errorf("unexpected error encountered %v", err)
}
for key, val := range metric {
groupBy = append(groupBy, val)
groupAttributes[key] = val
}
} else {
groupBy = append(groupBy, *v)
groupAttributes[colName] = *v
}
case *time.Time:
point.Timestamp = v.UnixMilli()
case *float64, *float32:
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
point.Value = float64(reflect.ValueOf(v).Elem().Float())
} else {
groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float()))
groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float())
}
case *uint8, *uint64, *uint16, *uint32:
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
point.Value = float64(reflect.ValueOf(v).Elem().Uint())
} else {
groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()))
groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())
}
case *int8, *int16, *int32, *int64:
if _, ok := constants.ReservedColumnTargetAliases[colName]; ok {
point.Value = float64(reflect.ValueOf(v).Elem().Int())
} else {
groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()))
groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())
}
default:
zap.S().Errorf("unsupported var type %v found in metric builder query result for column %s", v, colName)
}
}
return groupBy, groupAttributes, point
}
func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string) ([]*v3.Series, error) {
// when groupBy is applied, each combination of cartesian product
// of attribute values is a separate series. Each item in seriesToPoints
// represent a unique series where the key is sorted attribute values joined
// by "," and the value is the list of points for that series
// For instance, group by (serviceName, operation)
// with two services and three operations in each will result in (maximum of) 6 series
// ("frontend", "order") x ("/fetch", "/fetch/{Id}", "/order")
//
// ("frontend", "/fetch")
// ("frontend", "/fetch/{Id}")
// ("frontend", "/order")
// ("order", "/fetch")
// ("order", "/fetch/{Id}")
// ("order", "/order")
seriesToPoints := make(map[string][]v3.Point)
// seriesToAttrs is a mapping of key to a map of attribute key to attribute value
// for each series. This is used to populate the series' attributes
// For instance, for the above example, the seriesToAttrs will be
// {
// "frontend,/fetch": {"serviceName": "frontend", "operation": "/fetch"},
// "frontend,/fetch/{Id}": {"serviceName": "frontend", "operation": "/fetch/{Id}"},
// "frontend,/order": {"serviceName": "frontend", "operation": "/order"},
// "order,/fetch": {"serviceName": "order", "operation": "/fetch"},
// "order,/fetch/{Id}": {"serviceName": "order", "operation": "/fetch/{Id}"},
// "order,/order": {"serviceName": "order", "operation": "/order"},
// }
seriesToAttrs := make(map[string]map[string]string)
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return nil, err
}
groupBy, groupAttributes, metricPoint := readRow(vars, columnNames)
sort.Strings(groupBy)
key := strings.Join(groupBy, "")
seriesToAttrs[key] = groupAttributes
seriesToPoints[key] = append(seriesToPoints[key], metricPoint)
}
var seriesList []*v3.Series
for key := range seriesToPoints {
series := v3.Series{Labels: seriesToAttrs[key], Points: seriesToPoints[key]}
seriesList = append(seriesList, &series)
}
return seriesList, nil
}
// GetTimeSeriesResultV3 runs the query and returns list of time series
func (r *ClickHouseReader) GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) {
defer utils.Elapsed("GetTimeSeriesResultV3", query)()
rows, err := r.db.Query(ctx, query)
if err != nil {
zap.S().Errorf("error while reading time series result %v", err)
return nil, err
}
defer rows.Close()
var (
columnTypes = rows.ColumnTypes()
columnNames = rows.Columns()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
return readRowsForTimeSeriesResult(rows, vars, columnNames)
}
func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error {
rows, err := r.db.Query(ctx, "SELECT 1")
if err != nil {

View File

@ -22,6 +22,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/explorer"
"go.signoz.io/signoz/pkg/query-service/app/logs"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
@ -36,6 +37,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/telemetry"
"go.signoz.io/signoz/pkg/query-service/version"
"go.uber.org/multierr"
"go.uber.org/zap"
)
@ -63,6 +65,7 @@ type APIHandler struct {
ruleManager *rules.Manager
featureFlags interfaces.FeatureLookup
ready func(http.HandlerFunc) http.HandlerFunc
queryBuilder *queryBuilder
// SetupCompleted indicates if SigNoz is ready for general use.
// at the moment, we mark the app ready when the first user
@ -101,6 +104,17 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
featureFlags: opts.FeatureFlags,
}
builderOpts := queryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
BuildTraceQuery: func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) {
return "", errors.New("not implemented")
},
BuildLogQuery: func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) {
return "", errors.New("not implemented")
},
}
aH.queryBuilder = NewQueryBuilder(builderOpts)
aH.ready = aH.testReady
dashboards.LoadDashboardFiles()
@ -244,6 +258,7 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
subRouter.HandleFunc("/autocomplete/aggregate_attributes", am.ViewAccess(aH.autocompleteAggregateAttributes)).Methods(http.MethodGet)
subRouter.HandleFunc("/autocomplete/attribute_keys", am.ViewAccess(aH.autoCompleteAttributeKeys)).Methods(http.MethodGet)
subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValues)).Methods(http.MethodGet)
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost)
}
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
@ -2424,3 +2439,186 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http
aH.Respond(w, response)
}
func (aH *APIHandler) execClickHouseQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]string) {
type channelResult struct {
Series []*v3.Series
Err error
Name string
Query string
}
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup
for name, query := range queries {
wg.Add(1)
go func(name, query string) {
defer wg.Done()
seriesList, err := aH.reader.GetTimeSeriesResultV3(ctx, query)
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
return
}
ch <- channelResult{Series: seriesList, Name: name, Query: query}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
res := make([]*v3.Result, 0)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
res = append(res, &v3.Result{
QueryName: r.Name,
Series: r.Series,
})
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
}
return res, nil, nil
}
func (aH *APIHandler) execPromQueries(ctx context.Context, metricsQueryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) {
type channelResult struct {
Series []*v3.Series
Err error
Name string
Query string
}
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeQuery.PromQueries))
var wg sync.WaitGroup
for name, query := range metricsQueryRangeParams.CompositeQuery.PromQueries {
if query.Disabled {
continue
}
wg.Add(1)
go func(name string, query *v3.PromQuery) {
var seriesList []*v3.Series
defer wg.Done()
tmpl := template.New("promql-query")
tmpl, tmplErr := tmpl.Parse(query.Query)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
var queryBuf bytes.Buffer
tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
query.Query = queryBuf.String()
queryModel := model.QueryRangeParams{
Start: time.UnixMilli(metricsQueryRangeParams.Start),
End: time.UnixMilli(metricsQueryRangeParams.End),
Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)),
Query: query.Query,
}
promResult, _, err := aH.reader.GetQueryRangeResult(ctx, &queryModel)
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query}
return
}
matrix, _ := promResult.Matrix()
for _, v := range matrix {
var s v3.Series
s.Labels = v.Metric.Copy().Map()
for _, p := range v.Points {
s.Points = append(s.Points, v3.Point{Timestamp: p.T, Value: p.V})
}
seriesList = append(seriesList, &s)
}
ch <- channelResult{Series: seriesList, Name: name, Query: query.Query}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
res := make([]*v3.Result, 0)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
res = append(res, &v3.Result{
QueryName: r.Name,
Series: r.Series,
})
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
}
return res, nil, nil
}
func (aH *APIHandler) queryRangeV3(queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
var result []*v3.Result
var err error
var errQuriesByName map[string]string
switch queryRangeParams.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
queries, err := aH.queryBuilder.prepareQueries(queryRangeParams)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
result, err, errQuriesByName = aH.execClickHouseQueries(r.Context(), queries)
case v3.QueryTypeClickHouseSQL:
queries := make(map[string]string)
for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries {
if query.Disabled {
continue
}
queries[name] = query.Query
}
result, err, errQuriesByName = aH.execClickHouseQueries(r.Context(), queries)
case v3.QueryTypePromQL:
result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams)
default:
err = fmt.Errorf("invalid query type")
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName)
return
}
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.S().Errorf(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
aH.queryRangeV3(queryRangeParams, w, r)
}

View File

@ -0,0 +1,363 @@
package v3
import (
"fmt"
"strings"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
v3.AggregateOperatorP05: 0.5,
v3.AggregateOperatorP10: 0.10,
v3.AggregateOperatorP20: 0.20,
v3.AggregateOperatorP25: 0.25,
v3.AggregateOperatorP50: 0.50,
v3.AggregateOperatorP75: 0.75,
v3.AggregateOperatorP90: 0.90,
v3.AggregateOperatorP95: 0.95,
v3.AggregateOperatorP99: 0.99,
v3.AggregateOperatorHistQuant50: 0.50,
v3.AggregateOperatorHistQuant75: 0.75,
v3.AggregateOperatorHistQuant90: 0.90,
v3.AggregateOperatorHistQuant95: 0.95,
v3.AggregateOperatorHistQuant99: 0.99,
}
var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
v3.AggregateOperatorAvg: "avg",
v3.AggregateOperatorMax: "max",
v3.AggregateOperatorMin: "min",
v3.AggregateOperatorSum: "sum",
v3.AggregateOperatorRateSum: "sum",
v3.AggregateOperatorRateAvg: "avg",
v3.AggregateOperatorRateMax: "max",
v3.AggregateOperatorRateMin: "min",
}
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference(ts) < 0, nan, runningDifference(value)/runningDifference(ts))`
// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
// timeseries based on search criteria
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, metricName string, aggregateOperator v3.AggregateOperator) (string, error) {
var conditions []string
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName)))
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := strings.ToLower(strings.TrimSpace(item.Operator))
// if the received value is an array for like/match op, just take the first value
if op == "like" || op == "match" || op == "nlike" || op == "nmatch" {
x, ok := item.Value.([]interface{})
if ok {
if len(x) == 0 {
continue
}
toFormat = x[0]
}
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
switch op {
case "eq":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
case "neq":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
case "in":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
case "nin":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
case "like":
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case "nlike":
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case "match":
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case "nmatch":
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case "gt":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
case "gte":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
case "lt":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
case "lte":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
case "contains":
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case "ncontains":
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case "exists":
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), %s)", item.Key.Key))
case "nexists":
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), %s)", item.Key.Key))
default:
return "", fmt.Errorf("unsupported operation")
}
}
}
queryString := strings.Join(conditions, " AND ")
var selectLabels string
if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate {
selectLabels = "labels,"
} else {
for _, tag := range groupTags {
selectLabels += fmt.Sprintf(" JSONExtractString(labels, '%s') as %s,", tag.Key, tag.Key)
}
}
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_TABLENAME, queryString)
return filterSubQuery, nil
}
func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute.Key, mq.AggregateOperator)
if err != nil {
return "", err
}
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
// Select the aggregate value for interval
queryTmpl :=
"SELECT %s" +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" %s as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" GLOBAL INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
" GROUP BY %s" +
" ORDER BY %s ts"
// tagsWithoutLe is used to group by all tags except le
// This is done because we want to group by le only when we are calculating quantile
// Otherwise, we want to group by all tags except le
tagsWithoutLe := []string{}
for _, tag := range mq.GroupBy {
if tag.Key != "le" {
tagsWithoutLe = append(tagsWithoutLe, tag.Key)
}
}
groupByWithoutLe := groupBy(tagsWithoutLe...)
groupTagsWithoutLe := groupSelect(tagsWithoutLe...)
orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe)
groupBy := groupByAttributeKeyTags(mq.GroupBy...)
groupTags := groupSelectAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
if len(orderBy) != 0 {
orderBy += ","
}
switch mq.AggregateOperator {
case v3.AggregateOperatorRate:
// Calculate rate of change of metric for each unique time series
groupBy = "fingerprint, ts"
groupTags = "fingerprint,"
op := "max(value)" // max value should be the closest value for point in time
subQuery := fmt.Sprintf(
queryTmpl, "any(labels) as labels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy,
) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s)`
query = fmt.Sprintf(query, "labels as fullLabels,", subQuery)
return query, nil
case v3.AggregateOperatorSumRate:
rateGroupBy := "fingerprint, " + groupBy
rateGroupTags := "fingerprint, " + groupTags
rateOrderBy := "fingerprint, " + orderBy
op := "max(value)"
subQuery := fmt.Sprintf(
queryTmpl, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s)`
query = fmt.Sprintf(query, groupTags, subQuery)
query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, orderBy)
return query, nil
case
v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRateMin:
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s)`
query = fmt.Sprintf(query, groupTags, subQuery)
return query, nil
case
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99:
op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99:
rateGroupBy := "fingerprint, " + groupBy
rateGroupTags := "fingerprint, " + groupTags
rateOrderBy := "fingerprint, " + orderBy
op := "max(value)"
subQuery := fmt.Sprintf(
queryTmpl, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy,
) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s)`
query = fmt.Sprintf(query, groupTags, subQuery)
query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, orderBy)
value := aggregateOperatorToPercentile[mq.AggregateOperator]
query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe)
return query, nil
case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOpeatorCount:
op := "toFloat64(count(*))"
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorCountDistinct:
op := "toFloat64(count(distinct(value)))"
query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy)
return query, nil
case v3.AggregateOperatorNoOp:
queryTmpl :=
"SELECT fingerprint, labels as fullLabels," +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" any(value) as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" GLOBAL INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
" GROUP BY fingerprint, labels, ts" +
" ORDER BY fingerprint, labels, ts"
query := fmt.Sprintf(queryTmpl, step, filterSubQuery)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator")
}
}
// groupBy returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupBy(tags ...string) string {
tags = append(tags, "ts")
return strings.Join(tags, ",")
}
// groupSelect returns a string of comma separated tags for select clause
func groupSelect(tags ...string) string {
groupTags := strings.Join(tags, ",")
if len(tags) != 0 {
groupTags += ", "
}
return groupTags
}
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
return groupBy(groupTags...)
}
func groupSelectAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
return groupSelect(groupTags...)
}
// orderBy returns a string of comma separated tags for order by clause
// if the order is not specified, it defaults to ASC
func orderBy(items []v3.OrderBy, tags []string) string {
var orderBy []string
for _, tag := range tags {
found := false
for _, item := range items {
if item.ColumnName == tag {
found = true
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
break
}
}
if !found {
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag))
}
}
return strings.Join(orderBy, ",")
}
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
var groupTags []string
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
return orderBy(items, groupTags)
}
func having(items []v3.Having) string {
var having []string
for _, item := range items {
having = append(having, fmt.Sprintf("%s %s %v", item.ColumnName, item.Operator, utils.ClickHouseFormattedValue(item.Value)))
}
return strings.Join(having, " AND ")
}
func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
var selectLabels string
var groupBy string
// NOOP and RATE can possibly return multiple time series and reduce should be applied
// for each uniques series. When the final result contains more than one series we throw
// an error post DB fetching. Otherwise just return the single data. This is not known until queried so the
// the query is prepared accordingly.
if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate {
selectLabels = ", any(fullLabels) as fullLabels"
groupBy = "GROUP BY fingerprint"
}
// the timestamp picked is not relevant here since the final value used is show the single
// chart with just the query value. For the quer
switch reduceTo {
case v3.ReduceToOperatorLast:
query = fmt.Sprintf("SELECT anyLast(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case v3.ReduceToOperatorSum:
query = fmt.Sprintf("SELECT sum(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case v3.ReduceToOperatorAvg:
query = fmt.Sprintf("SELECT avg(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case v3.ReduceToOperatorMax:
query = fmt.Sprintf("SELECT max(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case v3.ReduceToOperatorMin:
query = fmt.Sprintf("SELECT min(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
default:
return "", fmt.Errorf("unsupported reduce operator")
}
return query, nil
}
func PrepareMetricQuery(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) {
query, err := buildMetricQuery(start, end, step, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
if err != nil {
return "", err
}
if panelType == v3.PanelTypeValue {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
}
return query, err
}

View File

@ -0,0 +1,98 @@
package v3
import (
"testing"
"github.com/stretchr/testify/require"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestBuildQuery(t *testing.T) {
t.Run("TestSimpleQueryWithName", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "name"},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
}
query, err := PrepareMetricQuery(q.Start, q.End, q.Step, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
require.NoError(t, err)
require.Contains(t, query, "WHERE metric_name = 'name'")
})
}
func TestBuildQueryWithFilters(t *testing.T) {
t.Run("TestBuildQueryWithFilters", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "a"}, Value: "b", Operator: "neq"},
{Key: v3.AttributeKey{Key: "code"}, Value: "ERROR_*", Operator: "nmatch"},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
},
},
},
}
query, err := PrepareMetricQuery(q.Start, q.End, q.Step, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
require.NoError(t, err)
require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'a') != 'b'")
require.Contains(t, query, rateWithoutNegative)
require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')")
})
}
func TestBuildQueryWithMultipleQueries(t *testing.T) {
t.Run("TestBuildQueryWithFilters", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: v3.AggregateOperatorRateAvg,
Expression: "A",
},
"B": {
QueryName: "B",
AggregateAttribute: v3.AttributeKey{Key: "name2"},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "B",
},
},
},
}
query, err := PrepareMetricQuery(q.Start, q.End, q.Step, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"])
require.NoError(t, err)
require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']")
require.Contains(t, query, rateWithoutNegative)
})
}

View File

@ -1,6 +1,7 @@
package app
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -8,15 +9,21 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/SigNoz/govaluate"
"github.com/gorilla/mux"
promModel "github.com/prometheus/common/model"
"go.uber.org/multierr"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
)
var allowedFunctions = []string{"count", "ratePerSec", "sum", "avg", "min", "max", "p50", "p90", "p95", "p99"}
@ -902,3 +909,147 @@ func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValue
}
return &req, nil
}
func validateQueryRangeParamsV3(qp *v3.QueryRangeParamsV3) error {
err := qp.CompositeQuery.Validate()
if err != nil {
return err
}
var expressions []string
for _, q := range qp.CompositeQuery.BuilderQueries {
expressions = append(expressions, q.Expression)
}
errs := validateExpressions(expressions, evalFuncs, qp.CompositeQuery)
if len(errs) > 0 {
return multierr.Combine(errs...)
}
return nil
}
// validateExpressions validates the math expressions using the list of
// allowed functions.
func validateExpressions(expressions []string, funcs map[string]govaluate.ExpressionFunction, cq *v3.CompositeQuery) []error {
var errs []error
for _, exp := range expressions {
evalExp, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs)
if err != nil {
errs = append(errs, err)
continue
}
variables := evalExp.Vars()
for _, v := range variables {
var hasVariable bool
for _, q := range cq.BuilderQueries {
if q.Expression == v {
hasVariable = true
break
}
}
if !hasVariable {
errs = append(errs, fmt.Errorf("unknown variable %s", v))
}
}
}
return errs
}
func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiError) {
var queryRangeParams *v3.QueryRangeParamsV3
// parse the request body
if err := json.NewDecoder(r.Body).Decode(&queryRangeParams); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
// validate the request body
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
// prepare the variables for the corrspnding query type
formattedVars := make(map[string]interface{})
for name, value := range queryRangeParams.Variables {
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL {
formattedVars[name] = metrics.PromFormattedValue(value)
} else if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL {
formattedVars[name] = utils.ClickHouseFormattedValue(value)
}
}
// replace the variables in metrics builder filter item with actual value
// example: {"key": "host", "value": "{{ .host }}", "operator": "equals"} with
// variables {"host": "test"} will be replaced with {"key": "host", "value": "test", "operator": "equals"}
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
if query.Filters == nil || len(query.Filters.Items) == 0 {
continue
}
for idx := range query.Filters.Items {
item := &query.Filters.Items[idx]
value := item.Value
if value != nil {
switch x := value.(type) {
case string:
variableName := strings.Trim(x, "{{ . }}")
if _, ok := queryRangeParams.Variables[variableName]; ok {
item.Value = queryRangeParams.Variables[variableName]
}
case []interface{}:
if len(x) > 0 {
switch x[0].(type) {
case string:
variableName := strings.Trim(x[0].(string), "{{ . }}")
if _, ok := queryRangeParams.Variables[variableName]; ok {
item.Value = queryRangeParams.Variables[variableName]
}
}
}
}
}
}
}
}
queryRangeParams.Variables = formattedVars
// prometheus instant query needs same timestamp
if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeValue &&
queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL {
queryRangeParams.Start = queryRangeParams.End
}
// round up the end to neaerest multiple
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
end := (queryRangeParams.End) / 1000
step := queryRangeParams.Step
queryRangeParams.End = (end / step * step) * 1000
}
// replace go template variables in clickhouse query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL {
for _, chQuery := range queryRangeParams.CompositeQuery.ClickHouseQueries {
if chQuery.Disabled {
continue
}
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
var query bytes.Buffer
// replace go template variables
querytemplate.AssignReservedVarsV3(queryRangeParams)
err = tmpl.Execute(&query, queryRangeParams.Variables)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
chQuery.Query = query.String()
}
}
return queryRangeParams, nil
}

View File

@ -2,14 +2,18 @@ package app
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/smartystreets/assertions/should"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@ -337,3 +341,419 @@ func TestParseFilterAttributeValueRequest(t *testing.T) {
assert.Equal(t, reqCase.expectedSearchText, filterAttrRequest.SearchText)
}
}
func TestParseQueryRangeParamsCompositeQuery(t *testing.T) {
reqCases := []struct {
desc string
compositeQuery v3.CompositeQuery
expectErr bool
errMsg string
}{
{
desc: "no query in request",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeClickHouseSQL,
},
expectErr: true,
errMsg: "composite query must contain at least one query",
},
{
desc: "invalid panel type",
compositeQuery: v3.CompositeQuery{
PanelType: "invalid",
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"A": {
Query: "query",
Disabled: false,
},
},
},
expectErr: true,
errMsg: "panel type is invalid",
},
{
desc: "invalid query type",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: "invalid",
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"A": {
Query: "query",
Disabled: false,
},
},
},
expectErr: true,
errMsg: "query type is invalid",
},
{
desc: "invalid prometheus query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "",
Disabled: false,
},
},
},
expectErr: true,
errMsg: "query is empty",
},
{
desc: "invalid clickhouse query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"A": {
Query: "",
Disabled: false,
},
},
},
expectErr: true,
errMsg: "query is empty",
},
{
desc: "invalid prometheus query with disabled query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "",
Disabled: true,
},
},
},
expectErr: true,
errMsg: "query is empty",
},
{
desc: "invalid clickhouse query with disabled query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"A": {
Query: "",
Disabled: true,
},
},
},
expectErr: true,
errMsg: "query is empty",
},
{
desc: "valid prometheus query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "http_calls_total",
Disabled: false,
},
},
},
expectErr: false,
},
{
desc: "invalid builder query without query name",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"": {
QueryName: "",
Expression: "A",
},
},
},
expectErr: true,
errMsg: "query name is required",
},
{
desc: "invalid data source for builder query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: "invalid",
Expression: "A",
},
},
},
expectErr: true,
errMsg: "data source is invalid",
},
{
desc: "invalid aggregate operator for builder query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: "metrics",
AggregateOperator: "invalid",
Expression: "A",
},
},
},
expectErr: true,
errMsg: "aggregate operator is invalid",
},
{
desc: "invalid aggregate attribute for builder query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: "traces",
AggregateOperator: "sum",
AggregateAttribute: v3.AttributeKey{},
Expression: "A",
},
},
},
expectErr: true,
errMsg: "aggregate attribute is required",
},
{
desc: "invalid group by attribute for builder query",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: "logs",
AggregateOperator: "sum",
AggregateAttribute: v3.AttributeKey{Key: "attribute"},
GroupBy: []v3.AttributeKey{{Key: ""}},
Expression: "A",
},
},
},
expectErr: true,
errMsg: "builder query A is invalid: group by is invalid",
},
}
for _, tc := range reqCases {
t.Run(tc.desc, func(t *testing.T) {
queryRangeParams := &v3.QueryRangeParamsV3{
Start: time.Now().Add(-time.Hour).UnixMilli(),
End: time.Now().UnixMilli(),
Step: time.Minute.Microseconds(),
CompositeQuery: &tc.compositeQuery,
Variables: map[string]interface{}{},
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(queryRangeParams)
require.NoError(t, err)
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
_, apiErr := ParseQueryRangeParams(req)
if tc.expectErr {
require.Error(t, apiErr)
require.Contains(t, apiErr.Error(), tc.errMsg)
} else {
require.Nil(t, apiErr)
}
})
}
}
func TestParseQueryRangeParamsExpressions(t *testing.T) {
reqCases := []struct {
desc string
compositeQuery v3.CompositeQuery
expectErr bool
errMsg string
}{
{
desc: "invalid expression",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSum,
AggregateAttribute: v3.AttributeKey{Key: "attribute_metrics"},
Expression: "A +",
},
},
},
expectErr: true,
errMsg: "Unexpected end of expression",
},
{
desc: "invalid expression",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceLogs,
AggregateOperator: v3.AggregateOperatorSum,
AggregateAttribute: v3.AttributeKey{Key: "attribute_logs"},
Expression: "A",
},
"F1": {
QueryName: "F1",
Expression: "A + B",
},
},
},
expectErr: true,
errMsg: "unknown variable B",
},
{
desc: "invalid expression",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceLogs,
AggregateOperator: v3.AggregateOperatorSum,
AggregateAttribute: v3.AttributeKey{Key: "attribute_logs"},
Expression: "A",
},
"F1": {
QueryName: "F1",
Expression: "A + B + C",
},
},
},
expectErr: true,
errMsg: "unknown variable B; unknown variable C",
},
}
for _, tc := range reqCases {
t.Run(tc.desc, func(t *testing.T) {
queryRangeParams := &v3.QueryRangeParamsV3{
Start: time.Now().Add(-time.Hour).UnixMilli(),
End: time.Now().UnixMilli(),
Step: time.Minute.Microseconds(),
CompositeQuery: &tc.compositeQuery,
Variables: map[string]interface{}{},
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(queryRangeParams)
require.NoError(t, err)
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
_, apiErr := ParseQueryRangeParams(req)
if tc.expectErr {
if apiErr == nil {
t.Fatalf("expected error %s, got nil", tc.errMsg)
}
require.Error(t, apiErr)
require.Contains(t, apiErr.Error(), tc.errMsg)
} else {
require.Nil(t, apiErr)
}
})
}
}
func TestParseQueryRangeParamsDashboardVarsSubstitution(t *testing.T) {
reqCases := []struct {
desc string
compositeQuery v3.CompositeQuery
variables map[string]interface{}
expectErr bool
errMsg string
expectedValue []interface{}
}{
{
desc: "valid builder query with dashboard variables",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSum,
AggregateAttribute: v3.AttributeKey{Key: "attribute_metrics"},
Expression: "A",
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "service_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
Operator: "EQ",
Value: "{{.service_name}}",
},
{
Key: v3.AttributeKey{Key: "operation_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
Operator: "IN",
Value: "{{.operation_name}}",
},
},
},
},
},
},
variables: map[string]interface{}{
"service_name": "route",
"operation_name": []interface{}{
"GET /route",
"POST /route",
},
},
expectErr: false,
expectedValue: []interface{}{"route", []interface{}{"GET /route", "POST /route"}},
},
}
for _, tc := range reqCases {
t.Run(tc.desc, func(t *testing.T) {
queryRangeParams := &v3.QueryRangeParamsV3{
Start: time.Now().Add(-time.Hour).UnixMilli(),
End: time.Now().UnixMilli(),
Step: time.Minute.Microseconds(),
CompositeQuery: &tc.compositeQuery,
Variables: tc.variables,
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(queryRangeParams)
require.NoError(t, err)
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
parsedQueryRangeParams, apiErr := ParseQueryRangeParams(req)
if tc.expectErr {
require.Error(t, apiErr)
require.Contains(t, apiErr.Error(), tc.errMsg)
} else {
fmt.Println(apiErr)
require.Nil(t, apiErr)
require.Equal(t, parsedQueryRangeParams.CompositeQuery.BuilderQueries["A"].Filters.Items[0].Value, tc.expectedValue[0])
require.Equal(t, parsedQueryRangeParams.CompositeQuery.BuilderQueries["A"].Filters.Items[1].Value, tc.expectedValue[1])
}
})
}
}

View File

@ -0,0 +1,179 @@
package app
import (
"fmt"
"strings"
"github.com/SigNoz/govaluate"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
var SupportedFunctions = []string{
"exp",
"log",
"ln",
"exp2",
"log2",
"exp10",
"log10",
"sqrt",
"cbrt",
"erf",
"erfc",
"lgamma",
"tgamma",
"sin",
"cos",
"tan",
"asin",
"acos",
"atan",
"degrees",
"radians",
}
var evalFuncs = map[string]govaluate.ExpressionFunction{}
type prepareTracesQueryFunc func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
type prepareLogsQueryFunc func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
type prepareMetricQueryFunc func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
type queryBuilder struct {
options queryBuilderOptions
}
type queryBuilderOptions struct {
BuildTraceQuery prepareTracesQueryFunc
BuildLogQuery prepareLogsQueryFunc
BuildMetricQuery prepareMetricQueryFunc
}
func NewQueryBuilder(options queryBuilderOptions) *queryBuilder {
return &queryBuilder{
options: options,
}
}
func init() {
for _, fn := range SupportedFunctions {
evalFuncs[fn] = func(args ...interface{}) (interface{}, error) {
return nil, nil
}
}
}
// unique returns the unique values in the slice
func unique(slice []string) []string {
keys := make(map[string]struct{})
list := []string{}
for _, entry := range slice {
if _, value := keys[entry]; !value {
keys[entry] = struct{}{}
list = append(list, entry)
}
}
return list
}
// expressionToQuery constructs the query for the expression
func expressionToQuery(qp *v3.QueryRangeParamsV3, varToQuery map[string]string, expression *govaluate.EvaluableExpression) (string, error) {
var formulaQuery string
variables := unique(expression.Vars())
var modified []govaluate.ExpressionToken
tokens := expression.Tokens()
for idx := range tokens {
token := tokens[idx]
if token.Kind == govaluate.VARIABLE {
token.Value = fmt.Sprintf("%v.value", token.Value)
token.Meta = fmt.Sprintf("%v.value", token.Meta)
}
modified = append(modified, token)
}
// err should be nil here since the expression is already validated
formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
var formulaSubQuery string
var joinUsing string
var prevVar string
for idx, variable := range variables {
query := varToQuery[variable]
groupTags := []string{}
for _, tag := range qp.CompositeQuery.BuilderQueries[variable].GroupBy {
groupTags = append(groupTags, tag.Key)
}
groupTags = append(groupTags, "ts")
if joinUsing == "" {
for _, tag := range groupTags {
joinUsing += fmt.Sprintf("%s.%s as %s, ", variable, tag, tag)
}
joinUsing = strings.TrimSuffix(joinUsing, ", ")
}
formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, variable)
if idx > 0 {
formulaSubQuery += " ON "
for _, tag := range groupTags {
formulaSubQuery += fmt.Sprintf("%s.%s = %s.%s AND ", prevVar, tag, variable, tag)
}
formulaSubQuery = strings.TrimSuffix(formulaSubQuery, " AND ")
}
if idx < len(variables)-1 {
formulaSubQuery += " GLOBAL INNER JOIN"
}
prevVar = variable
}
formulaQuery = fmt.Sprintf("SELECT %s, %s as value FROM ", joinUsing, formula.ExpressionString()) + formulaSubQuery
return formulaQuery, nil
}
func (qb *queryBuilder) prepareQueries(params *v3.QueryRangeParamsV3) (map[string]string, error) {
queries := make(map[string]string)
compositeQuery := params.CompositeQuery
if compositeQuery != nil {
// Build queries for each builder query
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression == queryName {
switch query.DataSource {
case v3.DataSourceTraces:
queryString, err := qb.options.BuildTraceQuery(params.Start, params.End, params.Step, compositeQuery.QueryType, compositeQuery.PanelType, query)
if err != nil {
return nil, err
}
queries[queryName] = queryString
case v3.DataSourceLogs:
queryString, err := qb.options.BuildLogQuery(params.Start, params.End, params.Step, compositeQuery.QueryType, compositeQuery.PanelType, query)
if err != nil {
return nil, err
}
queries[queryName] = queryString
case v3.DataSourceMetrics:
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, params.Step, compositeQuery.QueryType, compositeQuery.PanelType, query)
if err != nil {
return nil, err
}
queries[queryName] = queryString
default:
zap.S().Errorf("Unknown data source %s", query.DataSource)
}
}
}
// Build queries for each expression
for _, query := range compositeQuery.BuilderQueries {
if query.Expression != query.QueryName {
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs)
queryString, err := expressionToQuery(params, queries, expression)
if err != nil {
return nil, err
}
queries[query.QueryName] = queryString
}
}
}
return queries, nil
}

View File

@ -0,0 +1,198 @@
package app
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
t.Run("TestBuildQueryWithFilters", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
},
"B": {
AggregateAttribute: v3.AttributeKey{Key: "name2"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorRateAvg,
Expression: "B",
},
"C": {
QueryName: "C",
Expression: "A/B",
},
},
},
}
qbOptions := queryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
}
qb := NewQueryBuilder(qbOptions)
queries, err := qb.prepareQueries(q)
require.NoError(t, err)
require.Contains(t, queries["C"], "SELECT A.ts as ts, A.value / B.value")
require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']")
require.Contains(t, queries["C"], "runningDifference(value)/runningDifference(ts)")
})
}
func TestBuildQueryWithIncorrectQueryRef(t *testing.T) {
t.Run("TestBuildQueryWithFilters", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
},
"C": {
QueryName: "C",
Expression: "A*2",
},
},
},
}
qbOptions := queryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
}
qb := NewQueryBuilder(qbOptions)
_, err := qb.prepareQueries(q)
require.NoError(t, err)
})
}
func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) {
t.Run("TestBuildQueryWithFilters", func(t *testing.T) {
q := &v3.QueryRangeParamsV3{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{Key: "name"},
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "A",
Disabled: true,
},
"B": {
AggregateAttribute: v3.AttributeKey{Key: "name2"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "B",
Disabled: true,
},
"C": {
AggregateAttribute: v3.AttributeKey{Key: "name3"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "C",
Disabled: true,
},
"F1": {
QueryName: "F1",
Expression: "A/B",
},
"F2": {
QueryName: "F2",
Expression: "A/(B+C)",
},
"F3": {
QueryName: "F3",
Expression: "A*A",
},
"F4": {
QueryName: "F4",
Expression: "A*B*C",
},
"F5": {
QueryName: "F5",
Expression: "((A - B) / B) * 100",
},
},
},
}
qbOptions := queryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
}
qb := NewQueryBuilder(qbOptions)
queries, err := qb.prepareQueries(q)
require.NoError(t, err)
require.Contains(t, queries["F1"], "SELECT A.ts as ts, A.value / B.value")
require.Equal(t, 1, strings.Count(queries["F1"], " ON "))
require.Contains(t, queries["F2"], "SELECT A.ts as ts, A.value / (B.value + C.value)")
require.Equal(t, 2, strings.Count(queries["F2"], " ON "))
// Working with same query multiple times should not join on itself
require.NotContains(t, queries["F3"], " ON ")
require.Contains(t, queries["F4"], "SELECT A.ts as ts, A.value * B.value * C.value")
require.Equal(t, 2, strings.Count(queries["F4"], " ON "))
require.Contains(t, queries["F5"], "SELECT A.ts as ts, ((A.value - B.value) / B.value) * 100")
require.Equal(t, 1, strings.Count(queries["F5"], " ON "))
// res := PrepareBuilderMetricQueries(q, "table")
// So(res.Err, ShouldBeNil)
// queries := res.Queries
// So(len(queries), ShouldEqual, 5)
// So(queries["F1"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / B.value")
// So(strings.Count(queries["F1"], " ON "), ShouldEqual, 1)
// So(queries["F2"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / (B.value + C.value)")
// So(strings.Count(queries["F2"], " ON "), ShouldEqual, 2)
// // Working with same query multiple times should not join on itself
// So(queries["F3"], ShouldNotContainSubstring, " ON ")
// So(queries["F4"], ShouldContainSubstring, "SELECT A.ts as ts, A.value * B.value * C.value")
// // Number of times JOIN ON appears is N-1 where N is number of unique queries
// So(strings.Count(queries["F4"], " ON "), ShouldEqual, 2)
// So(queries["F5"], ShouldContainSubstring, "SELECT A.ts as ts, ((A.value - B.value) / B.value) * 100")
// So(strings.Count(queries["F5"], " ON "), ShouldEqual, 1)
})
}

View File

@ -213,7 +213,11 @@ const (
// ReservedColumnTargetAliases identifies result value from a user
// written clickhouse query. The column alias indcate which value is
// to be considered as final result (or target)
var ReservedColumnTargetAliases = map[string]bool{"result": true, "res": true, "value": true}
var ReservedColumnTargetAliases = map[string]struct{}{
"result": {},
"res": {},
"value": {},
}
const (
StringTagMapCol = "stringTagMap"

View File

@ -60,6 +60,7 @@ type Reader interface {
GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error)
GetTotalSpans(ctx context.Context) (uint64, error)
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)

View File

@ -1,7 +1,9 @@
package v3
import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/google/uuid"
@ -248,11 +250,13 @@ func (a AttributeKey) Validate() error {
return fmt.Errorf("invalid attribute dataType: %s", a.DataType)
}
switch a.Type {
case AttributeKeyTypeResource, AttributeKeyTypeTag:
break
default:
return fmt.Errorf("invalid attribute type: %s", a.Type)
if a.IsColumn {
switch a.Type {
case AttributeKeyTypeResource, AttributeKeyTypeTag:
break
default:
return fmt.Errorf("invalid attribute type: %s", a.Type)
}
}
if a.Key == "" {
@ -410,16 +414,16 @@ func (b *BuilderQuery) Validate() error {
}
if b.GroupBy != nil {
for _, groupBy := range b.GroupBy {
if groupBy.Validate() != nil {
return fmt.Errorf("group by is invalid")
if err := groupBy.Validate(); err != nil {
return fmt.Errorf("group by is invalid %w", err)
}
}
}
if b.SelectColumns != nil {
for _, selectColumn := range b.SelectColumns {
if selectColumn.Validate() != nil {
return fmt.Errorf("select column is invalid")
if err := selectColumn.Validate(); err != nil {
return fmt.Errorf("select column is invalid %w", err)
}
}
}
@ -473,9 +477,9 @@ type QueryRangeResponse struct {
}
type Result struct {
QueryName string `json:"queryName"`
Series *Series `json:"series"`
List []*Row `json:"list"`
QueryName string `json:"queryName"`
Series []*Series `json:"series"`
List []*Row `json:"list"`
}
type Series struct {
@ -489,8 +493,14 @@ type Row struct {
}
type Point struct {
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
Timestamp int64
Value float64
}
// MarshalJSON implements json.Marshaler.
func (p *Point) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.Value, 'f', -1, 64)
return json.Marshal(map[string]interface{}{"timestamp": p.Timestamp, "value": v})
}
// ExploreQuery is a query for the explore page

View File

@ -0,0 +1,47 @@
package utils
import (
"fmt"
"reflect"
"strings"
"go.uber.org/zap"
)
// ClickHouseFormattedValue formats the value to be used in clickhouse query
func ClickHouseFormattedValue(v interface{}) string {
switch x := v.(type) {
case int:
return fmt.Sprintf("%d", x)
case float32, float64:
return fmt.Sprintf("%f", x)
case string:
return fmt.Sprintf("'%s'", x)
case bool:
return fmt.Sprintf("%v", x)
case []interface{}:
if len(x) == 0 {
return ""
}
switch x[0].(type) {
case string:
str := "["
for idx, sVal := range x {
str += fmt.Sprintf("'%s'", sVal)
if idx != len(x)-1 {
str += ","
}
}
str += "]"
return str
case int, float32, float64, bool:
return strings.Join(strings.Fields(fmt.Sprint(x)), ",")
default:
zap.S().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0])))
return ""
}
default:
zap.S().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x)))
return ""
}
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// AssignReservedVars assigns values for go template vars. assumes that
@ -22,3 +23,20 @@ func AssignReservedVars(metricsQueryRangeParams *model.QueryRangeParamsV2) {
metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000)
}
// AssignReservedVars assigns values for go template vars. assumes that
// model.QueryRangeParamsV3.Start and End are Unix Nano timestamps
func AssignReservedVarsV3(metricsQueryRangeParams *v3.QueryRangeParamsV3) {
metricsQueryRangeParams.Variables["start_timestamp"] = metricsQueryRangeParams.Start / 1000
metricsQueryRangeParams.Variables["end_timestamp"] = metricsQueryRangeParams.End / 1000
metricsQueryRangeParams.Variables["start_timestamp_ms"] = metricsQueryRangeParams.Start
metricsQueryRangeParams.Variables["end_timestamp_ms"] = metricsQueryRangeParams.End
metricsQueryRangeParams.Variables["start_timestamp_nano"] = metricsQueryRangeParams.Start * 1e6
metricsQueryRangeParams.Variables["end_timestamp_nano"] = metricsQueryRangeParams.End * 1e6
metricsQueryRangeParams.Variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.Start/1000)
metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000)
}

View File

@ -6,9 +6,9 @@ import (
"go.uber.org/zap"
)
func Elapsed(funcName string) func() {
func Elapsed(funcName string, args ...interface{}) func() {
start := time.Now()
return func() {
zap.S().Infof("%s took %v\n", funcName, time.Since(start))
zap.S().Infof("func %s took %v with args %v", funcName, time.Since(start), args)
}
}