diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index f405c69f6d..17f3e5b047 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -14,6 +14,7 @@ import ( "net/http" "net/url" "os" + "reflect" "regexp" "sort" "strconv" @@ -61,10 +62,8 @@ const ( signozErrorIndexTable = "signoz_error_index" signozTraceTableName = "signoz_index_v2" signozMetricDBName = "signoz_metrics" - signozSampleName = "samples" - signozTSName = "time_series" - signozSampleTableName = "samples" - signozTSTableName = "time_series" + signozSampleTableName = "samples_v2" + signozTSTableName = "time_series_v2" minTimespanForProgressiveSearch = time.Hour minTimespanForProgressiveSearchMargin = time.Minute @@ -2368,7 +2367,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } case constants.MetricsTTL: - tableName = signozMetricDBName + "." + signozSampleName + tableName = signozMetricDBName + "." + signozSampleTableName statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} @@ -2607,7 +2606,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil case constants.MetricsTTL: - tableNameArray := []string{signozMetricDBName + "." + signozSampleName} + tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName} status, err := r.setTTLQueryStatus(ctx, tableNameArray) if err != nil { return nil, err @@ -2726,16 +2725,16 @@ func (r *ClickHouseReader) GetMetricAutocompleteTagKey(ctx context.Context, para tagsWhereClause := "" for key, val := range params.MetricTags { - tagsWhereClause += fmt.Sprintf("AND JSONExtractString(labels,'%s') = '%s'", key, val) + tagsWhereClause += fmt.Sprintf(" AND labels_object.%s = '%s' ", key, val) } // "select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from signoz_metrics.time_series WHERE JSONExtractString(labels,'__name__')='node_udp_queues')) WHERE distinctTagKeys ILIKE '%host%';" if len(params.Match) != 0 { - query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE JSONExtractString(labels,'__name__')=$1 %s)) WHERE distinctTagKeys ILIKE $2;", signozMetricDBName, signozTSTableName, tagsWhereClause) + query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1 %s)) WHERE distinctTagKeys ILIKE $2;", signozMetricDBName, signozTSTableName, tagsWhereClause) rows, err = r.db.Query(ctx, query, params.MetricName, fmt.Sprintf("%%%s%%", params.Match)) } else { - query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE JSONExtractString(labels,'__name__')=$1 %s ));", signozMetricDBName, signozTSTableName, tagsWhereClause) + query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1 %s ));", signozMetricDBName, signozTSTableName, tagsWhereClause) rows, err = r.db.Query(ctx, query, params.MetricName) } @@ -2765,16 +2764,16 @@ func (r *ClickHouseReader) GetMetricAutocompleteTagValue(ctx context.Context, pa tagsWhereClause := "" for key, val := range params.MetricTags { - tagsWhereClause += fmt.Sprintf("AND JSONExtractString(labels,'%s') = '%s'", key, val) + tagsWhereClause += fmt.Sprintf(" AND labels_object.%s = '%s' ", key, val) } if len(params.Match) != 0 { - query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, $1)) from %s.%s WHERE JSONExtractString(labels,'__name__')=$2 %s AND JSONExtractString(labels, $1) ILIKE $3;", signozMetricDBName, signozTSTableName, tagsWhereClause) + query = fmt.Sprintf("SELECT DISTINCT(labels_object.%s) from %s.%s WHERE metric_name=$1 %s AND labels_object.%s ILIKE $2;", params.TagKey, signozMetricDBName, signozTSTableName, tagsWhereClause, params.TagKey) rows, err = r.db.Query(ctx, query, params.TagKey, params.MetricName, fmt.Sprintf("%%%s%%", params.Match)) } else { - query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, $1)) FROM %s.%s WHERE JSONExtractString(labels,'__name__')=$2 %s;", signozMetricDBName, signozTSTableName, tagsWhereClause) + query = fmt.Sprintf("SELECT DISTINCT(labels_object.%s) FROM %s.%s WHERE metric_name=$2 %s;", params.TagKey, signozMetricDBName, signozTSTableName, tagsWhereClause) rows, err = r.db.Query(ctx, query, params.TagKey, params.MetricName) } @@ -2796,20 +2795,18 @@ func (r *ClickHouseReader) GetMetricAutocompleteTagValue(ctx context.Context, pa return &tagValueList, nil } -func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string) (*[]string, *model.ApiError) { +func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) { var query string var err error var metricNameList []string var rows driver.Rows - if len(matchText) != 0 { - query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels,'__name__')) from %s.%s WHERE JSONExtractString(labels,'__name__') ILIKE $1;", signozMetricDBName, signozTSTableName) - rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", matchText)) - } else { - query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels,'__name__')) from %s.%s;", signozMetricDBName, signozTSTableName) - rows, err = r.db.Query(ctx, query) + query = fmt.Sprintf("SELECT DISTINCT(metric_name) from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableName) + if limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", limit) } + rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", matchText)) if err != nil { zap.S().Error(err) @@ -2828,3 +2825,84 @@ func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, return &metricNameList, nil } + +// GetMetricResult runs the query and returns list of time series +func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) { + + rows, err := r.db.Query(ctx, query) + + if err != nil { + zap.S().Debug("Error in processing query: ", err) + return nil, fmt.Errorf("error in processing query") + } + + var ( + columnTypes = rows.ColumnTypes() + columnNames = rows.Columns() + vars = make([]interface{}, len(columnTypes)) + ) + for i := range columnTypes { + vars[i] = reflect.New(columnTypes[i].ScanType()).Interface() + } + // when group by is applied, each combination of cartesian product + // of attributes is separate series. each item in metricPointsMap + // represent a unique series. + metricPointsMap := make(map[string][]model.MetricPoint) + // attribute key-value pairs for each group selection + attributesMap := make(map[string]map[string]string) + + defer rows.Close() + for rows.Next() { + if err := rows.Scan(vars...); err != nil { + return nil, err + } + var groupBy []string + var metricPoint model.MetricPoint + groupAttributes := make(map[string]string) + // Assuming that the end result row contains a timestamp, value and option labels + // Label key and value are both strings. + for idx, v := range vars { + colName := columnNames[idx] + switch v := v.(type) { + case *string: + // special case for returning all labels + if colName == "fullLabels" { + var metric map[string]string + err := json.Unmarshal([]byte(*v), &metric) + if err != nil { + return nil, err + } + for key, val := range metric { + groupBy = append(groupBy, val) + groupAttributes[key] = val + } + } else { + groupBy = append(groupBy, *v) + groupAttributes[colName] = *v + } + case *time.Time: + metricPoint.Timestamp = v.UnixMilli() + case *float64: + metricPoint.Value = *v + } + } + sort.Strings(groupBy) + key := strings.Join(groupBy, "") + attributesMap[key] = groupAttributes + metricPointsMap[key] = append(metricPointsMap[key], metricPoint) + } + + var seriesList []*model.Series + for key := range metricPointsMap { + points := metricPointsMap[key] + // first point in each series could be invalid since the + // aggregations are applied with point from prev series + if len(points) != 0 && len(points) > 1 { + points = points[1:] + } + attributes := attributesMap[key] + series := model.Series{Labels: attributes, Points: points} + seriesList = append(seriesList, &series) + } + return seriesList, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ee4633ff1a..400c9f2de4 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -7,12 +7,16 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" + "sync" + "time" "github.com/gorilla/mux" jsoniter "github.com/json-iterator/go" _ "github.com/mattn/go-sqlite3" "github.com/prometheus/prometheus/promql" "go.signoz.io/query-service/app/dashboards" + "go.signoz.io/query-service/app/metrics" "go.signoz.io/query-service/app/parser" "go.signoz.io/query-service/auth" "go.signoz.io/query-service/constants" @@ -384,7 +388,12 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) metricAutocompleteMetricName(w http.ResponseWriter, r *http.Request) { matchText := r.URL.Query().Get("match") - metricNameList, apiErrObj := (*aH.reader).GetMetricAutocompleteMetricNames(r.Context(), matchText) + limit, err := strconv.Atoi(r.URL.Query().Get("limit")) + if err != nil { + limit = 0 // no limit + } + + metricNameList, apiErrObj := (*aH.reader).GetMetricAutocompleteMetricNames(r.Context(), matchText, limit) if apiErrObj != nil { respondError(w, apiErrObj, nil) @@ -436,18 +445,173 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http. func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r) - fmt.Println(metricsQueryRangeParams) - if apiErrorObj != nil { zap.S().Errorf(apiErrorObj.Err.Error()) respondError(w, apiErrorObj, nil) return } - response_data := &model.QueryDataV2{ - ResultType: "matrix", - Result: nil, + + // prometheus instant query needs same timestamp + if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE && + metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.PROM { + metricsQueryRangeParams.Start = metricsQueryRangeParams.End } - aH.respond(w, response_data) + + // round up the end to neaerest multiple + if metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.QUERY_BUILDER { + end := (metricsQueryRangeParams.End) / 1000 + step := metricsQueryRangeParams.Step + metricsQueryRangeParams.End = (end / step * step) * 1000 + } + + type channelResult struct { + Series []*model.Series + Err error + } + + execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error) { + var seriesList []*model.Series + ch := make(chan channelResult, len(queries)) + var wg sync.WaitGroup + + for name, query := range queries { + wg.Add(1) + go func(name, query string) { + defer wg.Done() + seriesList, err := (*aH.reader).GetMetricResult(r.Context(), query) + for _, series := range seriesList { + series.QueryName = name + } + + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)} + return + } + ch <- channelResult{Series: seriesList} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + continue + } + seriesList = append(seriesList, r.Series...) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")) + } + return seriesList, nil + } + + execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error) { + var seriesList []*model.Series + ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries)) + var wg sync.WaitGroup + + for name, query := range metricsQueryRangeParams.CompositeMetricQuery.PromQueries { + if query.Disabled { + continue + } + wg.Add(1) + go func(name string, query *model.PromQuery) { + var seriesList []*model.Series + defer wg.Done() + queryModel := model.QueryRangeParams{ + Start: time.UnixMilli(metricsQueryRangeParams.Start), + End: time.UnixMilli(metricsQueryRangeParams.End), + Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)), + Query: query.Query, + } + promResult, _, err := (*aH.reader).GetQueryRangeResult(r.Context(), &queryModel) + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)} + return + } + matrix, _ := promResult.Matrix() + for _, v := range matrix { + var s model.Series + s.QueryName = name + s.Labels = v.Metric.Copy().Map() + for _, p := range v.Points { + s.Points = append(s.Points, model.MetricPoint{Timestamp: p.T, Value: p.V}) + } + seriesList = append(seriesList, &s) + } + ch <- channelResult{Series: seriesList} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + continue + } + seriesList = append(seriesList, r.Series...) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")) + } + return seriesList, nil + } + + var seriesList []*model.Series + var err error + switch metricsQueryRangeParams.CompositeMetricQuery.QueryType { + case model.QUERY_BUILDER: + runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME) + if runQueries.Err != nil { + respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: runQueries.Err}, nil) + return + } + seriesList, err = execClickHouseQueries(runQueries.Queries) + + case model.CLICKHOUSE: + queries := make(map[string]string) + for name, chQuery := range metricsQueryRangeParams.CompositeMetricQuery.ClickHouseQueries { + if chQuery.Disabled { + continue + } + queries[name] = chQuery.Query + } + seriesList, err = execClickHouseQueries(queries) + case model.PROM: + seriesList, err = execPromQueries(metricsQueryRangeParams) + default: + err = fmt.Errorf("invalid query type") + respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErrObj, nil) + return + } + if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE && + len(seriesList) > 1 && + (metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.QUERY_BUILDER || + metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.CLICKHOUSE) { + respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid: query resulted in more than one series for value type")}, nil) + return + } + + type ResponseFormat struct { + ResultType string `json:"resultType"` + Result []*model.Series `json:"result"` + } + resp := ResponseFormat{ResultType: "matrix", Result: seriesList} + aH.respond(w, resp) } func (aH *APIHandler) listRulesFromProm(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go index cbbb78249b..fadbcd1e79 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/app/interface.go @@ -50,7 +50,8 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) - GetMetricAutocompleteMetricNames(ctx context.Context, matchText string) (*[]string, *model.ApiError) + GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) + GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) } diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go new file mode 100644 index 0000000000..bf1896af2e --- /dev/null +++ b/pkg/query-service/app/metrics/query_builder.go @@ -0,0 +1,421 @@ +package metrics + +import ( + "fmt" + "reflect" + "strings" + + "github.com/SigNoz/govaluate" + "go.signoz.io/query-service/constants" + "go.signoz.io/query-service/model" +) + +type RunQueries struct { + Queries map[string]string + Err error +} + +var AggregateOperatorToPercentile = map[model.AggregateOperator]float64{ + model.P05: 0.5, + model.P10: 0.10, + model.P20: 0.20, + model.P25: 0.25, + model.P50: 0.50, + model.P75: 0.75, + model.P90: 0.90, + model.P95: 0.95, + model.P99: 0.99, +} + +var AggregateOperatorToSQLFunc = map[model.AggregateOperator]string{ + model.AVG: "avg", + model.MAX: "max", + model.MIN: "min", + model.SUM: "sum", + model.RATE_SUM: "sum", + model.RATE_AVG: "avg", + model.RATE_MAX: "max", + model.RATE_MIN: "min", +} + +var SupportedFunctions = []string{"exp", "log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians"} + +func GoValuateFuncs() map[string]govaluate.ExpressionFunction { + var GoValuateFuncs = map[string]govaluate.ExpressionFunction{} + for _, fn := range SupportedFunctions { + GoValuateFuncs[fn] = func(args ...interface{}) (interface{}, error) { + return nil, nil + } + } + return GoValuateFuncs +} + +// formattedValue formats the value to be used in clickhouse query +func formattedValue(v interface{}) string { + switch x := v.(type) { + case int: + return fmt.Sprintf("%d", x) + case float32, float64: + return fmt.Sprintf("%f", x) + case string: + return fmt.Sprintf("'%s'", x) + case bool: + return fmt.Sprintf("%v", x) + case []interface{}: + switch x[0].(type) { + case string: + str := "[" + for idx, sVal := range x { + str += fmt.Sprintf("'%s'", sVal) + if idx != len(x)-1 { + str += "," + } + } + str += "]" + return str + case int, float32, float64, bool: + return strings.Join(strings.Fields(fmt.Sprint(x)), ",") + } + return "" + default: + // may be log the warning here? + return "" + } +} + +// BuildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering +// timeseries based on search criteria +func BuildMetricsTimeSeriesFilterQuery(fs *model.FilterSet, groupTags []string, metricName string, aggregateOperator model.AggregateOperator) (string, error) { + var conditions []string + conditions = append(conditions, fmt.Sprintf("metric_name = %s", formattedValue(metricName))) + if fs != nil && len(fs.Items) != 0 { + for _, item := range fs.Items { + toFormat := item.Value + // if the received value is an array for like/match op, just take the first value + if strings.ToLower(item.Operation) == "like" || + strings.ToLower(item.Operation) == "match" || + strings.ToLower(item.Operation) == "nlike" { + x, ok := item.Value.([]interface{}) + if ok { + if len(x) == 0 { + continue + } + toFormat = x[0] + } + } + fmtVal := formattedValue(toFormat) + switch op := strings.ToLower(item.Operation); op { + case "eq": + conditions = append(conditions, fmt.Sprintf("labels_object.%s = %s", item.Key, fmtVal)) + case "neq": + conditions = append(conditions, fmt.Sprintf("labels_object.%s != %s", item.Key, fmtVal)) + case "in": + conditions = append(conditions, fmt.Sprintf("labels_object.%s IN %s", item.Key, fmtVal)) + case "nin": + conditions = append(conditions, fmt.Sprintf("labels_object.%s NOT IN %s", item.Key, fmtVal)) + case "like": + conditions = append(conditions, fmt.Sprintf("like(labels_object.%s, %s)", item.Key, fmtVal)) + case "nlike": + conditions = append(conditions, fmt.Sprintf("notLike(labels_object.%s, %s)", item.Key, fmtVal)) + case "match": + conditions = append(conditions, fmt.Sprintf("match(labels_object.%s, %s)", item.Key, fmtVal)) + default: + return "", fmt.Errorf("unsupported operation") + } + } + } + queryString := strings.Join(conditions, " AND ") + + var selectLabels string + if aggregateOperator == model.NOOP || aggregateOperator == model.RATE { + selectLabels = "labels," + } else { + for _, tag := range groupTags { + selectLabels += fmt.Sprintf(" labels_object.%s as %s,", tag, tag) + } + } + + filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_TABLENAME, queryString) + + return filterSubQuery, nil +} + +func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, tableName string) (string, error) { + + if qp.CompositeMetricQuery.PanelType == model.QUERY_VALUE && len(mq.GroupingTags) != 0 { + return "", fmt.Errorf("reduce operator cannot be applied for the query") + } + + filterSubQuery, err := BuildMetricsTimeSeriesFilterQuery(mq.TagFilters, mq.GroupingTags, mq.MetricName, mq.AggregateOperator) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", formattedValue(mq.MetricName), qp.Start, qp.End) + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + groupBy := groupBy(mq.GroupingTags...) + groupTags := groupSelect(mq.GroupingTags...) + + switch mq.AggregateOperator { + case model.RATE: + // Calculate rate of change of metric for each unique time series + groupBy = "fingerprint, ts" + groupTags = "fingerprint," + op := "max(value)" // max value should be the closest value for point in time + subQuery := fmt.Sprintf( + queryTmpl, "any(labels) as labels, "+groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags, + ) // labels will be same so any should be fine + query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s)` + + query = fmt.Sprintf(query, "labels as fullLabels,", subQuery) + return query, nil + case model.SUM_RATE: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags, + ) // labels will be same so any should be fine + query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, groupTags) + return query, nil + case model.RATE_SUM, model.RATE_MAX, model.RATE_AVG, model.RATE_MIN: + op := fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator]) + subQuery := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags) + query := `SELECT %s ts, runningDifference(value)/runningDifference(ts) as value FROM(%s) OFFSET 1` + query = fmt.Sprintf(query, groupTags, subQuery) + return query, nil + case model.P05, model.P10, model.P20, model.P25, model.P50, model.P75, model.P90, model.P95, model.P99: + op := fmt.Sprintf("quantile(%v)(value)", AggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags) + return query, nil + case model.AVG, model.SUM, model.MIN, model.MAX: + op := fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags) + return query, nil + case model.COUNT: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags) + return query, nil + case model.COUNT_DISTINCT: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags) + return query, nil + case model.NOOP: + queryTmpl := + "SELECT fingerprint, labels as fullLabels," + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " any(value) as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY fingerprint, labels, ts" + + " ORDER BY fingerprint, labels, ts" + query := fmt.Sprintf(queryTmpl, qp.Step, filterSubQuery) + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} + +func groupBy(tags ...string) string { + tags = append(tags, "ts") + return strings.Join(tags, ",") +} + +func groupSelect(tags ...string) string { + groupTags := strings.Join(tags, ",") + if len(tags) != 0 { + groupTags += ", " + } + return groupTags +} + +// validateExpressions validates the math expressions using the list of +// allowed functions. +func validateExpressions(expressions []string, funcs map[string]govaluate.ExpressionFunction) []error { + var errs []error + for _, exp := range expressions { + _, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs) + if err != nil { + errs = append(errs, err) + } + } + return errs +} + +// FormatErrs returns formatted error string +func FormatErrs(errs []error, separator string) string { + var errStrs []string + for _, err := range errs { + errStrs = append(errStrs, err.Error()) + } + return strings.Join(errStrs, separator) +} + +func reduceQuery(query string, reduceTo model.ReduceToOperator, aggregateOperator model.AggregateOperator) (string, error) { + var selectLabels string + var groupBy string + // NOOP and RATE can possibly return multiple time series and reduce should be applied + // for each uniques series. When the final result contains more than one series we throw + // an error post DB fetching. Otherwise just return the single data. This is not known until queried so the + // the query is prepared accordingly. + if aggregateOperator == model.NOOP || aggregateOperator == model.RATE { + selectLabels = ", any(fullLabels) as fullLabels" + groupBy = "GROUP BY fingerprint" + } + // the timestamp picked is not relevant here since the final value used is show the single + // chart with just the query value. For the quer + switch reduceTo { + case model.RLAST: + query = fmt.Sprintf("SELECT anyLast(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case model.RSUM: + query = fmt.Sprintf("SELECT sum(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case model.RAVG: + query = fmt.Sprintf("SELECT avg(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case model.RMAX: + query = fmt.Sprintf("SELECT max(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case model.RMIN: + query = fmt.Sprintf("SELECT min(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + default: + return "", fmt.Errorf("unsupported reduce operator") + } + return query, nil +} + +// varToQuery constructs the query for each named builder block +func varToQuery(qp *model.QueryRangeParamsV2, tableName string) (map[string]string, error) { + evalFuncs := GoValuateFuncs() + varToQuery := make(map[string]string) + for _, builderQuery := range qp.CompositeMetricQuery.BuilderQueries { + expression, _ := govaluate.NewEvaluableExpressionWithFunctions(builderQuery.Expression, evalFuncs) + + // Use the parsed expression and build the query for each variable + // if not already exists + var errs []error + for _, _var := range expression.Vars() { + if _, ok := varToQuery[_var]; !ok { + mq := qp.CompositeMetricQuery.BuilderQueries[_var] + query, err := BuildMetricQuery(qp, mq, tableName) + if err != nil { + errs = append(errs, err) + } else { + if qp.CompositeMetricQuery.PanelType == model.QUERY_VALUE { + query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + if err != nil { + errs = append(errs, err) + } + } + } + varToQuery[_var] = query + } + } + if len(errs) != 0 { + return nil, fmt.Errorf("error while creating query: %s", FormatErrs(errs, "\n")) + } + } + return varToQuery, nil +} + +// expressionToQuery constructs the query for the expression +func expressionToQuery(qp *model.QueryRangeParamsV2, varToQuery map[string]string, expression *govaluate.EvaluableExpression) (string, error) { + var formulaQuery string + vars := expression.Vars() + for idx, var_ := range vars[1:] { + x, y := vars[idx], var_ + if !reflect.DeepEqual(qp.CompositeMetricQuery.BuilderQueries[x].GroupingTags, qp.CompositeMetricQuery.BuilderQueries[y].GroupingTags) { + return "", fmt.Errorf("group by must be same") + } + } + var modified []govaluate.ExpressionToken + tokens := expression.Tokens() + for idx := range tokens { + token := tokens[idx] + if token.Kind == govaluate.VARIABLE { + token.Value = fmt.Sprintf("%v.value", token.Value) + token.Meta = fmt.Sprintf("%v.value", token.Meta) + } + modified = append(modified, token) + } + formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified) + + var formulaSubQuery string + var joinUsing string + for idx, var_ := range vars { + query := varToQuery[var_] + groupTags := qp.CompositeMetricQuery.BuilderQueries[var_].GroupingTags + groupTags = append(groupTags, "ts") + joinUsing = strings.Join(groupTags, ",") + formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, var_) + if idx < len(vars)-1 { + formulaSubQuery += "INNER JOIN" + } else if len(vars) > 1 { + formulaSubQuery += fmt.Sprintf("USING (%s)", joinUsing) + } + } + formulaQuery = fmt.Sprintf("SELECT %s, %s as value FROM ", joinUsing, formula.ExpressionString()) + formulaSubQuery + return formulaQuery, nil +} + +// PrepareBuilderMetricQueries constructs the queries to be run for query range timeseries +func PrepareBuilderMetricQueries(qp *model.QueryRangeParamsV2, tableName string) *RunQueries { + evalFuncs := GoValuateFuncs() + + // validate the expressions + var expressions []string + for _, bq := range qp.CompositeMetricQuery.BuilderQueries { + expressions = append(expressions, bq.Expression) + } + if errs := validateExpressions(expressions, evalFuncs); len(errs) != 0 { + return &RunQueries{Err: fmt.Errorf("invalid expressions: %s", FormatErrs(errs, "\n"))} + } + + varToQuery, err := varToQuery(qp, tableName) + if err != nil { + return &RunQueries{Err: err} + } + + namedQueries := make(map[string]string) + + var errs []error + for _, builderQuery := range qp.CompositeMetricQuery.BuilderQueries { + if builderQuery.Disabled { + continue + } + expression, _ := govaluate.NewEvaluableExpressionWithFunctions(builderQuery.Expression, evalFuncs) + tokens := expression.Tokens() + // expression with one token is used to represent + // that there are no functions applied on query + if len(tokens) == 1 { + _var := tokens[0].Value.(string) + namedQueries[builderQuery.QueryName] = varToQuery[_var] + } else { + query, err := expressionToQuery(qp, varToQuery, expression) + if err != nil { + errs = append(errs, err) + } + namedQueries[builderQuery.QueryName] = query + } + } + if len(errs) != 0 { + return &RunQueries{Err: fmt.Errorf("errors with formulas: %s", FormatErrs(errs, "\n"))} + } + fmt.Println(namedQueries) + return &RunQueries{Queries: namedQueries} +} diff --git a/pkg/query-service/app/metrics/query_builder_test.go b/pkg/query-service/app/metrics/query_builder_test.go new file mode 100644 index 0000000000..4530a01a79 --- /dev/null +++ b/pkg/query-service/app/metrics/query_builder_test.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "go.signoz.io/query-service/model" +) + +func TestBuildQuery(t *testing.T) { + Convey("TestSimpleQueryWithName", t, func() { + q := &model.QueryRangeParamsV2{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeMetricQuery: &model.CompositeMetricQuery{ + BuilderQueries: map[string]*model.MetricQuery{ + "a": { + QueryName: "a", + MetricName: "name", + AggregateOperator: model.RATE_MAX, + Expression: "a", + }, + }, + }, + } + queries := PrepareBuilderMetricQueries(q, "table").Queries + So(len(queries), ShouldEqual, 1) + So(queries["a"], ShouldContainSubstring, "WHERE metric_name = 'name'") + So(queries["a"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)") + }) +} + +func TestBuildQueryWithFilters(t *testing.T) { + Convey("TestBuildQueryWithFilters", t, func() { + q := &model.QueryRangeParamsV2{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeMetricQuery: &model.CompositeMetricQuery{ + BuilderQueries: map[string]*model.MetricQuery{ + "a": { + QueryName: "a", + MetricName: "name", + TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{ + {Key: "a", Value: "b", Operation: "neq"}, + }}, + AggregateOperator: model.RATE_MAX, + Expression: "a", + }, + }, + }, + } + queries := PrepareBuilderMetricQueries(q, "table").Queries + So(len(queries), ShouldEqual, 1) + + So(queries["a"], ShouldContainSubstring, "WHERE metric_name = 'name' AND labels_object.a != 'b'") + So(queries["a"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)") + }) +} + +func TestBuildQueryWithMultipleQueries(t *testing.T) { + Convey("TestBuildQueryWithFilters", t, func() { + q := &model.QueryRangeParamsV2{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeMetricQuery: &model.CompositeMetricQuery{ + BuilderQueries: map[string]*model.MetricQuery{ + "a": { + QueryName: "a", + MetricName: "name", + TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{ + {Key: "in", Value: []interface{}{"a", "b", "c"}, Operation: "in"}, + }}, + AggregateOperator: model.RATE_AVG, + Expression: "a", + }, + "b": { + QueryName: "b", + MetricName: "name2", + AggregateOperator: model.RATE_MAX, + Expression: "b", + }, + }, + }, + } + queries := PrepareBuilderMetricQueries(q, "table").Queries + So(len(queries), ShouldEqual, 2) + So(queries["a"], ShouldContainSubstring, "WHERE metric_name = 'name' AND labels_object.in IN ['a','b','c']") + So(queries["a"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)") + }) +} + +func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { + Convey("TestBuildQueryWithFilters", t, func() { + q := &model.QueryRangeParamsV2{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeMetricQuery: &model.CompositeMetricQuery{ + BuilderQueries: map[string]*model.MetricQuery{ + "a": { + QueryName: "a", + MetricName: "name", + TagFilters: &model.FilterSet{Operation: "AND", Items: []model.FilterItem{ + {Key: "in", Value: []interface{}{"a", "b", "c"}, Operation: "in"}, + }}, + AggregateOperator: model.RATE_MAX, + Expression: "a", + }, + "b": { + MetricName: "name2", + AggregateOperator: model.RATE_AVG, + Expression: "b", + }, + "c": { + QueryName: "c", + Expression: "a/b", + }, + }, + }, + } + queries := PrepareBuilderMetricQueries(q, "table").Queries + So(len(queries), ShouldEqual, 3) + So(queries["c"], ShouldContainSubstring, "SELECT ts, a.value / b.value") + So(queries["c"], ShouldContainSubstring, "WHERE metric_name = 'name' AND labels_object.in IN ['a','b','c']") + So(queries["c"], ShouldContainSubstring, "runningDifference(value)/runningDifference(ts)") + }) +} diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 2dd6e44c52..9d3705da9f 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -667,3 +667,12 @@ func parseChangePasswordRequest(r *http.Request) (*model.ChangePasswordRequest, return &req, nil } + +func parseFilterSet(r *http.Request) (*model.FilterSet, error) { + var filterSet model.FilterSet + err := json.NewDecoder(r.Body).Decode(&filterSet) + if err != nil { + return nil, err + } + return &filterSet, nil +} diff --git a/pkg/query-service/app/parser/metrics.go b/pkg/query-service/app/parser/metrics.go index 279331ba3c..ce4d079fa5 100644 --- a/pkg/query-service/app/parser/metrics.go +++ b/pkg/query-service/app/parser/metrics.go @@ -5,19 +5,39 @@ import ( "fmt" "net/http" + "go.signoz.io/query-service/app/metrics" "go.signoz.io/query-service/model" ) +func validateQueryRangeParamsV2(qp *model.QueryRangeParamsV2) error { + var errs []error + if !(qp.DataSource >= model.METRICS && qp.DataSource <= model.LOGS) { + errs = append(errs, fmt.Errorf("unsupported data source")) + } + if !(qp.CompositeMetricQuery.QueryType >= model.QUERY_BUILDER && qp.CompositeMetricQuery.QueryType <= model.PROM) { + errs = append(errs, fmt.Errorf("unsupported query type")) + } + if !(qp.CompositeMetricQuery.PanelType >= model.TIME_SERIES && qp.CompositeMetricQuery.PanelType <= model.QUERY_VALUE) { + errs = append(errs, fmt.Errorf("unsupported panel type")) + } + if len(errs) != 0 { + return fmt.Errorf("one or more errors found : %s", metrics.FormatErrs(errs, ",")) + } + return nil +} + func ParseMetricQueryRangeParams(r *http.Request) (*model.QueryRangeParamsV2, *model.ApiError) { var postData *model.QueryRangeParamsV2 - err := json.NewDecoder(r.Body).Decode(&postData) - if err != nil { + if err := json.NewDecoder(r.Body).Decode(&postData); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + if err := validateQueryRangeParamsV2(postData); err != nil { return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} } - return nil, nil + return postData, nil } func ParseMetricAutocompleteTagParams(r *http.Request) (*model.MetricAutocompleteTagParams, *model.ApiError) { diff --git a/pkg/query-service/app/parser_test.go b/pkg/query-service/app/parser_test.go new file mode 100644 index 0000000000..6fa049a05e --- /dev/null +++ b/pkg/query-service/app/parser_test.go @@ -0,0 +1,60 @@ +package app + +import ( + "bytes" + "net/http" + "testing" + + "github.com/smartystreets/assertions/should" + + . "github.com/smartystreets/goconvey/convey" + "go.signoz.io/query-service/app/metrics" + "go.signoz.io/query-service/model" +) + +func TestParseFilterSingleFilter(t *testing.T) { + Convey("TestParseFilterSingleFilter", t, func() { + postBody := []byte(`{ + "op": "AND", + "items": [ + {"key": "namespace", "value": "a", "op": "EQ"} + ] + }`) + req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody)) + res, _ := parseFilterSet(req) + query, _ := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP) + So(query, ShouldContainSubstring, "signoz_metrics.time_series_v2 WHERE metric_name = 'table' AND labels_object.namespace = 'a'") + }) +} + +func TestParseFilterMultipleFilter(t *testing.T) { + Convey("TestParseFilterMultipleFilter", t, func() { + postBody := []byte(`{ + "op": "AND", + "items": [ + {"key": "namespace", "value": "a", "op": "EQ"}, + {"key": "host", "value": ["host-1", "host-2"], "op": "IN"} + ] + }`) + req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody)) + res, _ := parseFilterSet(req) + query, _ := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP) + So(query, should.ContainSubstring, "labels_object.host IN ['host-1','host-2']") + So(query, should.ContainSubstring, "labels_object.namespace = 'a'") + }) +} + +func TestParseFilterNotSupportedOp(t *testing.T) { + Convey("TestParseFilterNotSupportedOp", t, func() { + postBody := []byte(`{ + "op": "AND", + "items": [ + {"key": "namespace", "value": "a", "op": "PO"} + ] + }`) + req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody)) + res, _ := parseFilterSet(req) + _, err := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP) + So(err, should.BeError, "unsupported operation") + }) +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 2825ce31ea..b4bc4b08ef 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -62,6 +62,11 @@ const ( StatusFailed = "failed" StatusSuccess = "success" ) +const ( + SIGNOZ_METRIC_DBNAME = "signoz_metrics" + SIGNOZ_SAMPLES_TABLENAME = "samples_v2" + SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2" +) func GetOrDefaultEnv(key string, fallback string) string { v := os.Getenv(key) diff --git a/pkg/query-service/go.mod b/pkg/query-service/go.mod index b101f18da5..6bd0e68ead 100644 --- a/pkg/query-service/go.mod +++ b/pkg/query-service/go.mod @@ -4,6 +4,7 @@ go 1.17 require ( github.com/ClickHouse/clickhouse-go/v2 v2.0.12 + github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb github.com/go-kit/log v0.1.0 github.com/google/uuid v1.3.0 github.com/gorilla/handlers v1.5.1 @@ -107,7 +108,7 @@ require ( github.com/segmentio/backo-go v1.0.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect - github.com/smartystreets/assertions v1.1.0 // indirect + github.com/smartystreets/assertions v1.1.0 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.3 // indirect github.com/stretchr/testify v1.7.1 @@ -139,4 +140,4 @@ require ( k8s.io/client-go v8.0.0+incompatible // indirect ) -replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.70 +replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.71 diff --git a/pkg/query-service/go.sum b/pkg/query-service/go.sum index 77fd087ceb..72675da497 100644 --- a/pkg/query-service/go.sum +++ b/pkg/query-service/go.sum @@ -55,8 +55,10 @@ github.com/ClickHouse/clickhouse-go/v2 v2.0.12/go.mod h1:u4RoNQLLM2W6hNSPYrIESLJ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= -github.com/SigNoz/prometheus v1.9.70 h1:0214i78cje5MkX0tXYwX2cK4cHXrFw18WcSLhv4YDpk= -github.com/SigNoz/prometheus v1.9.70/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM= +github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb h1:bneLSKPf9YUSFmafKx32bynV6QrzViL/s+ZDvQxH1E4= +github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb/go.mod h1:JznGDNg9x1cujDKa22RaQOimOvvEfy3nxzDGd8XDgmA= +github.com/SigNoz/prometheus v1.9.71 h1:X+6f4k5bqX+lpPFHCi+f6XiSehTj3Yzh1B/FDJi//Sk= +github.com/SigNoz/prometheus v1.9.71/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 9cc1ce235c..d1763a0440 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -18,27 +18,114 @@ type QueryRangeParams struct { Stats string } -type Query struct { - Datasource string `json:"datasource"` - Format string `json:"format"` - Expr string `json:"expr"` +type MetricQuery struct { + QueryName string `json:"queryName"` + MetricName string `json:"metricName"` + TagFilters *FilterSet `json:"tagFilters,omitempty"` + GroupingTags []string `json:"groupBy,omitempty"` + AggregateOperator AggregateOperator `json:"aggregateOperator"` + Expression string `json:"expression"` + Disabled bool `json:"disabled"` + ReduceTo ReduceToOperator `json:"reduceTo,omitempty"` } +type ReduceToOperator int + +const ( + _ ReduceToOperator = iota + RLAST + RSUM + RAVG + RMAX + RMIN +) + +type QueryType int + +const ( + _ QueryType = iota + QUERY_BUILDER + CLICKHOUSE + PROM +) + +type PromQuery struct { + Query string `json:"query"` + Stats string `json:"stats,omitempty"` + Disabled bool `json:"disabled"` +} + +type ClickHouseQuery struct { + Query string `json:"query"` + Disabled bool `json:"disabled"` +} + +type PanelType int + +const ( + _ PanelType = iota + TIME_SERIES + QUERY_VALUE +) + +type CompositeMetricQuery struct { + BuilderQueries map[string]*MetricQuery `json:"builderQueries,omitempty"` + ClickHouseQueries map[string]*ClickHouseQuery `json:"chQueries,omitempty"` + PromQueries map[string]*PromQuery `json:"promQueries,omitempty"` + PanelType PanelType `json:"panelType"` + QueryType QueryType `json:"queryType"` +} + +type AggregateOperator int + +const ( + _ AggregateOperator = iota + NOOP + COUNT + COUNT_DISTINCT + SUM + AVG + MAX + MIN + P05 + P10 + P20 + P25 + P50 + P75 + P90 + P95 + P99 + RATE + SUM_RATE + // leave blank space for possily {AVG, X}_RATE + _ + _ + _ + RATE_SUM + RATE_AVG + RATE_MAX + RATE_MIN +) + +type DataSource int + +const ( + _ DataSource = iota + METRICS + TRACES + LOGS +) + type QueryRangeParamsV2 struct { - Start time.Time - End time.Time - Step time.Duration - StartStr string `json:"start"` - EndStr string `json:"end"` - StepStr string `json:"step"` - Queries []Query `json:"queries"` -} - -func (params QueryRangeParamsV2) sanitizeAndValidate() (*QueryRangeParamsV2, error) { - - return nil, nil + DataSource DataSource `json:"dataSource"` + Start int64 `json:"start"` + End int64 `json:"end"` + Step int64 `json:"step"` + CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"` } +// Metric auto complete types type metricTags map[string]string type MetricAutocompleteTagParams struct { @@ -192,7 +279,7 @@ type TTLParams struct { } type GetTTLParams struct { - Type string + Type string } type GetErrorsParams struct { @@ -205,3 +292,19 @@ type GetErrorParams struct { ErrorID string ServiceName string } + +type FilterItem struct { + Key string `json:"key"` + Value interface{} `json:"value"` + Operation string `json:"op"` +} + +type FilterSet struct { + Operation string `json:"op,omitempty"` + Items []FilterItem `json:"items"` +} + +type RemoveTTLParams struct { + Type string + RemoveAllTTL bool +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 358fe6e979..523ad7e96e 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -356,3 +356,20 @@ type ErrorWithSpan struct { NewerErrorID string `json:"newerErrorId" ch:"newerErrorId"` OlderErrorID string `json:"olderErrorId" ch:"olderErrorId"` } + +type Series struct { + QueryName string `json:"queryName"` + Labels map[string]string `json:"metric"` + Points []MetricPoint `json:"values"` +} + +type MetricPoint struct { + Timestamp int64 + Value float64 +} + +// MarshalJSON implements json.Marshaler. +func (p *MetricPoint) MarshalJSON() ([]byte, error) { + v := strconv.FormatFloat(p.Value, 'f', -1, 64) + return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v}) +}