mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 16:49:01 +08:00
Add v2 query range metrics API (#1020)
* Queryrange params tests * review suggestions, quantile, simple metric filter and some refactoring * Add value type support * Add support for re2 regex, refactor, update tests and other changes * chore: update govaluate dep to signoz/govaluate * chore: add name to grouping * chore: add support for NOOP * fix: make result format compatible with prom HTTP API * chore: update clickhouse server and update query builder to use new schema * chore: use metric_name in auto suggest APIs * chore: add reduce operator and new aggregate functions * chore: add support for not like op * chore: fix the dip at the end for incomplete time range * chore: rounddown the end to exclude the incomplete collection
This commit is contained in:
parent
cc18cc9087
commit
a733adad2c
@ -14,6 +14,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
@ -61,10 +62,8 @@ const (
|
||||
signozErrorIndexTable = "signoz_error_index"
|
||||
signozTraceTableName = "signoz_index_v2"
|
||||
signozMetricDBName = "signoz_metrics"
|
||||
signozSampleName = "samples"
|
||||
signozTSName = "time_series"
|
||||
signozSampleTableName = "samples"
|
||||
signozTSTableName = "time_series"
|
||||
signozSampleTableName = "samples_v2"
|
||||
signozTSTableName = "time_series_v2"
|
||||
|
||||
minTimespanForProgressiveSearch = time.Hour
|
||||
minTimespanForProgressiveSearchMargin = time.Minute
|
||||
@ -2368,7 +2367,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
}
|
||||
|
||||
case constants.MetricsTTL:
|
||||
tableName = signozMetricDBName + "." + signozSampleName
|
||||
tableName = signozMetricDBName + "." + signozSampleTableName
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
@ -2607,7 +2606,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
|
||||
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
|
||||
|
||||
case constants.MetricsTTL:
|
||||
tableNameArray := []string{signozMetricDBName + "." + signozSampleName}
|
||||
tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
|
||||
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -2726,16 +2725,16 @@ func (r *ClickHouseReader) GetMetricAutocompleteTagKey(ctx context.Context, para
|
||||
tagsWhereClause := ""
|
||||
|
||||
for key, val := range params.MetricTags {
|
||||
tagsWhereClause += fmt.Sprintf("AND JSONExtractString(labels,'%s') = '%s'", key, val)
|
||||
tagsWhereClause += fmt.Sprintf(" AND labels_object.%s = '%s' ", key, val)
|
||||
}
|
||||
// "select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from signoz_metrics.time_series WHERE JSONExtractString(labels,'__name__')='node_udp_queues')) WHERE distinctTagKeys ILIKE '%host%';"
|
||||
if len(params.Match) != 0 {
|
||||
query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE JSONExtractString(labels,'__name__')=$1 %s)) WHERE distinctTagKeys ILIKE $2;", signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1 %s)) WHERE distinctTagKeys ILIKE $2;", signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
|
||||
rows, err = r.db.Query(ctx, query, params.MetricName, fmt.Sprintf("%%%s%%", params.Match))
|
||||
|
||||
} else {
|
||||
query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE JSONExtractString(labels,'__name__')=$1 %s ));", signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1 %s ));", signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
|
||||
rows, err = r.db.Query(ctx, query, params.MetricName)
|
||||
}
|
||||
@ -2765,16 +2764,16 @@ func (r *ClickHouseReader) GetMetricAutocompleteTagValue(ctx context.Context, pa
|
||||
tagsWhereClause := ""
|
||||
|
||||
for key, val := range params.MetricTags {
|
||||
tagsWhereClause += fmt.Sprintf("AND JSONExtractString(labels,'%s') = '%s'", key, val)
|
||||
tagsWhereClause += fmt.Sprintf(" AND labels_object.%s = '%s' ", key, val)
|
||||
}
|
||||
|
||||
if len(params.Match) != 0 {
|
||||
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, $1)) from %s.%s WHERE JSONExtractString(labels,'__name__')=$2 %s AND JSONExtractString(labels, $1) ILIKE $3;", signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
query = fmt.Sprintf("SELECT DISTINCT(labels_object.%s) from %s.%s WHERE metric_name=$1 %s AND labels_object.%s ILIKE $2;", params.TagKey, signozMetricDBName, signozTSTableName, tagsWhereClause, params.TagKey)
|
||||
|
||||
rows, err = r.db.Query(ctx, query, params.TagKey, params.MetricName, fmt.Sprintf("%%%s%%", params.Match))
|
||||
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, $1)) FROM %s.%s WHERE JSONExtractString(labels,'__name__')=$2 %s;", signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
query = fmt.Sprintf("SELECT DISTINCT(labels_object.%s) FROM %s.%s WHERE metric_name=$2 %s;", params.TagKey, signozMetricDBName, signozTSTableName, tagsWhereClause)
|
||||
rows, err = r.db.Query(ctx, query, params.TagKey, params.MetricName)
|
||||
|
||||
}
|
||||
@ -2796,20 +2795,18 @@ func (r *ClickHouseReader) GetMetricAutocompleteTagValue(ctx context.Context, pa
|
||||
return &tagValueList, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string) (*[]string, *model.ApiError) {
|
||||
func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) {
|
||||
|
||||
var query string
|
||||
var err error
|
||||
var metricNameList []string
|
||||
var rows driver.Rows
|
||||
|
||||
if len(matchText) != 0 {
|
||||
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels,'__name__')) from %s.%s WHERE JSONExtractString(labels,'__name__') ILIKE $1;", signozMetricDBName, signozTSTableName)
|
||||
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", matchText))
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels,'__name__')) from %s.%s;", signozMetricDBName, signozTSTableName)
|
||||
rows, err = r.db.Query(ctx, query)
|
||||
query = fmt.Sprintf("SELECT DISTINCT(metric_name) from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableName)
|
||||
if limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", limit)
|
||||
}
|
||||
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", matchText))
|
||||
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -2828,3 +2825,84 @@ func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context,
|
||||
return &metricNameList, nil
|
||||
|
||||
}
|
||||
|
||||
// GetMetricResult runs the query and returns list of time series
|
||||
func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) {
|
||||
|
||||
rows, err := r.db.Query(ctx, query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing query: ", err)
|
||||
return nil, fmt.Errorf("error in processing query")
|
||||
}
|
||||
|
||||
var (
|
||||
columnTypes = rows.ColumnTypes()
|
||||
columnNames = rows.Columns()
|
||||
vars = make([]interface{}, len(columnTypes))
|
||||
)
|
||||
for i := range columnTypes {
|
||||
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
|
||||
}
|
||||
// when group by is applied, each combination of cartesian product
|
||||
// of attributes is separate series. each item in metricPointsMap
|
||||
// represent a unique series.
|
||||
metricPointsMap := make(map[string][]model.MetricPoint)
|
||||
// attribute key-value pairs for each group selection
|
||||
attributesMap := make(map[string]map[string]string)
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(vars...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var groupBy []string
|
||||
var metricPoint model.MetricPoint
|
||||
groupAttributes := make(map[string]string)
|
||||
// Assuming that the end result row contains a timestamp, value and option labels
|
||||
// Label key and value are both strings.
|
||||
for idx, v := range vars {
|
||||
colName := columnNames[idx]
|
||||
switch v := v.(type) {
|
||||
case *string:
|
||||
// special case for returning all labels
|
||||
if colName == "fullLabels" {
|
||||
var metric map[string]string
|
||||
err := json.Unmarshal([]byte(*v), &metric)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for key, val := range metric {
|
||||
groupBy = append(groupBy, val)
|
||||
groupAttributes[key] = val
|
||||
}
|
||||
} else {
|
||||
groupBy = append(groupBy, *v)
|
||||
groupAttributes[colName] = *v
|
||||
}
|
||||
case *time.Time:
|
||||
metricPoint.Timestamp = v.UnixMilli()
|
||||
case *float64:
|
||||
metricPoint.Value = *v
|
||||
}
|
||||
}
|
||||
sort.Strings(groupBy)
|
||||
key := strings.Join(groupBy, "")
|
||||
attributesMap[key] = groupAttributes
|
||||
metricPointsMap[key] = append(metricPointsMap[key], metricPoint)
|
||||
}
|
||||
|
||||
var seriesList []*model.Series
|
||||
for key := range metricPointsMap {
|
||||
points := metricPointsMap[key]
|
||||
// first point in each series could be invalid since the
|
||||
// aggregations are applied with point from prev series
|
||||
if len(points) != 0 && len(points) > 1 {
|
||||
points = points[1:]
|
||||
}
|
||||
attributes := attributesMap[key]
|
||||
series := model.Series{Labels: attributes, Points: points}
|
||||
seriesList = append(seriesList, &series)
|
||||
}
|
||||
return seriesList, nil
|
||||
}
|
||||
|
@ -7,12 +7,16 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"go.signoz.io/query-service/app/dashboards"
|
||||
"go.signoz.io/query-service/app/metrics"
|
||||
"go.signoz.io/query-service/app/parser"
|
||||
"go.signoz.io/query-service/auth"
|
||||
"go.signoz.io/query-service/constants"
|
||||
@ -384,7 +388,12 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (aH *APIHandler) metricAutocompleteMetricName(w http.ResponseWriter, r *http.Request) {
|
||||
matchText := r.URL.Query().Get("match")
|
||||
metricNameList, apiErrObj := (*aH.reader).GetMetricAutocompleteMetricNames(r.Context(), matchText)
|
||||
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
|
||||
if err != nil {
|
||||
limit = 0 // no limit
|
||||
}
|
||||
|
||||
metricNameList, apiErrObj := (*aH.reader).GetMetricAutocompleteMetricNames(r.Context(), matchText, limit)
|
||||
|
||||
if apiErrObj != nil {
|
||||
respondError(w, apiErrObj, nil)
|
||||
@ -436,18 +445,173 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.
|
||||
func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request) {
|
||||
metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r)
|
||||
|
||||
fmt.Println(metricsQueryRangeParams)
|
||||
|
||||
if apiErrorObj != nil {
|
||||
zap.S().Errorf(apiErrorObj.Err.Error())
|
||||
respondError(w, apiErrorObj, nil)
|
||||
return
|
||||
}
|
||||
response_data := &model.QueryDataV2{
|
||||
ResultType: "matrix",
|
||||
Result: nil,
|
||||
|
||||
// prometheus instant query needs same timestamp
|
||||
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE &&
|
||||
metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.PROM {
|
||||
metricsQueryRangeParams.Start = metricsQueryRangeParams.End
|
||||
}
|
||||
aH.respond(w, response_data)
|
||||
|
||||
// round up the end to neaerest multiple
|
||||
if metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.QUERY_BUILDER {
|
||||
end := (metricsQueryRangeParams.End) / 1000
|
||||
step := metricsQueryRangeParams.Step
|
||||
metricsQueryRangeParams.End = (end / step * step) * 1000
|
||||
}
|
||||
|
||||
type channelResult struct {
|
||||
Series []*model.Series
|
||||
Err error
|
||||
}
|
||||
|
||||
execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error) {
|
||||
var seriesList []*model.Series
|
||||
ch := make(chan channelResult, len(queries))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for name, query := range queries {
|
||||
wg.Add(1)
|
||||
go func(name, query string) {
|
||||
defer wg.Done()
|
||||
seriesList, err := (*aH.reader).GetMetricResult(r.Context(), query)
|
||||
for _, series := range seriesList {
|
||||
series.QueryName = name
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)}
|
||||
return
|
||||
}
|
||||
ch <- channelResult{Series: seriesList}
|
||||
}(name, query)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
continue
|
||||
}
|
||||
seriesList = append(seriesList, r.Series...)
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n"))
|
||||
}
|
||||
return seriesList, nil
|
||||
}
|
||||
|
||||
execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error) {
|
||||
var seriesList []*model.Series
|
||||
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for name, query := range metricsQueryRangeParams.CompositeMetricQuery.PromQueries {
|
||||
if query.Disabled {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(name string, query *model.PromQuery) {
|
||||
var seriesList []*model.Series
|
||||
defer wg.Done()
|
||||
queryModel := model.QueryRangeParams{
|
||||
Start: time.UnixMilli(metricsQueryRangeParams.Start),
|
||||
End: time.UnixMilli(metricsQueryRangeParams.End),
|
||||
Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)),
|
||||
Query: query.Query,
|
||||
}
|
||||
promResult, _, err := (*aH.reader).GetQueryRangeResult(r.Context(), &queryModel)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)}
|
||||
return
|
||||
}
|
||||
matrix, _ := promResult.Matrix()
|
||||
for _, v := range matrix {
|
||||
var s model.Series
|
||||
s.QueryName = name
|
||||
s.Labels = v.Metric.Copy().Map()
|
||||
for _, p := range v.Points {
|
||||
s.Points = append(s.Points, model.MetricPoint{Timestamp: p.T, Value: p.V})
|
||||
}
|
||||
seriesList = append(seriesList, &s)
|
||||
}
|
||||
ch <- channelResult{Series: seriesList}
|
||||
}(name, query)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
continue
|
||||
}
|
||||
seriesList = append(seriesList, r.Series...)
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n"))
|
||||
}
|
||||
return seriesList, nil
|
||||
}
|
||||
|
||||
var seriesList []*model.Series
|
||||
var err error
|
||||
switch metricsQueryRangeParams.CompositeMetricQuery.QueryType {
|
||||
case model.QUERY_BUILDER:
|
||||
runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
if runQueries.Err != nil {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: runQueries.Err}, nil)
|
||||
return
|
||||
}
|
||||
seriesList, err = execClickHouseQueries(runQueries.Queries)
|
||||
|
||||
case model.CLICKHOUSE:
|
||||
queries := make(map[string]string)
|
||||
for name, chQuery := range metricsQueryRangeParams.CompositeMetricQuery.ClickHouseQueries {
|
||||
if chQuery.Disabled {
|
||||
continue
|
||||
}
|
||||
queries[name] = chQuery.Query
|
||||
}
|
||||
seriesList, err = execClickHouseQueries(queries)
|
||||
case model.PROM:
|
||||
seriesList, err = execPromQueries(metricsQueryRangeParams)
|
||||
default:
|
||||
err = fmt.Errorf("invalid query type")
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
respondError(w, apiErrObj, nil)
|
||||
return
|
||||
}
|
||||
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE &&
|
||||
len(seriesList) > 1 &&
|
||||
(metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.QUERY_BUILDER ||
|
||||
metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.CLICKHOUSE) {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid: query resulted in more than one series for value type")}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
type ResponseFormat struct {
|
||||
ResultType string `json:"resultType"`
|
||||
Result []*model.Series `json:"result"`
|
||||
}
|
||||
resp := ResponseFormat{ResultType: "matrix", Result: seriesList}
|
||||
aH.respond(w, resp)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) listRulesFromProm(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -50,7 +50,8 @@ type Reader interface {
|
||||
// Setter Interfaces
|
||||
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||
|
||||
GetMetricAutocompleteMetricNames(ctx context.Context, matchText string) (*[]string, *model.ApiError)
|
||||
GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError)
|
||||
GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
||||
GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
||||
GetMetricResult(ctx context.Context, query string) ([]*model.Series, error)
|
||||
}
|
||||
|
421
pkg/query-service/app/metrics/query_builder.go
Normal file
421
pkg/query-service/app/metrics/query_builder.go
Normal file
@ -0,0 +1,421 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/govaluate"
|
||||
"go.signoz.io/query-service/constants"
|
||||
"go.signoz.io/query-service/model"
|
||||
)
|
||||
|
||||
// RunQueries holds the per-name clickhouse queries prepared for a
// builder request, or the error encountered while preparing them.
type RunQueries struct {
	// Queries maps the builder-query name to its clickhouse SQL.
	Queries map[string]string
	// Err is non-nil when query preparation failed.
	Err error
}
|
||||
|
||||
var AggregateOperatorToPercentile = map[model.AggregateOperator]float64{
|
||||
model.P05: 0.5,
|
||||
model.P10: 0.10,
|
||||
model.P20: 0.20,
|
||||
model.P25: 0.25,
|
||||
model.P50: 0.50,
|
||||
model.P75: 0.75,
|
||||
model.P90: 0.90,
|
||||
model.P95: 0.95,
|
||||
model.P99: 0.99,
|
||||
}
|
||||
|
||||
// AggregateOperatorToSQLFunc maps aggregate operators to the SQL
// aggregate function applied to the value column. The RATE_* variants
// use the same functions; the rate is computed in an outer query.
var AggregateOperatorToSQLFunc = map[model.AggregateOperator]string{
	model.AVG:      "avg",
	model.MAX:      "max",
	model.MIN:      "min",
	model.SUM:      "sum",
	model.RATE_SUM: "sum",
	model.RATE_AVG: "avg",
	model.RATE_MAX: "max",
	model.RATE_MIN: "min",
}
|
||||
|
||||
// SupportedFunctions lists the math functions accepted in builder
// expressions; each is registered as a no-op govaluate function so that
// expressions referencing them parse successfully (see GoValuateFuncs).
var SupportedFunctions = []string{"exp", "log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians"}
|
||||
|
||||
func GoValuateFuncs() map[string]govaluate.ExpressionFunction {
|
||||
var GoValuateFuncs = map[string]govaluate.ExpressionFunction{}
|
||||
for _, fn := range SupportedFunctions {
|
||||
GoValuateFuncs[fn] = func(args ...interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
return GoValuateFuncs
|
||||
}
|
||||
|
||||
// formattedValue formats the value to be used in a clickhouse query:
// strings are single-quoted, numbers and bools rendered verbatim, and
// slices rendered as bracketed/comma-joined lists. Unsupported types
// format to the empty string.
//
// NOTE(review): values are interpolated into SQL with no escaping;
// callers must not pass untrusted input containing quote characters.
func formattedValue(v interface{}) string {
	switch x := v.(type) {
	case int:
		return fmt.Sprintf("%d", x)
	case float32, float64:
		return fmt.Sprintf("%f", x)
	case string:
		return fmt.Sprintf("'%s'", x)
	case bool:
		return fmt.Sprintf("%v", x)
	case []interface{}:
		// guard: x[0] below would panic on an empty slice
		if len(x) == 0 {
			return ""
		}
		// the first element decides how the whole list is rendered
		switch x[0].(type) {
		case string:
			str := "["
			for idx, sVal := range x {
				str += fmt.Sprintf("'%s'", sVal)
				if idx != len(x)-1 {
					str += ","
				}
			}
			str += "]"
			return str
		case int, float32, float64, bool:
			// fmt.Sprint gives "[1 2 3]"; re-join with commas
			return strings.Join(strings.Fields(fmt.Sprint(x)), ",")
		}
		return ""
	default:
		// may be log the warning here?
		return ""
	}
}
|
||||
|
||||
// BuildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
|
||||
// timeseries based on search criteria
|
||||
func BuildMetricsTimeSeriesFilterQuery(fs *model.FilterSet, groupTags []string, metricName string, aggregateOperator model.AggregateOperator) (string, error) {
|
||||
var conditions []string
|
||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", formattedValue(metricName)))
|
||||
if fs != nil && len(fs.Items) != 0 {
|
||||
for _, item := range fs.Items {
|
||||
toFormat := item.Value
|
||||
// if the received value is an array for like/match op, just take the first value
|
||||
if strings.ToLower(item.Operation) == "like" ||
|
||||
strings.ToLower(item.Operation) == "match" ||
|
||||
strings.ToLower(item.Operation) == "nlike" {
|
||||
x, ok := item.Value.([]interface{})
|
||||
if ok {
|
||||
if len(x) == 0 {
|
||||
continue
|
||||
}
|
||||
toFormat = x[0]
|
||||
}
|
||||
}
|
||||
fmtVal := formattedValue(toFormat)
|
||||
switch op := strings.ToLower(item.Operation); op {
|
||||
case "eq":
|
||||
conditions = append(conditions, fmt.Sprintf("labels_object.%s = %s", item.Key, fmtVal))
|
||||
case "neq":
|
||||
conditions = append(conditions, fmt.Sprintf("labels_object.%s != %s", item.Key, fmtVal))
|
||||
case "in":
|
||||
conditions = append(conditions, fmt.Sprintf("labels_object.%s IN %s", item.Key, fmtVal))
|
||||
case "nin":
|
||||
conditions = append(conditions, fmt.Sprintf("labels_object.%s NOT IN %s", item.Key, fmtVal))
|
||||
case "like":
|
||||
conditions = append(conditions, fmt.Sprintf("like(labels_object.%s, %s)", item.Key, fmtVal))
|
||||
case "nlike":
|
||||
conditions = append(conditions, fmt.Sprintf("notLike(labels_object.%s, %s)", item.Key, fmtVal))
|
||||
case "match":
|
||||
conditions = append(conditions, fmt.Sprintf("match(labels_object.%s, %s)", item.Key, fmtVal))
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported operation")
|
||||
}
|
||||
}
|
||||
}
|
||||
queryString := strings.Join(conditions, " AND ")
|
||||
|
||||
var selectLabels string
|
||||
if aggregateOperator == model.NOOP || aggregateOperator == model.RATE {
|
||||
selectLabels = "labels,"
|
||||
} else {
|
||||
for _, tag := range groupTags {
|
||||
selectLabels += fmt.Sprintf(" labels_object.%s as %s,", tag, tag)
|
||||
}
|
||||
}
|
||||
|
||||
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_TABLENAME, queryString)
|
||||
|
||||
return filterSubQuery, nil
|
||||
}
|
||||
|
||||
// BuildMetricQuery builds the clickhouse SELECT for a single builder
// metric query: a time-bucketed aggregation over the samples table,
// INNER JOINed with the filtered time-series fingerprints.
//
// NOTE(review): tableName is currently unused; the table names come from
// the constants package — confirm whether the parameter can be dropped.
func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, tableName string) (string, error) {

	// A value panel must reduce to a single number, which is incompatible
	// with grouping tags.
	if qp.CompositeMetricQuery.PanelType == model.QUERY_VALUE && len(mq.GroupingTags) != 0 {
		return "", fmt.Errorf("reduce operator cannot be applied for the query")
	}

	filterSubQuery, err := BuildMetricsTimeSeriesFilterQuery(mq.TagFilters, mq.GroupingTags, mq.MetricName, mq.AggregateOperator)
	if err != nil {
		return "", err
	}

	// Restrict samples to the requested [Start, End] window (timestamps in ms).
	samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", formattedValue(mq.MetricName), qp.Start, qp.End)

	// Select the aggregate value for interval. Placeholders, in order:
	// select-labels, step seconds, aggregate op, filter sub-query,
	// group-by list, order-by list.
	queryTmpl :=
		"SELECT %s" +
			" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
			" %s as value" +
			" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
			" INNER JOIN" +
			" (%s) as filtered_time_series" +
			" USING fingerprint" +
			" WHERE " + samplesTableTimeFilter +
			" GROUP BY %s" +
			" ORDER BY %s ts"

	groupBy := groupBy(mq.GroupingTags...)
	groupTags := groupSelect(mq.GroupingTags...)

	switch mq.AggregateOperator {
	case model.RATE:
		// Calculate rate of change of metric for each unique time series
		groupBy = "fingerprint, ts"
		groupTags = "fingerprint,"
		op := "max(value)" // max value should be the closest value for point in time
		subQuery := fmt.Sprintf(
			queryTmpl, "any(labels) as labels, "+groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags,
		) // labels will be same so any should be fine
		query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s)`
		query = fmt.Sprintf(query, "labels as fullLabels,", subQuery)
		return query, nil
	case model.SUM_RATE:
		// Per-fingerprint rate first, then summed across series per group.
		rateGroupBy := "fingerprint, " + groupBy
		rateGroupTags := "fingerprint, " + groupTags
		op := "max(value)"
		subQuery := fmt.Sprintf(
			queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags,
		) // labels will be same so any should be fine
		query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1`
		query = fmt.Sprintf(query, groupTags, subQuery)
		query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, groupTags)
		return query, nil
	case model.RATE_SUM, model.RATE_MAX, model.RATE_AVG, model.RATE_MIN:
		// Aggregate per bucket first, then take the rate of the aggregate.
		op := fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator])
		subQuery := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
		query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1`
		query = fmt.Sprintf(query, groupTags, subQuery)
		return query, nil
	case model.P05, model.P10, model.P20, model.P25, model.P50, model.P75, model.P90, model.P95, model.P99:
		// Percentiles via clickhouse quantile(fraction)(value).
		op := fmt.Sprintf("quantile(%v)(value)", AggregateOperatorToPercentile[mq.AggregateOperator])
		query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
		return query, nil
	case model.AVG, model.SUM, model.MIN, model.MAX:
		op := fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator])
		query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
		return query, nil
	case model.COUNT:
		op := "toFloat64(count(*))"
		query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
		return query, nil
	case model.COUNT_DISTINCT:
		op := "toFloat64(count(distinct(value)))"
		query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
		return query, nil
	case model.NOOP:
		// No aggregation: return one bucketed point per fingerprint with
		// the full labels column exposed as fullLabels.
		queryTmpl :=
			"SELECT fingerprint, labels as fullLabels," +
				" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
				" any(value) as value" +
				" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
				" INNER JOIN" +
				" (%s) as filtered_time_series" +
				" USING fingerprint" +
				" WHERE " + samplesTableTimeFilter +
				" GROUP BY fingerprint, labels, ts" +
				" ORDER BY fingerprint, labels, ts"
		query := fmt.Sprintf(queryTmpl, qp.Step, filterSubQuery)
		return query, nil
	default:
		return "", fmt.Errorf("unsupported aggregate operator")
	}
}
|
||||
|
||||
// groupBy returns the GROUP BY column list: the given tags followed by
// the time-bucket column "ts", comma-joined.
func groupBy(tags ...string) string {
	cols := make([]string, 0, len(tags)+1)
	cols = append(cols, tags...)
	cols = append(cols, "ts")
	return strings.Join(cols, ",")
}
|
||||
|
||||
// groupSelect returns the comma-joined tag list for a SELECT clause,
// with a trailing ", " when at least one tag is present so that further
// columns can be appended directly.
func groupSelect(tags ...string) string {
	if len(tags) == 0 {
		return ""
	}
	return strings.Join(tags, ",") + ", "
}
|
||||
|
||||
// validateExpressions validates the math expressions using the list of
|
||||
// allowed functions.
|
||||
func validateExpressions(expressions []string, funcs map[string]govaluate.ExpressionFunction) []error {
|
||||
var errs []error
|
||||
for _, exp := range expressions {
|
||||
_, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
// FormatErrs returns formatted error string
|
||||
func FormatErrs(errs []error, separator string) string {
|
||||
var errStrs []string
|
||||
for _, err := range errs {
|
||||
errStrs = append(errStrs, err.Error())
|
||||
}
|
||||
return strings.Join(errStrs, separator)
|
||||
}
|
||||
|
||||
func reduceQuery(query string, reduceTo model.ReduceToOperator, aggregateOperator model.AggregateOperator) (string, error) {
|
||||
var selectLabels string
|
||||
var groupBy string
|
||||
// NOOP and RATE can possibly return multiple time series and reduce should be applied
|
||||
// for each uniques series. When the final result contains more than one series we throw
|
||||
// an error post DB fetching. Otherwise just return the single data. This is not known until queried so the
|
||||
// the query is prepared accordingly.
|
||||
if aggregateOperator == model.NOOP || aggregateOperator == model.RATE {
|
||||
selectLabels = ", any(fullLabels) as fullLabels"
|
||||
groupBy = "GROUP BY fingerprint"
|
||||
}
|
||||
// the timestamp picked is not relevant here since the final value used is show the single
|
||||
// chart with just the query value. For the quer
|
||||
switch reduceTo {
|
||||
case model.RLAST:
|
||||
query = fmt.Sprintf("SELECT anyLast(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
|
||||
case model.RSUM:
|
||||
query = fmt.Sprintf("SELECT sum(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
|
||||
case model.RAVG:
|
||||
query = fmt.Sprintf("SELECT avg(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
|
||||
case model.RMAX:
|
||||
query = fmt.Sprintf("SELECT max(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
|
||||
case model.RMIN:
|
||||
query = fmt.Sprintf("SELECT min(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported reduce operator")
|
||||
}
|
||||
return query, nil
|
||||
}
|
||||
|
||||
// varToQuery constructs the query for each named builder block
|
||||
func varToQuery(qp *model.QueryRangeParamsV2, tableName string) (map[string]string, error) {
|
||||
evalFuncs := GoValuateFuncs()
|
||||
varToQuery := make(map[string]string)
|
||||
for _, builderQuery := range qp.CompositeMetricQuery.BuilderQueries {
|
||||
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(builderQuery.Expression, evalFuncs)
|
||||
|
||||
// Use the parsed expression and build the query for each variable
|
||||
// if not already exists
|
||||
var errs []error
|
||||
for _, _var := range expression.Vars() {
|
||||
if _, ok := varToQuery[_var]; !ok {
|
||||
mq := qp.CompositeMetricQuery.BuilderQueries[_var]
|
||||
query, err := BuildMetricQuery(qp, mq, tableName)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
if qp.CompositeMetricQuery.PanelType == model.QUERY_VALUE {
|
||||
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
varToQuery[_var] = query
|
||||
}
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("error while creating query: %s", FormatErrs(errs, "\n"))
|
||||
}
|
||||
}
|
||||
return varToQuery, nil
|
||||
}
|
||||
|
||||
// expressionToQuery constructs the query for the expression
|
||||
func expressionToQuery(qp *model.QueryRangeParamsV2, varToQuery map[string]string, expression *govaluate.EvaluableExpression) (string, error) {
|
||||
var formulaQuery string
|
||||
vars := expression.Vars()
|
||||
for idx, var_ := range vars[1:] {
|
||||
x, y := vars[idx], var_
|
||||
if !reflect.DeepEqual(qp.CompositeMetricQuery.BuilderQueries[x].GroupingTags, qp.CompositeMetricQuery.BuilderQueries[y].GroupingTags) {
|
||||
return "", fmt.Errorf("group by must be same")
|
||||
}
|
||||
}
|
||||
var modified []govaluate.ExpressionToken
|
||||
tokens := expression.Tokens()
|
||||
for idx := range tokens {
|
||||
token := tokens[idx]
|
||||
if token.Kind == govaluate.VARIABLE {
|
||||
token.Value = fmt.Sprintf("%v.value", token.Value)
|
||||
token.Meta = fmt.Sprintf("%v.value", token.Meta)
|
||||
}
|
||||
modified = append(modified, token)
|
||||
}
|
||||
formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
|
||||
|
||||
var formulaSubQuery string
|
||||
var joinUsing string
|
||||
for idx, var_ := range vars {
|
||||
query := varToQuery[var_]
|
||||
groupTags := qp.CompositeMetricQuery.BuilderQueries[var_].GroupingTags
|
||||
groupTags = append(groupTags, "ts")
|
||||
joinUsing = strings.Join(groupTags, ",")
|
||||
formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, var_)
|
||||
if idx < len(vars)-1 {
|
||||
formulaSubQuery += "INNER JOIN"
|
||||
} else if len(vars) > 1 {
|
||||
formulaSubQuery += fmt.Sprintf("USING (%s)", joinUsing)
|
||||
}
|
||||
}
|
||||
formulaQuery = fmt.Sprintf("SELECT %s, %s as value FROM ", joinUsing, formula.ExpressionString()) + formulaSubQuery
|
||||
return formulaQuery, nil
|
||||
}
|
||||
|
||||
// PrepareBuilderMetricQueries constructs the queries to be run for query range timeseries
|
||||
func PrepareBuilderMetricQueries(qp *model.QueryRangeParamsV2, tableName string) *RunQueries {
|
||||
evalFuncs := GoValuateFuncs()
|
||||
|
||||
// validate the expressions
|
||||
var expressions []string
|
||||
for _, bq := range qp.CompositeMetricQuery.BuilderQueries {
|
||||
expressions = append(expressions, bq.Expression)
|
||||
}
|
||||
if errs := validateExpressions(expressions, evalFuncs); len(errs) != 0 {
|
||||
return &RunQueries{Err: fmt.Errorf("invalid expressions: %s", FormatErrs(errs, "\n"))}
|
||||
}
|
||||
|
||||
varToQuery, err := varToQuery(qp, tableName)
|
||||
if err != nil {
|
||||
return &RunQueries{Err: err}
|
||||
}
|
||||
|
||||
namedQueries := make(map[string]string)
|
||||
|
||||
var errs []error
|
||||
for _, builderQuery := range qp.CompositeMetricQuery.BuilderQueries {
|
||||
if builderQuery.Disabled {
|
||||
continue
|
||||
}
|
||||
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(builderQuery.Expression, evalFuncs)
|
||||
tokens := expression.Tokens()
|
||||
// expression with one token is used to represent
|
||||
// that there are no functions applied on query
|
||||
if len(tokens) == 1 {
|
||||
_var := tokens[0].Value.(string)
|
||||
namedQueries[builderQuery.QueryName] = varToQuery[_var]
|
||||
} else {
|
||||
query, err := expressionToQuery(qp, varToQuery, expression)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
namedQueries[builderQuery.QueryName] = query
|
||||
}
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return &RunQueries{Err: fmt.Errorf("errors with formulas: %s", FormatErrs(errs, "\n"))}
|
||||
}
|
||||
fmt.Println(namedQueries)
|
||||
return &RunQueries{Queries: namedQueries}
|
||||
}
|
130
pkg/query-service/app/metrics/query_builder_test.go
Normal file
130
pkg/query-service/app/metrics/query_builder_test.go
Normal file
@ -0,0 +1,130 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"go.signoz.io/query-service/model"
|
||||
)
|
||||
|
||||
func TestBuildQuery(t *testing.T) {
|
||||
Convey("TestSimpleQueryWithName", t, func() {
|
||||
q := &model.QueryRangeParamsV2{
|
||||
Start: 1650991982000,
|
||||
End: 1651078382000,
|
||||
Step: 60,
|
||||
CompositeMetricQuery: &model.CompositeMetricQuery{
|
||||
BuilderQueries: map[string]*model.MetricQuery{
|
||||
"a": {
|
||||
QueryName: "a",
|
||||
MetricName: "name",
|
||||
AggregateOperator: model.RATE_MAX,
|
||||
Expression: "a",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
queries := PrepareBuilderMetricQueries(q, "table").Queries
|
||||
So(len(queries), ShouldEqual, 1)
|
||||
So(queries["a"], ShouldContainSubstring, "WHERE metric_name = 'name'")
|
||||
So(queries["a"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)")
|
||||
})
|
||||
}
|
||||
|
||||
func TestBuildQueryWithFilters(t *testing.T) {
|
||||
Convey("TestBuildQueryWithFilters", t, func() {
|
||||
q := &model.QueryRangeParamsV2{
|
||||
Start: 1650991982000,
|
||||
End: 1651078382000,
|
||||
Step: 60,
|
||||
CompositeMetricQuery: &model.CompositeMetricQuery{
|
||||
BuilderQueries: map[string]*model.MetricQuery{
|
||||
"a": {
|
||||
QueryName: "a",
|
||||
MetricName: "name",
|
||||
TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{
|
||||
{Key: "a", Value: "b", Operation: "neq"},
|
||||
}},
|
||||
AggregateOperator: model.RATE_MAX,
|
||||
Expression: "a",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
queries := PrepareBuilderMetricQueries(q, "table").Queries
|
||||
So(len(queries), ShouldEqual, 1)
|
||||
|
||||
So(queries["a"], ShouldContainSubstring, "WHERE metric_name = 'name' AND labels_object.a != 'b'")
|
||||
So(queries["a"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)")
|
||||
})
|
||||
}
|
||||
|
||||
func TestBuildQueryWithMultipleQueries(t *testing.T) {
|
||||
Convey("TestBuildQueryWithFilters", t, func() {
|
||||
q := &model.QueryRangeParamsV2{
|
||||
Start: 1650991982000,
|
||||
End: 1651078382000,
|
||||
Step: 60,
|
||||
CompositeMetricQuery: &model.CompositeMetricQuery{
|
||||
BuilderQueries: map[string]*model.MetricQuery{
|
||||
"a": {
|
||||
QueryName: "a",
|
||||
MetricName: "name",
|
||||
TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{
|
||||
{Key: "in", Value: []interface{}{"a", "b", "c"}, Operation: "in"},
|
||||
}},
|
||||
AggregateOperator: model.RATE_AVG,
|
||||
Expression: "a",
|
||||
},
|
||||
"b": {
|
||||
QueryName: "b",
|
||||
MetricName: "name2",
|
||||
AggregateOperator: model.RATE_MAX,
|
||||
Expression: "b",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
queries := PrepareBuilderMetricQueries(q, "table").Queries
|
||||
So(len(queries), ShouldEqual, 2)
|
||||
So(queries["a"], ShouldContainSubstring, "WHERE metric_name = 'name' AND labels_object.in IN ['a','b','c']")
|
||||
So(queries["a"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)")
|
||||
})
|
||||
}
|
||||
|
||||
func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
|
||||
Convey("TestBuildQueryWithFilters", t, func() {
|
||||
q := &model.QueryRangeParamsV2{
|
||||
Start: 1650991982000,
|
||||
End: 1651078382000,
|
||||
Step: 60,
|
||||
CompositeMetricQuery: &model.CompositeMetricQuery{
|
||||
BuilderQueries: map[string]*model.MetricQuery{
|
||||
"a": {
|
||||
QueryName: "a",
|
||||
MetricName: "name",
|
||||
TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{
|
||||
{Key: "in", Value: []interface{}{"a", "b", "c"}, Operation: "in"},
|
||||
}},
|
||||
AggregateOperator: model.RATE_MAX,
|
||||
Expression: "a",
|
||||
},
|
||||
"b": {
|
||||
MetricName: "name2",
|
||||
AggregateOperator: model.RATE_AVG,
|
||||
Expression: "b",
|
||||
},
|
||||
"c": {
|
||||
QueryName: "c",
|
||||
Expression: "a/b",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
queries := PrepareBuilderMetricQueries(q, "table").Queries
|
||||
So(len(queries), ShouldEqual, 3)
|
||||
So(queries["c"], ShouldContainSubstring, "SELECT ts, a.value / b.value")
|
||||
So(queries["c"], ShouldContainSubstring, "WHERE metric_name = 'name' AND labels_object.in IN ['a','b','c']")
|
||||
So(queries["c"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)")
|
||||
})
|
||||
}
|
@ -667,3 +667,12 @@ func parseChangePasswordRequest(r *http.Request) (*model.ChangePasswordRequest,
|
||||
|
||||
return &req, nil
|
||||
}
|
||||
|
||||
func parseFilterSet(r *http.Request) (*model.FilterSet, error) {
|
||||
var filterSet model.FilterSet
|
||||
err := json.NewDecoder(r.Body).Decode(&filterSet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &filterSet, nil
|
||||
}
|
||||
|
@ -5,19 +5,39 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"go.signoz.io/query-service/app/metrics"
|
||||
"go.signoz.io/query-service/model"
|
||||
)
|
||||
|
||||
func validateQueryRangeParamsV2(qp *model.QueryRangeParamsV2) error {
|
||||
var errs []error
|
||||
if !(qp.DataSource >= model.METRICS && qp.DataSource <= model.LOGS) {
|
||||
errs = append(errs, fmt.Errorf("unsupported data source"))
|
||||
}
|
||||
if !(qp.CompositeMetricQuery.QueryType >= model.QUERY_BUILDER && qp.CompositeMetricQuery.QueryType <= model.PROM) {
|
||||
errs = append(errs, fmt.Errorf("unsupported query type"))
|
||||
}
|
||||
if !(qp.CompositeMetricQuery.PanelType >= model.TIME_SERIES && qp.CompositeMetricQuery.PanelType <= model.QUERY_VALUE) {
|
||||
errs = append(errs, fmt.Errorf("unsupported panel type"))
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return fmt.Errorf("one or more errors found : %s", metrics.FormatErrs(errs, ","))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ParseMetricQueryRangeParams(r *http.Request) (*model.QueryRangeParamsV2, *model.ApiError) {
|
||||
|
||||
var postData *model.QueryRangeParamsV2
|
||||
err := json.NewDecoder(r.Body).Decode(&postData)
|
||||
|
||||
if err != nil {
|
||||
if err := json.NewDecoder(r.Body).Decode(&postData); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
if err := validateQueryRangeParamsV2(postData); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return postData, nil
|
||||
}
|
||||
|
||||
func ParseMetricAutocompleteTagParams(r *http.Request) (*model.MetricAutocompleteTagParams, *model.ApiError) {
|
||||
|
60
pkg/query-service/app/parser_test.go
Normal file
60
pkg/query-service/app/parser_test.go
Normal file
@ -0,0 +1,60 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/smartystreets/assertions/should"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"go.signoz.io/query-service/app/metrics"
|
||||
"go.signoz.io/query-service/model"
|
||||
)
|
||||
|
||||
func TestParseFilterSingleFilter(t *testing.T) {
|
||||
Convey("TestParseFilterSingleFilter", t, func() {
|
||||
postBody := []byte(`{
|
||||
"op": "AND",
|
||||
"items": [
|
||||
{"key": "namespace", "value": "a", "op": "EQ"}
|
||||
]
|
||||
}`)
|
||||
req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody))
|
||||
res, _ := parseFilterSet(req)
|
||||
query, _ := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP)
|
||||
So(query, ShouldContainSubstring, "signoz_metrics.time_series_v2 WHERE metric_name = 'table' AND labels_object.namespace = 'a'")
|
||||
})
|
||||
}
|
||||
|
||||
func TestParseFilterMultipleFilter(t *testing.T) {
|
||||
Convey("TestParseFilterMultipleFilter", t, func() {
|
||||
postBody := []byte(`{
|
||||
"op": "AND",
|
||||
"items": [
|
||||
{"key": "namespace", "value": "a", "op": "EQ"},
|
||||
{"key": "host", "value": ["host-1", "host-2"], "op": "IN"}
|
||||
]
|
||||
}`)
|
||||
req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody))
|
||||
res, _ := parseFilterSet(req)
|
||||
query, _ := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP)
|
||||
So(query, should.ContainSubstring, "labels_object.host IN ['host-1','host-2']")
|
||||
So(query, should.ContainSubstring, "labels_object.namespace = 'a'")
|
||||
})
|
||||
}
|
||||
|
||||
func TestParseFilterNotSupportedOp(t *testing.T) {
|
||||
Convey("TestParseFilterNotSupportedOp", t, func() {
|
||||
postBody := []byte(`{
|
||||
"op": "AND",
|
||||
"items": [
|
||||
{"key": "namespace", "value": "a", "op": "PO"}
|
||||
]
|
||||
}`)
|
||||
req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody))
|
||||
res, _ := parseFilterSet(req)
|
||||
_, err := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP)
|
||||
So(err, should.BeError, "unsupported operation")
|
||||
})
|
||||
}
|
@ -62,6 +62,11 @@ const (
|
||||
StatusFailed = "failed"
|
||||
StatusSuccess = "success"
|
||||
)
|
||||
// ClickHouse database and table names of the v2 metrics schema.
const (
	SIGNOZ_METRIC_DBNAME        = "signoz_metrics"
	SIGNOZ_SAMPLES_TABLENAME    = "samples_v2"
	SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2"
)
|
||||
|
||||
func GetOrDefaultEnv(key string, fallback string) string {
|
||||
v := os.Getenv(key)
|
||||
|
@ -4,6 +4,7 @@ go 1.17
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.0.12
|
||||
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb
|
||||
github.com/go-kit/log v0.1.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/handlers v1.5.1
|
||||
@ -107,7 +108,7 @@ require (
|
||||
github.com/segmentio/backo-go v1.0.0 // indirect
|
||||
github.com/shopspring/decimal v1.3.1 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||
github.com/smartystreets/assertions v1.1.0 // indirect
|
||||
github.com/smartystreets/assertions v1.1.0
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/spf13/pflag v1.0.3 // indirect
|
||||
github.com/stretchr/testify v1.7.1
|
||||
@ -139,4 +140,4 @@ require (
|
||||
k8s.io/client-go v8.0.0+incompatible // indirect
|
||||
)
|
||||
|
||||
replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.70
|
||||
replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.71
|
||||
|
@ -55,8 +55,10 @@ github.com/ClickHouse/clickhouse-go/v2 v2.0.12/go.mod h1:u4RoNQLLM2W6hNSPYrIESLJ
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
|
||||
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
|
||||
github.com/SigNoz/prometheus v1.9.70 h1:0214i78cje5MkX0tXYwX2cK4cHXrFw18WcSLhv4YDpk=
|
||||
github.com/SigNoz/prometheus v1.9.70/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM=
|
||||
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb h1:bneLSKPf9YUSFmafKx32bynV6QrzViL/s+ZDvQxH1E4=
|
||||
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb/go.mod h1:JznGDNg9x1cujDKa22RaQOimOvvEfy3nxzDGd8XDgmA=
|
||||
github.com/SigNoz/prometheus v1.9.71 h1:X+6f4k5bqX+lpPFHCi+f6XiSehTj3Yzh1B/FDJi//Sk=
|
||||
github.com/SigNoz/prometheus v1.9.71/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM=
|
||||
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
|
@ -18,27 +18,114 @@ type QueryRangeParams struct {
|
||||
Stats string
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
Datasource string `json:"datasource"`
|
||||
Format string `json:"format"`
|
||||
Expr string `json:"expr"`
|
||||
// MetricQuery describes a single builder query over a metric: which metric
// to read, how to filter and group it, the aggregation to apply, and the
// expression it participates in.
type MetricQuery struct {
	QueryName  string     `json:"queryName"`
	MetricName string     `json:"metricName"`
	TagFilters *FilterSet `json:"tagFilters,omitempty"`
	// GroupingTags is serialized as "groupBy".
	GroupingTags      []string          `json:"groupBy,omitempty"`
	AggregateOperator AggregateOperator `json:"aggregateOperator"`
	Expression        string            `json:"expression"`
	// Disabled queries are still buildable but excluded from the results.
	Disabled bool `json:"disabled"`
	// ReduceTo collapses the series to a single value for QUERY_VALUE
	// panels.
	ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
}
|
||||
|
||||
// ReduceToOperator selects how a time series is collapsed to a single
// value for QUERY_VALUE panels.
type ReduceToOperator int

const (
	// The zero value is reserved so that an unset operator is invalid.
	_ ReduceToOperator = iota
	RLAST
	RSUM
	RAVG
	RMAX
	RMIN
)
|
||||
|
||||
// QueryType identifies which kind of query definition a composite query
// uses: the builder, raw ClickHouse SQL, or PromQL.
type QueryType int

const (
	// The zero value is reserved so that an unset query type is invalid.
	_ QueryType = iota
	QUERY_BUILDER
	CLICKHOUSE
	PROM
)
|
||||
|
||||
// PromQuery is a raw PromQL query definition.
type PromQuery struct {
	Query    string `json:"query"`
	Stats    string `json:"stats,omitempty"`
	Disabled bool   `json:"disabled"`
}
|
||||
|
||||
// ClickHouseQuery is a raw ClickHouse SQL query definition.
type ClickHouseQuery struct {
	Query    string `json:"query"`
	Disabled bool   `json:"disabled"`
}
|
||||
|
||||
// PanelType selects how a panel's result is shaped: a time series or a
// single reduced value.
type PanelType int

const (
	// The zero value is reserved so that an unset panel type is invalid.
	_ PanelType = iota
	TIME_SERIES
	QUERY_VALUE
)
|
||||
|
||||
// CompositeMetricQuery bundles the query definitions of a panel.
// NOTE(review): presumably only the map matching QueryType is consumed —
// confirm with the query range handler.
type CompositeMetricQuery struct {
	BuilderQueries    map[string]*MetricQuery     `json:"builderQueries,omitempty"`
	ClickHouseQueries map[string]*ClickHouseQuery `json:"chQueries,omitempty"`
	PromQueries       map[string]*PromQuery       `json:"promQueries,omitempty"`
	PanelType         PanelType                   `json:"panelType"`
	QueryType         QueryType                   `json:"queryType"`
}
|
||||
|
||||
// AggregateOperator enumerates the aggregations supported by the metrics
// query builder, including percentiles and rate variants.
type AggregateOperator int

const (
	// The zero value is reserved so that an unset operator is invalid.
	_ AggregateOperator = iota
	NOOP
	COUNT
	COUNT_DISTINCT
	SUM
	AVG
	MAX
	MIN
	P05
	P10
	P20
	P25
	P50
	P75
	P90
	P95
	P99
	RATE
	SUM_RATE
	// leave blank space for possibly {AVG, X}_RATE
	_
	_
	_
	RATE_SUM
	RATE_AVG
	RATE_MAX
	RATE_MIN
)
|
||||
|
||||
// DataSource identifies which telemetry signal a query reads from.
type DataSource int

const (
	// The zero value is reserved so that an unset data source is invalid.
	_ DataSource = iota
	METRICS
	TRACES
	LOGS
)
|
||||
|
||||
type QueryRangeParamsV2 struct {
|
||||
Start time.Time
|
||||
End time.Time
|
||||
Step time.Duration
|
||||
StartStr string `json:"start"`
|
||||
EndStr string `json:"end"`
|
||||
StepStr string `json:"step"`
|
||||
Queries []Query `json:"queries"`
|
||||
}
|
||||
|
||||
func (params QueryRangeParamsV2) sanitizeAndValidate() (*QueryRangeParamsV2, error) {
|
||||
|
||||
return nil, nil
|
||||
DataSource DataSource `json:"dataSource"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
Step int64 `json:"step"`
|
||||
CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"`
|
||||
}
|
||||
|
||||
// Metric auto complete types
|
||||
type metricTags map[string]string
|
||||
|
||||
type MetricAutocompleteTagParams struct {
|
||||
@ -192,7 +279,7 @@ type TTLParams struct {
|
||||
}
|
||||
|
||||
// GetTTLParams identifies which TTL configuration (e.g. traces, metrics)
// to fetch. The duplicated "Type string" field line left over from the
// diff is collapsed into one.
type GetTTLParams struct {
	Type string
}
|
||||
|
||||
type GetErrorsParams struct {
|
||||
@ -205,3 +292,19 @@ type GetErrorParams struct {
|
||||
ErrorID string
|
||||
ServiceName string
|
||||
}
|
||||
|
||||
// FilterItem is a single tag filter: a label key, the value(s) to compare
// against, and the comparison operation (e.g. "EQ", "IN" as used in the
// parser tests).
type FilterItem struct {
	Key string `json:"key"`
	// Value may be a scalar or a list, depending on the operation.
	Value     interface{} `json:"value"`
	Operation string      `json:"op"`
}
|
||||
|
||||
// FilterSet is a list of tag filters combined with a single boolean
// operation (e.g. "AND").
type FilterSet struct {
	Operation string       `json:"op,omitempty"`
	Items     []FilterItem `json:"items"`
}
|
||||
|
||||
// RemoveTTLParams identifies which TTL configuration to remove;
// RemoveAllTTL clears every configured TTL instead of a single type.
type RemoveTTLParams struct {
	Type         string
	RemoveAllTTL bool
}
|
||||
|
@ -356,3 +356,20 @@ type ErrorWithSpan struct {
|
||||
NewerErrorID string `json:"newerErrorId" ch:"newerErrorId"`
|
||||
OlderErrorID string `json:"olderErrorId" ch:"olderErrorId"`
|
||||
}
|
||||
|
||||
// Series is one result time series: the query that produced it, its label
// set, and the sampled points. The "metric"/"values" JSON names match the
// Prometheus HTTP API response format.
type Series struct {
	QueryName string            `json:"queryName"`
	Labels    map[string]string `json:"metric"`
	Points    []MetricPoint     `json:"values"`
}
|
||||
|
||||
// MetricPoint is one sample of a series: a millisecond timestamp and its
// value.
type MetricPoint struct {
	Timestamp int64
	Value     float64
}

// MarshalJSON implements json.Marshaler. It renders the point as
// [unixSeconds, "value"], the Prometheus HTTP API sample format.
func (p *MetricPoint) MarshalJSON() ([]byte, error) {
	seconds := float64(p.Timestamp) / 1000
	value := strconv.FormatFloat(p.Value, 'f', -1, 64)
	return json.Marshal([...]interface{}{seconds, value})
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user