diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 23e9f6a9fa..d3ebee0492 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3767,6 +3767,145 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3 return &attributeValues, nil } +func readRow(vars []interface{}, columnNames []string) ([]string, map[string]string, v3.Point) { + // Each row will have a value and a timestamp, and an optional list of label values + // example: {Timestamp: ..., Value: ...} + // The timestamp may also not present in some cases where the time series is reduced to single value + var point v3.Point + + // groupBy is a container to hold label values for the current point + // example: ["frontend", "/fetch"] + var groupBy []string + + // groupAttributes is a container to hold the key-value pairs for the current + // metric point. + // example: {"serviceName": "frontend", "operation": "/fetch"} + groupAttributes := make(map[string]string) + + for idx, v := range vars { + colName := columnNames[idx] + switch v := v.(type) { + case *string: + // special case for returning all labels in metrics datasource + if colName == "fullLabels" { + var metric map[string]string + err := json.Unmarshal([]byte(*v), &metric) + if err != nil { + zap.S().Errorf("unexpected error encountered %v", err) + } + for key, val := range metric { + groupBy = append(groupBy, val) + groupAttributes[key] = val + } + } else { + groupBy = append(groupBy, *v) + groupAttributes[colName] = *v + } + case *time.Time: + point.Timestamp = v.UnixMilli() + case *float64, *float32: + if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { + point.Value = float64(reflect.ValueOf(v).Elem().Float()) + } else { + groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float())) + groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Float()) + } + case *uint8, *uint64, *uint16, *uint32: + if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { + point.Value = float64(reflect.ValueOf(v).Elem().Uint()) + } else { + groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint())) + groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Uint()) + } + case *int8, *int16, *int32, *int64: + if _, ok := constants.ReservedColumnTargetAliases[colName]; ok { + point.Value = float64(reflect.ValueOf(v).Elem().Int()) + } else { + groupBy = append(groupBy, fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int())) + groupAttributes[colName] = fmt.Sprintf("%v", reflect.ValueOf(v).Elem().Int()) + } + default: + zap.S().Errorf("unsupported var type %v found in metric builder query result for column %s", v, colName) + } + } + return groupBy, groupAttributes, point +} + +func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNames []string) ([]*v3.Series, error) { + // when groupBy is applied, each combination of cartesian product + // of attribute values is a separate series. Each item in seriesToPoints + // represent a unique series where the key is sorted attribute values joined + // by "," and the value is the list of points for that series + + // For instance, group by (serviceName, operation) + // with two services and three operations in each will result in (maximum of) 6 series + // ("frontend", "order") x ("/fetch", "/fetch/{Id}", "/order") + // + // ("frontend", "/fetch") + // ("frontend", "/fetch/{Id}") + // ("frontend", "/order") + // ("order", "/fetch") + // ("order", "/fetch/{Id}") + // ("order", "/order") + seriesToPoints := make(map[string][]v3.Point) + + // seriesToAttrs is a mapping of key to a map of attribute key to attribute value + // for each series. This is used to populate the series' attributes + // For instance, for the above example, the seriesToAttrs will be + // { + // "frontend,/fetch": {"serviceName": "frontend", "operation": "/fetch"}, + // "frontend,/fetch/{Id}": {"serviceName": "frontend", "operation": "/fetch/{Id}"}, + // "frontend,/order": {"serviceName": "frontend", "operation": "/order"}, + // "order,/fetch": {"serviceName": "order", "operation": "/fetch"}, + // "order,/fetch/{Id}": {"serviceName": "order", "operation": "/fetch/{Id}"}, + // "order,/order": {"serviceName": "order", "operation": "/order"}, + // } + seriesToAttrs := make(map[string]map[string]string) + + for rows.Next() { + if err := rows.Scan(vars...); err != nil { + return nil, err + } + groupBy, groupAttributes, metricPoint := readRow(vars, columnNames) + sort.Strings(groupBy) + key := strings.Join(groupBy, "") + seriesToAttrs[key] = groupAttributes + seriesToPoints[key] = append(seriesToPoints[key], metricPoint) + } + + var seriesList []*v3.Series + for key := range seriesToPoints { + series := v3.Series{Labels: seriesToAttrs[key], Points: seriesToPoints[key]} + seriesList = append(seriesList, &series) + } + return seriesList, nil +} + +// GetTimeSeriesResultV3 runs the query and returns list of time series +func (r *ClickHouseReader) GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) { + + defer utils.Elapsed("GetTimeSeriesResultV3", query)() + + rows, err := r.db.Query(ctx, query) + + if err != nil { + zap.S().Errorf("error while reading time series result %v", err) + return nil, err + } + defer rows.Close() + + var ( + columnTypes = rows.ColumnTypes() + columnNames = rows.Columns() + vars = make([]interface{}, len(columnTypes)) + ) + for i := range columnTypes { + vars[i] = reflect.New(columnTypes[i].ScanType()).Interface() + } + + return readRowsForTimeSeriesResult(rows, vars, columnNames) +} + func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error { rows, err := r.db.Query(ctx, "SELECT 1") if err != nil { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 62ebeeeedf..7a6097638a 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -22,6 +22,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/app/logs" "go.signoz.io/signoz/pkg/query-service/app/metrics" + metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/parser" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/constants" @@ -36,6 +37,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/rules" "go.signoz.io/signoz/pkg/query-service/telemetry" "go.signoz.io/signoz/pkg/query-service/version" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -63,6 +65,7 @@ type APIHandler struct { ruleManager *rules.Manager featureFlags interfaces.FeatureLookup ready func(http.HandlerFunc) http.HandlerFunc + queryBuilder *queryBuilder // SetupCompleted indicates if SigNoz is ready for general use. // at the moment, we mark the app ready when the first user @@ -101,6 +104,17 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { featureFlags: opts.FeatureFlags, } + builderOpts := queryBuilderOptions{ + BuildMetricQuery: metricsv3.PrepareMetricQuery, + BuildTraceQuery: func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) { + return "", errors.New("not implemented") + }, + BuildLogQuery: func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) { + return "", errors.New("not implemented") + }, + } + aH.queryBuilder = NewQueryBuilder(builderOpts) + aH.ready = aH.testReady dashboards.LoadDashboardFiles() @@ -244,6 +258,7 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid subRouter.HandleFunc("/autocomplete/aggregate_attributes", am.ViewAccess(aH.autocompleteAggregateAttributes)).Methods(http.MethodGet) subRouter.HandleFunc("/autocomplete/attribute_keys", am.ViewAccess(aH.autoCompleteAttributeKeys)).Methods(http.MethodGet) subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValues)).Methods(http.MethodGet) + subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost) } func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) { @@ -2424,3 +2439,186 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http aH.Respond(w, response) } + +func (aH *APIHandler) execClickHouseQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]string) { + type channelResult struct { + Series []*v3.Series + Err error + Name string + Query string + } + + ch := make(chan channelResult, len(queries)) + var wg sync.WaitGroup + + for name, query := range queries { + wg.Add(1) + go func(name, query string) { + defer wg.Done() + seriesList, err := aH.reader.GetTimeSeriesResultV3(ctx, query) + + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query} + return + } + ch <- channelResult{Series: seriesList, Name: name, Query: query} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + res := make([]*v3.Result, 0) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + res = append(res, &v3.Result{ + QueryName: r.Name, + Series: r.Series, + }) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName + } + return res, nil, nil +} + +func (aH *APIHandler) execPromQueries(ctx context.Context, metricsQueryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) { + type channelResult struct { + Series []*v3.Series + Err error + Name string + Query string + } + + ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeQuery.PromQueries)) + var wg sync.WaitGroup + + for name, query := range metricsQueryRangeParams.CompositeQuery.PromQueries { + if query.Disabled { + continue + } + wg.Add(1) + go func(name string, query *v3.PromQuery) { + var seriesList []*v3.Series + defer wg.Done() + tmpl := template.New("promql-query") + tmpl, tmplErr := tmpl.Parse(query.Query) + if tmplErr != nil { + ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query} + return + } + var queryBuf bytes.Buffer + tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables) + if tmplErr != nil { + ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query} + return + } + query.Query = queryBuf.String() + queryModel := model.QueryRangeParams{ + Start: time.UnixMilli(metricsQueryRangeParams.Start), + End: time.UnixMilli(metricsQueryRangeParams.End), + Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)), + Query: query.Query, + } + promResult, _, err := aH.reader.GetQueryRangeResult(ctx, &queryModel) + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query} + return + } + matrix, _ := promResult.Matrix() + for _, v := range matrix { + var s v3.Series + s.Labels = v.Metric.Copy().Map() + for _, p := range v.Points { + s.Points = append(s.Points, v3.Point{Timestamp: p.T, Value: p.V}) + } + seriesList = append(seriesList, &s) + } + ch <- channelResult{Series: seriesList, Name: name, Query: query.Query} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + res := make([]*v3.Result, 0) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + res = append(res, &v3.Result{ + QueryName: r.Name, + Series: r.Series, + }) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName + } + return res, nil, nil +} + +func (aH *APIHandler) queryRangeV3(queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) { + + var result []*v3.Result + var err error + var errQuriesByName map[string]string + switch queryRangeParams.CompositeQuery.QueryType { + case v3.QueryTypeBuilder: + queries, err := aH.queryBuilder.prepareQueries(queryRangeParams) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + result, err, errQuriesByName = aH.execClickHouseQueries(r.Context(), queries) + case v3.QueryTypeClickHouseSQL: + queries := make(map[string]string) + for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries { + if query.Disabled { + continue + } + queries[name] = query.Query + } + result, err, errQuriesByName = aH.execClickHouseQueries(r.Context(), queries) + case v3.QueryTypePromQL: + result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams) + default: + err = fmt.Errorf("invalid query type") + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName) + return + } + + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) { + queryRangeParams, apiErrorObj := ParseQueryRangeParams(r) + + if apiErrorObj != nil { + zap.S().Errorf(apiErrorObj.Err.Error()) + RespondError(w, apiErrorObj, nil) + return + } + + aH.queryRangeV3(queryRangeParams, w, r) +} diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go new file mode 100644 index 0000000000..b26da0bc1f --- /dev/null +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -0,0 +1,363 @@ +package v3 + +import ( + "fmt" + "strings" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ + v3.AggregateOperatorP05: 0.5, + v3.AggregateOperatorP10: 0.10, + v3.AggregateOperatorP20: 0.20, + v3.AggregateOperatorP25: 0.25, + v3.AggregateOperatorP50: 0.50, + v3.AggregateOperatorP75: 0.75, + v3.AggregateOperatorP90: 0.90, + v3.AggregateOperatorP95: 0.95, + v3.AggregateOperatorP99: 0.99, + v3.AggregateOperatorHistQuant50: 0.50, + v3.AggregateOperatorHistQuant75: 0.75, + v3.AggregateOperatorHistQuant90: 0.90, + v3.AggregateOperatorHistQuant95: 0.95, + v3.AggregateOperatorHistQuant99: 0.99, +} + +var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ + v3.AggregateOperatorAvg: "avg", + v3.AggregateOperatorMax: "max", + v3.AggregateOperatorMin: "min", + v3.AggregateOperatorSum: "sum", + v3.AggregateOperatorRateSum: "sum", + v3.AggregateOperatorRateAvg: "avg", + v3.AggregateOperatorRateMax: "max", + v3.AggregateOperatorRateMin: "min", +} + +// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056 +var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference(ts) < 0, nan, runningDifference(value)/runningDifference(ts))` + +// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering +// timeseries based on search criteria +func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, metricName string, aggregateOperator v3.AggregateOperator) (string, error) { + var conditions []string + conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(metricName))) + + if fs != nil && len(fs.Items) != 0 { + for _, item := range fs.Items { + toFormat := item.Value + op := strings.ToLower(strings.TrimSpace(item.Operator)) + // if the received value is an array for like/match op, just take the first value + if op == "like" || op == "match" || op == "nlike" || op == "nmatch" { + x, ok := item.Value.([]interface{}) + if ok { + if len(x) == 0 { + continue + } + toFormat = x[0] + } + } + fmtVal := utils.ClickHouseFormattedValue(toFormat) + switch op { + case "eq": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal)) + case "neq": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal)) + case "in": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal)) + case "nin": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal)) + case "like": + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case "nlike": + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case "match": + conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case "nmatch": + conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case "gt": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal)) + case "gte": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal)) + case "lt": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal)) + case "lte": + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal)) + case "contains": + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case "ncontains": + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case "exists": + conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), %s)", item.Key.Key)) + case "nexists": + conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), %s)", item.Key.Key)) + default: + return "", fmt.Errorf("unsupported operation") + } + } + } + queryString := strings.Join(conditions, " AND ") + + var selectLabels string + if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate { + selectLabels = "labels," + } else { + for _, tag := range groupTags { + selectLabels += fmt.Sprintf(" JSONExtractString(labels, '%s') as %s,", tag.Key, tag.Key) + } + } + + filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_TABLENAME, queryString) + + return filterSubQuery, nil +} + +func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) { + + filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute.Key, mq.AggregateOperator) + if err != nil { + return "", err + } + + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + + // Select the aggregate value for interval + queryTmpl := + "SELECT %s" + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " %s as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY %s" + + " ORDER BY %s ts" + + // tagsWithoutLe is used to group by all tags except le + // This is done because we want to group by le only when we are calculating quantile + // Otherwise, we want to group by all tags except le + tagsWithoutLe := []string{} + for _, tag := range mq.GroupBy { + if tag.Key != "le" { + tagsWithoutLe = append(tagsWithoutLe, tag.Key) + } + } + + groupByWithoutLe := groupBy(tagsWithoutLe...) + groupTagsWithoutLe := groupSelect(tagsWithoutLe...) + orderWithoutLe := orderBy(mq.OrderBy, tagsWithoutLe) + + groupBy := groupByAttributeKeyTags(mq.GroupBy...) + groupTags := groupSelectAttributeKeyTags(mq.GroupBy...) + orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + + if len(orderBy) != 0 { + orderBy += "," + } + + switch mq.AggregateOperator { + case v3.AggregateOperatorRate: + // Calculate rate of change of metric for each unique time series + groupBy = "fingerprint, ts" + groupTags = "fingerprint," + op := "max(value)" // max value should be the closest value for point in time + subQuery := fmt.Sprintf( + queryTmpl, "any(labels) as labels, "+groupTags, step, op, filterSubQuery, groupBy, orderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s)` + + query = fmt.Sprintf(query, "labels as fullLabels,", subQuery) + return query, nil + case v3.AggregateOperatorSumRate: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + rateOrderBy := "fingerprint, " + orderBy + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmpl, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s)` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, orderBy) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + subQuery := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s)` + query = fmt.Sprintf(query, groupTags, subQuery) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(value)", aggregateOperatorToPercentile[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorHistQuant50, v3.AggregateOperatorHistQuant75, v3.AggregateOperatorHistQuant90, v3.AggregateOperatorHistQuant95, v3.AggregateOperatorHistQuant99: + rateGroupBy := "fingerprint, " + groupBy + rateGroupTags := "fingerprint, " + groupTags + rateOrderBy := "fingerprint, " + orderBy + op := "max(value)" + subQuery := fmt.Sprintf( + queryTmpl, rateGroupTags, step, op, filterSubQuery, rateGroupBy, rateOrderBy, + ) // labels will be same so any should be fine + query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s)` + query = fmt.Sprintf(query, groupTags, subQuery) + query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, orderBy) + value := aggregateOperatorToPercentile[mq.AggregateOperator] + + query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, orderWithoutLe) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(value)", aggregateOperatorToSQLFunc[mq.AggregateOperator]) + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOpeatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := "toFloat64(count(distinct(value)))" + query := fmt.Sprintf(queryTmpl, groupTags, step, op, filterSubQuery, groupBy, orderBy) + return query, nil + case v3.AggregateOperatorNoOp: + queryTmpl := + "SELECT fingerprint, labels as fullLabels," + + " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " any(value) as value" + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " GLOBAL INNER JOIN" + + " (%s) as filtered_time_series" + + " USING fingerprint" + + " WHERE " + samplesTableTimeFilter + + " GROUP BY fingerprint, labels, ts" + + " ORDER BY fingerprint, labels, ts" + query := fmt.Sprintf(queryTmpl, step, filterSubQuery) + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} + +// groupBy returns a string of comma separated tags for group by clause +// `ts` is always added to the group by clause +func groupBy(tags ...string) string { + tags = append(tags, "ts") + return strings.Join(tags, ",") +} + +// groupSelect returns a string of comma separated tags for select clause +func groupSelect(tags ...string) string { + groupTags := strings.Join(tags, ",") + if len(tags) != 0 { + groupTags += ", " + } + return groupTags +} + +func groupByAttributeKeyTags(tags ...v3.AttributeKey) string { + groupTags := []string{} + for _, tag := range tags { + groupTags = append(groupTags, tag.Key) + } + return groupBy(groupTags...) +} + +func groupSelectAttributeKeyTags(tags ...v3.AttributeKey) string { + groupTags := []string{} + for _, tag := range tags { + groupTags = append(groupTags, tag.Key) + } + return groupSelect(groupTags...) +} + +// orderBy returns a string of comma separated tags for order by clause +// if the order is not specified, it defaults to ASC +func orderBy(items []v3.OrderBy, tags []string) string { + var orderBy []string + for _, tag := range tags { + found := false + for _, item := range items { + if item.ColumnName == tag { + found = true + orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order)) + break + } + } + if !found { + orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag)) + } + } + return strings.Join(orderBy, ",") +} + +func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string { + var groupTags []string + for _, tag := range tags { + groupTags = append(groupTags, tag.Key) + } + return orderBy(items, groupTags) +} + +func having(items []v3.Having) string { + var having []string + for _, item := range items { + having = append(having, fmt.Sprintf("%s %s %v", item.ColumnName, item.Operator, utils.ClickHouseFormattedValue(item.Value))) + } + return strings.Join(having, " AND ") +} + +func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) { + var selectLabels string + var groupBy string + // NOOP and RATE can possibly return multiple time series and reduce should be applied + // for each uniques series. When the final result contains more than one series we throw + // an error post DB fetching. Otherwise just return the single data. This is not known until queried so the + // the query is prepared accordingly. + if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate { + selectLabels = ", any(fullLabels) as fullLabels" + groupBy = "GROUP BY fingerprint" + } + // the timestamp picked is not relevant here since the final value used is show the single + // chart with just the query value. For the quer + switch reduceTo { + case v3.ReduceToOperatorLast: + query = fmt.Sprintf("SELECT anyLast(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case v3.ReduceToOperatorSum: + query = fmt.Sprintf("SELECT sum(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case v3.ReduceToOperatorAvg: + query = fmt.Sprintf("SELECT avg(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case v3.ReduceToOperatorMax: + query = fmt.Sprintf("SELECT max(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + case v3.ReduceToOperatorMin: + query = fmt.Sprintf("SELECT min(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy) + default: + return "", fmt.Errorf("unsupported reduce operator") + } + return query, nil +} + +func PrepareMetricQuery(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, error) { + query, err := buildMetricQuery(start, end, step, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + if err != nil { + return "", err + } + if panelType == v3.PanelTypeValue { + query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + } + return query, err +} diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go new file mode 100644 index 0000000000..12bb2dca87 --- /dev/null +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -0,0 +1,98 @@ +package v3 + +import ( + "testing" + + "github.com/stretchr/testify/require" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestBuildQuery(t *testing.T) { + t.Run("TestSimpleQueryWithName", func(t *testing.T) { + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name"}, + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "A", + }, + }, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + }, + } + query, err := PrepareMetricQuery(q.Start, q.End, q.Step, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + require.NoError(t, err) + require.Contains(t, query, "WHERE metric_name = 'name'") + }) +} + +func TestBuildQueryWithFilters(t *testing.T) { + t.Run("TestBuildQueryWithFilters", func(t *testing.T) { + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name"}, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "a"}, Value: "b", Operator: "neq"}, + {Key: v3.AttributeKey{Key: "code"}, Value: "ERROR_*", Operator: "nmatch"}, + }}, + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "A", + }, + }, + }, + } + query, err := PrepareMetricQuery(q.Start, q.End, q.Step, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + require.NoError(t, err) + + require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'a') != 'b'") + require.Contains(t, query, rateWithoutNegative) + require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')") + }) +} + +func TestBuildQueryWithMultipleQueries(t *testing.T) { + t.Run("TestBuildQueryWithFilters", func(t *testing.T) { + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name"}, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"}, + }}, + AggregateOperator: v3.AggregateOperatorRateAvg, + Expression: "A", + }, + "B": { + QueryName: "B", + AggregateAttribute: v3.AttributeKey{Key: "name2"}, + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "B", + }, + }, + }, + } + + query, err := PrepareMetricQuery(q.Start, q.End, q.Step, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"]) + require.NoError(t, err) + + require.Contains(t, query, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") + require.Contains(t, query, rateWithoutNegative) + }) +} diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 608e5df8a6..a35f3138de 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "encoding/json" "errors" "fmt" @@ -8,15 +9,21 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" + "github.com/SigNoz/govaluate" "github.com/gorilla/mux" promModel "github.com/prometheus/common/model" + "go.uber.org/multierr" + "go.signoz.io/signoz/pkg/query-service/app/metrics" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" + querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" ) var allowedFunctions = []string{"count", "ratePerSec", "sum", "avg", "min", "max", "p50", "p90", "p95", "p99"} @@ -902,3 +909,147 @@ func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValue } return &req, nil } + +func validateQueryRangeParamsV3(qp *v3.QueryRangeParamsV3) error { + err := qp.CompositeQuery.Validate() + if err != nil { + return err + } + + var expressions []string + for _, q := range qp.CompositeQuery.BuilderQueries { + expressions = append(expressions, q.Expression) + } + errs := validateExpressions(expressions, evalFuncs, qp.CompositeQuery) + if len(errs) > 0 { + return multierr.Combine(errs...) + } + return nil +} + +// validateExpressions validates the math expressions using the list of +// allowed functions. +func validateExpressions(expressions []string, funcs map[string]govaluate.ExpressionFunction, cq *v3.CompositeQuery) []error { + var errs []error + for _, exp := range expressions { + evalExp, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs) + if err != nil { + errs = append(errs, err) + continue + } + variables := evalExp.Vars() + for _, v := range variables { + var hasVariable bool + for _, q := range cq.BuilderQueries { + if q.Expression == v { + hasVariable = true + break + } + } + if !hasVariable { + errs = append(errs, fmt.Errorf("unknown variable %s", v)) + } + } + } + return errs +} + +func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiError) { + + var queryRangeParams *v3.QueryRangeParamsV3 + + // parse the request body + if err := json.NewDecoder(r.Body).Decode(&queryRangeParams); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + + // validate the request body + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + + // prepare the variables for the corrspnding query type + formattedVars := make(map[string]interface{}) + for name, value := range queryRangeParams.Variables { + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL { + formattedVars[name] = metrics.PromFormattedValue(value) + } else if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL { + formattedVars[name] = utils.ClickHouseFormattedValue(value) + } + } + + // replace the variables in metrics builder filter item with actual value + // example: {"key": "host", "value": "{{ .host }}", "operator": "equals"} with + // variables {"host": "test"} will be replaced with {"key": "host", "value": "test", "operator": "equals"} + + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { + if query.Filters == nil || len(query.Filters.Items) == 0 { + continue + } + for idx := range query.Filters.Items { + item := &query.Filters.Items[idx] + value := item.Value + if value != nil { + switch x := value.(type) { + case string: + variableName := strings.Trim(x, "{{ . }}") + if _, ok := queryRangeParams.Variables[variableName]; ok { + item.Value = queryRangeParams.Variables[variableName] + } + case []interface{}: + if len(x) > 0 { + switch x[0].(type) { + case string: + variableName := strings.Trim(x[0].(string), "{{ . }}") + if _, ok := queryRangeParams.Variables[variableName]; ok { + item.Value = queryRangeParams.Variables[variableName] + } + } + } + } + } + } + } + } + queryRangeParams.Variables = formattedVars + + // prometheus instant query needs same timestamp + if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeValue && + queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL { + queryRangeParams.Start = queryRangeParams.End + } + + // round up the end to neaerest multiple + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + end := (queryRangeParams.End) / 1000 + step := queryRangeParams.Step + queryRangeParams.End = (end / step * step) * 1000 + } + + // replace go template variables in clickhouse query + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL { + for _, chQuery := range queryRangeParams.CompositeQuery.ClickHouseQueries { + if chQuery.Disabled { + continue + } + tmpl := template.New("clickhouse-query") + tmpl, err := tmpl.Parse(chQuery.Query) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + var query bytes.Buffer + + // replace go template variables + querytemplate.AssignReservedVarsV3(queryRangeParams) + + err = tmpl.Execute(&query, queryRangeParams.Variables) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + chQuery.Query = query.String() + } + } + + return queryRangeParams, nil +} diff --git a/pkg/query-service/app/parser_test.go b/pkg/query-service/app/parser_test.go index 222a1c9ee6..ea3eb97854 100644 --- a/pkg/query-service/app/parser_test.go +++ b/pkg/query-service/app/parser_test.go @@ -2,14 +2,18 @@ package app import ( "bytes" + "encoding/json" + "fmt" "net/http" "net/http/httptest" "strings" "testing" + "time" "github.com/smartystreets/assertions/should" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/app/metrics" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -337,3 +341,419 @@ func TestParseFilterAttributeValueRequest(t *testing.T) { assert.Equal(t, reqCase.expectedSearchText, filterAttrRequest.SearchText) } } + +func TestParseQueryRangeParamsCompositeQuery(t *testing.T) { + reqCases := []struct { + desc string + compositeQuery v3.CompositeQuery + expectErr bool + errMsg string + }{ + { + desc: "no query in request", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeClickHouseSQL, + }, + expectErr: true, + errMsg: "composite query must contain at least one query", + }, + { + desc: "invalid panel type", + compositeQuery: v3.CompositeQuery{ + PanelType: "invalid", + QueryType: v3.QueryTypeClickHouseSQL, + ClickHouseQueries: map[string]*v3.ClickHouseQuery{ + "A": { + Query: "query", + Disabled: false, + }, + }, + }, + expectErr: true, + errMsg: "panel type is invalid", + }, + { + desc: "invalid query type", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: "invalid", + ClickHouseQueries: map[string]*v3.ClickHouseQuery{ + "A": { + Query: "query", + Disabled: false, + }, + }, + }, + expectErr: true, + errMsg: "query type is invalid", + }, + { + desc: "invalid prometheus query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "", + Disabled: false, + }, + }, + }, + expectErr: true, + errMsg: "query is empty", + }, + { + desc: "invalid clickhouse query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeClickHouseSQL, + ClickHouseQueries: map[string]*v3.ClickHouseQuery{ + "A": { + Query: "", + Disabled: false, + }, + }, + }, + expectErr: true, + errMsg: "query is empty", + }, + { + desc: "invalid prometheus query with disabled query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "", + Disabled: true, + }, + }, + }, + expectErr: true, + errMsg: "query is empty", + }, + { + desc: "invalid clickhouse query with disabled query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeClickHouseSQL, + ClickHouseQueries: map[string]*v3.ClickHouseQuery{ + "A": { + Query: "", + Disabled: true, + }, + }, + }, + expectErr: true, + errMsg: "query is empty", + }, + { + desc: "valid prometheus query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "http_calls_total", + Disabled: false, + }, + }, + }, + expectErr: false, + }, + { + desc: "invalid builder query without query name", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "": { + QueryName: "", + Expression: "A", + }, + }, + }, + expectErr: true, + errMsg: "query name is required", + }, + { + desc: "invalid data source for builder query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: "invalid", + Expression: "A", + }, + }, + }, + expectErr: true, + errMsg: "data source is invalid", + }, + { + desc: "invalid aggregate operator for builder query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: "metrics", + AggregateOperator: "invalid", + Expression: "A", + }, + }, + }, + expectErr: true, + errMsg: "aggregate operator is invalid", + }, + { + desc: "invalid aggregate attribute for builder query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: "traces", + AggregateOperator: "sum", + AggregateAttribute: v3.AttributeKey{}, + Expression: "A", + }, + }, + }, + expectErr: true, + errMsg: "aggregate attribute is required", + }, + { + desc: "invalid group by attribute for builder query", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: "logs", + AggregateOperator: "sum", + AggregateAttribute: v3.AttributeKey{Key: "attribute"}, + GroupBy: []v3.AttributeKey{{Key: ""}}, + Expression: "A", + }, + }, + }, + expectErr: true, + errMsg: "builder query A is invalid: group by is invalid", + }, + } + + for _, tc := range reqCases { + t.Run(tc.desc, func(t *testing.T) { + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: time.Now().Add(-time.Hour).UnixMilli(), + End: time.Now().UnixMilli(), + Step: time.Minute.Microseconds(), + CompositeQuery: &tc.compositeQuery, + Variables: map[string]interface{}{}, + } + + body := &bytes.Buffer{} + err := json.NewEncoder(body).Encode(queryRangeParams) + require.NoError(t, err) + req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body) + + _, apiErr := ParseQueryRangeParams(req) + if tc.expectErr { + require.Error(t, apiErr) + require.Contains(t, apiErr.Error(), tc.errMsg) + } else { + require.Nil(t, apiErr) + } + }) + } +} + +func TestParseQueryRangeParamsExpressions(t *testing.T) { + reqCases := []struct { + desc string + compositeQuery v3.CompositeQuery + expectErr bool + errMsg string + }{ + { + desc: "invalid expression", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSum, + AggregateAttribute: v3.AttributeKey{Key: "attribute_metrics"}, + Expression: "A +", + }, + }, + }, + expectErr: true, + errMsg: "Unexpected end of expression", + }, + { + desc: "invalid expression", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceLogs, + AggregateOperator: v3.AggregateOperatorSum, + AggregateAttribute: v3.AttributeKey{Key: "attribute_logs"}, + Expression: "A", + }, + "F1": { + QueryName: "F1", + Expression: "A + B", + }, + }, + }, + expectErr: true, + errMsg: "unknown variable B", + }, + { + desc: "invalid expression", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceLogs, + AggregateOperator: v3.AggregateOperatorSum, + AggregateAttribute: v3.AttributeKey{Key: "attribute_logs"}, + Expression: "A", + }, + "F1": { + QueryName: "F1", + Expression: "A + B + C", + }, + }, + }, + expectErr: true, + errMsg: "unknown variable B; unknown variable C", + }, + } + + for _, tc := range reqCases { + t.Run(tc.desc, func(t *testing.T) { + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: time.Now().Add(-time.Hour).UnixMilli(), + End: time.Now().UnixMilli(), + Step: time.Minute.Microseconds(), + CompositeQuery: &tc.compositeQuery, + Variables: map[string]interface{}{}, + } + + body := &bytes.Buffer{} + err := json.NewEncoder(body).Encode(queryRangeParams) + require.NoError(t, err) + req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body) + + _, apiErr := ParseQueryRangeParams(req) + if tc.expectErr { + if apiErr == nil { + t.Fatalf("expected error %s, got nil", tc.errMsg) + } + require.Error(t, apiErr) + require.Contains(t, apiErr.Error(), tc.errMsg) + } else { + require.Nil(t, apiErr) + } + }) + } +} + +func TestParseQueryRangeParamsDashboardVarsSubstitution(t *testing.T) { + reqCases := []struct { + desc string + compositeQuery v3.CompositeQuery + variables map[string]interface{} + expectErr bool + errMsg string + expectedValue []interface{} + }{ + { + desc: "valid builder query with dashboard variables", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorSum, + AggregateAttribute: v3.AttributeKey{Key: "attribute_metrics"}, + Expression: "A", + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "service_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + Operator: "EQ", + Value: "{{.service_name}}", + }, + { + Key: v3.AttributeKey{Key: "operation_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + Operator: "IN", + Value: "{{.operation_name}}", + }, + }, + }, + }, + }, + }, + variables: map[string]interface{}{ + "service_name": "route", + "operation_name": []interface{}{ + "GET /route", + "POST /route", + }, + }, + expectErr: false, + expectedValue: []interface{}{"route", []interface{}{"GET /route", "POST /route"}}, + }, + } + + for _, tc := range reqCases { + t.Run(tc.desc, func(t *testing.T) { + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: time.Now().Add(-time.Hour).UnixMilli(), + End: time.Now().UnixMilli(), + Step: time.Minute.Microseconds(), + CompositeQuery: &tc.compositeQuery, + Variables: tc.variables, + } + + body := &bytes.Buffer{} + err := json.NewEncoder(body).Encode(queryRangeParams) + require.NoError(t, err) + req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body) + + parsedQueryRangeParams, apiErr := ParseQueryRangeParams(req) + if tc.expectErr { + require.Error(t, apiErr) + require.Contains(t, apiErr.Error(), tc.errMsg) + } else { + fmt.Println(apiErr) + require.Nil(t, apiErr) + require.Equal(t, parsedQueryRangeParams.CompositeQuery.BuilderQueries["A"].Filters.Items[0].Value, tc.expectedValue[0]) + require.Equal(t, parsedQueryRangeParams.CompositeQuery.BuilderQueries["A"].Filters.Items[1].Value, tc.expectedValue[1]) + } + }) + } +} diff --git a/pkg/query-service/app/query_builder.go b/pkg/query-service/app/query_builder.go new file mode 100644 index 0000000000..4ebddbf5ed --- /dev/null +++ b/pkg/query-service/app/query_builder.go @@ -0,0 +1,179 @@ +package app + +import ( + "fmt" + "strings" + + "github.com/SigNoz/govaluate" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" +) + +var SupportedFunctions = []string{ + "exp", + "log", + "ln", + "exp2", + "log2", + "exp10", + "log10", + "sqrt", + "cbrt", + "erf", + "erfc", + "lgamma", + "tgamma", + "sin", + "cos", + "tan", + "asin", + "acos", + "atan", + "degrees", + "radians", +} + +var evalFuncs = map[string]govaluate.ExpressionFunction{} + +type prepareTracesQueryFunc func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) +type prepareLogsQueryFunc func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) +type prepareMetricQueryFunc func(start, end, step int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) + +type queryBuilder struct { + options queryBuilderOptions +} + +type queryBuilderOptions struct { + BuildTraceQuery prepareTracesQueryFunc + BuildLogQuery prepareLogsQueryFunc + BuildMetricQuery prepareMetricQueryFunc +} + +func NewQueryBuilder(options queryBuilderOptions) *queryBuilder { + return &queryBuilder{ + options: options, + } +} + +func init() { + for _, fn := range SupportedFunctions { + evalFuncs[fn] = func(args ...interface{}) (interface{}, error) { + return nil, nil + } + } +} + +// unique returns the unique values in the slice +func unique(slice []string) []string { + keys := make(map[string]struct{}) + list := []string{} + for _, entry := range slice { + if _, value := keys[entry]; !value { + keys[entry] = struct{}{} + list = append(list, entry) + } + } + return list +} + +// expressionToQuery constructs the query for the expression +func expressionToQuery(qp *v3.QueryRangeParamsV3, varToQuery map[string]string, expression *govaluate.EvaluableExpression) (string, error) { + var formulaQuery string + variables := unique(expression.Vars()) + + var modified []govaluate.ExpressionToken + tokens := expression.Tokens() + for idx := range tokens { + token := tokens[idx] + if token.Kind == govaluate.VARIABLE { + token.Value = fmt.Sprintf("%v.value", token.Value) + token.Meta = fmt.Sprintf("%v.value", token.Meta) + } + modified = append(modified, token) + } + // err should be nil here since the expression is already validated + formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified) + + var formulaSubQuery string + var joinUsing string + var prevVar string + for idx, variable := range variables { + query := varToQuery[variable] + groupTags := []string{} + for _, tag := range qp.CompositeQuery.BuilderQueries[variable].GroupBy { + groupTags = append(groupTags, tag.Key) + } + groupTags = append(groupTags, "ts") + if joinUsing == "" { + for _, tag := range groupTags { + joinUsing += fmt.Sprintf("%s.%s as %s, ", variable, tag, tag) + } + joinUsing = strings.TrimSuffix(joinUsing, ", ") + } + formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, variable) + if idx > 0 { + formulaSubQuery += " ON " + for _, tag := range groupTags { + formulaSubQuery += fmt.Sprintf("%s.%s = %s.%s AND ", prevVar, tag, variable, tag) + } + formulaSubQuery = strings.TrimSuffix(formulaSubQuery, " AND ") + } + if idx < len(variables)-1 { + formulaSubQuery += " GLOBAL INNER JOIN" + } + prevVar = variable + } + formulaQuery = fmt.Sprintf("SELECT %s, %s as value FROM ", joinUsing, formula.ExpressionString()) + formulaSubQuery + return formulaQuery, nil +} + +func (qb *queryBuilder) prepareQueries(params *v3.QueryRangeParamsV3) (map[string]string, error) { + queries := make(map[string]string) + + compositeQuery := params.CompositeQuery + + if compositeQuery != nil { + + // Build queries for each builder query + for queryName, query := range compositeQuery.BuilderQueries { + if query.Expression == queryName { + switch query.DataSource { + case v3.DataSourceTraces: + queryString, err := qb.options.BuildTraceQuery(params.Start, params.End, params.Step, compositeQuery.QueryType, compositeQuery.PanelType, query) + if err != nil { + return nil, err + } + queries[queryName] = queryString + case v3.DataSourceLogs: + queryString, err := qb.options.BuildLogQuery(params.Start, params.End, params.Step, compositeQuery.QueryType, compositeQuery.PanelType, query) + if err != nil { + return nil, err + } + queries[queryName] = queryString + case v3.DataSourceMetrics: + queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, params.Step, compositeQuery.QueryType, compositeQuery.PanelType, query) + if err != nil { + return nil, err + } + queries[queryName] = queryString + default: + zap.S().Errorf("Unknown data source %s", query.DataSource) + } + } + } + + // Build queries for each expression + for _, query := range compositeQuery.BuilderQueries { + if query.Expression != query.QueryName { + expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, evalFuncs) + + queryString, err := expressionToQuery(params, queries, expression) + if err != nil { + return nil, err + } + queries[query.QueryName] = queryString + } + } + } + return queries, nil +} diff --git a/pkg/query-service/app/query_builder_test.go b/pkg/query-service/app/query_builder_test.go new file mode 100644 index 0000000000..1e9a282212 --- /dev/null +++ b/pkg/query-service/app/query_builder_test.go @@ -0,0 +1,198 @@ +package app + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { + t.Run("TestBuildQueryWithFilters", func(t *testing.T) { + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "name"}, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"}, + }}, + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "A", + }, + "B": { + AggregateAttribute: v3.AttributeKey{Key: "name2"}, + DataSource: v3.DataSourceMetrics, + AggregateOperator: v3.AggregateOperatorRateAvg, + Expression: "B", + }, + "C": { + QueryName: "C", + Expression: "A/B", + }, + }, + }, + } + qbOptions := queryBuilderOptions{ + BuildMetricQuery: metricsv3.PrepareMetricQuery, + } + qb := NewQueryBuilder(qbOptions) + + queries, err := qb.prepareQueries(q) + + require.NoError(t, err) + + require.Contains(t, queries["C"], "SELECT A.ts as ts, A.value / B.value") + require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']") + require.Contains(t, queries["C"], "runningDifference(value)/runningDifference(ts)") + }) +} + +func TestBuildQueryWithIncorrectQueryRef(t *testing.T) { + t.Run("TestBuildQueryWithFilters", func(t *testing.T) { + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "name"}, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"}, + }}, + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "A", + }, + "C": { + QueryName: "C", + Expression: "A*2", + }, + }, + }, + } + + qbOptions := queryBuilderOptions{ + BuildMetricQuery: metricsv3.PrepareMetricQuery, + } + qb := NewQueryBuilder(qbOptions) + + _, err := qb.prepareQueries(q) + + require.NoError(t, err) + }) +} + +func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) { + t.Run("TestBuildQueryWithFilters", func(t *testing.T) { + q := &v3.QueryRangeParamsV3{ + Start: 1650991982000, + End: 1651078382000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "name"}, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: "in"}, + }}, + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "A", + Disabled: true, + }, + "B": { + AggregateAttribute: v3.AttributeKey{Key: "name2"}, + DataSource: v3.DataSourceMetrics, + + AggregateOperator: v3.AggregateOperatorRateMax, + Expression: "B", + Disabled: true, + }, + "C": { + AggregateAttribute: v3.AttributeKey{Key: "name3"}, + DataSource: v3.DataSourceMetrics, + + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "C", + Disabled: true, + }, + "F1": { + QueryName: "F1", + Expression: "A/B", + }, + "F2": { + QueryName: "F2", + Expression: "A/(B+C)", + }, + "F3": { + QueryName: "F3", + Expression: "A*A", + }, + "F4": { + QueryName: "F4", + Expression: "A*B*C", + }, + "F5": { + QueryName: "F5", + Expression: "((A - B) / B) * 100", + }, + }, + }, + } + + qbOptions := queryBuilderOptions{ + BuildMetricQuery: metricsv3.PrepareMetricQuery, + } + qb := NewQueryBuilder(qbOptions) + + queries, err := qb.prepareQueries(q) + + require.NoError(t, err) + + require.Contains(t, queries["F1"], "SELECT A.ts as ts, A.value / B.value") + require.Equal(t, 1, strings.Count(queries["F1"], " ON ")) + + require.Contains(t, queries["F2"], "SELECT A.ts as ts, A.value / (B.value + C.value)") + require.Equal(t, 2, strings.Count(queries["F2"], " ON ")) + + // Working with same query multiple times should not join on itself + require.NotContains(t, queries["F3"], " ON ") + + require.Contains(t, queries["F4"], "SELECT A.ts as ts, A.value * B.value * C.value") + require.Equal(t, 2, strings.Count(queries["F4"], " ON ")) + + require.Contains(t, queries["F5"], "SELECT A.ts as ts, ((A.value - B.value) / B.value) * 100") + require.Equal(t, 1, strings.Count(queries["F5"], " ON ")) + + // res := PrepareBuilderMetricQueries(q, "table") + // So(res.Err, ShouldBeNil) + // queries := res.Queries + // So(len(queries), ShouldEqual, 5) + // So(queries["F1"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / B.value") + // So(strings.Count(queries["F1"], " ON "), ShouldEqual, 1) + + // So(queries["F2"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / (B.value + C.value)") + // So(strings.Count(queries["F2"], " ON "), ShouldEqual, 2) + + // // Working with same query multiple times should not join on itself + // So(queries["F3"], ShouldNotContainSubstring, " ON ") + + // So(queries["F4"], ShouldContainSubstring, "SELECT A.ts as ts, A.value * B.value * C.value") + // // Number of times JOIN ON appears is N-1 where N is number of unique queries + // So(strings.Count(queries["F4"], " ON "), ShouldEqual, 2) + + // So(queries["F5"], ShouldContainSubstring, "SELECT A.ts as ts, ((A.value - B.value) / B.value) * 100") + // So(strings.Count(queries["F5"], " ON "), ShouldEqual, 1) + }) +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 191e7c6e5f..d433163aa7 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -213,7 +213,11 @@ const ( // ReservedColumnTargetAliases identifies result value from a user // written clickhouse query. The column alias indcate which value is // to be considered as final result (or target) -var ReservedColumnTargetAliases = map[string]bool{"result": true, "res": true, "value": true} +var ReservedColumnTargetAliases = map[string]struct{}{ + "result": {}, + "res": {}, + "value": {}, +} const ( StringTagMapCol = "stringTagMap" diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index c8c66669d0..089003fc8c 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -60,6 +60,7 @@ type Reader interface { GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) + GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) GetTotalSpans(ctx context.Context) (uint64, error) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 1221e46a1b..8a55fb02a8 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -1,7 +1,9 @@ package v3 import ( + "encoding/json" "fmt" + "strconv" "time" "github.com/google/uuid" @@ -248,11 +250,13 @@ func (a AttributeKey) Validate() error { return fmt.Errorf("invalid attribute dataType: %s", a.DataType) } - switch a.Type { - case AttributeKeyTypeResource, AttributeKeyTypeTag: - break - default: - return fmt.Errorf("invalid attribute type: %s", a.Type) + if a.IsColumn { + switch a.Type { + case AttributeKeyTypeResource, AttributeKeyTypeTag: + break + default: + return fmt.Errorf("invalid attribute type: %s", a.Type) + } } if a.Key == "" { @@ -410,16 +414,16 @@ func (b *BuilderQuery) Validate() error { } if b.GroupBy != nil { for _, groupBy := range b.GroupBy { - if groupBy.Validate() != nil { - return fmt.Errorf("group by is invalid") + if err := groupBy.Validate(); err != nil { + return fmt.Errorf("group by is invalid %w", err) } } } if b.SelectColumns != nil { for _, selectColumn := range b.SelectColumns { - if selectColumn.Validate() != nil { - return fmt.Errorf("select column is invalid") + if err := selectColumn.Validate(); err != nil { + return fmt.Errorf("select column is invalid %w", err) } } } @@ -473,9 +477,9 @@ type QueryRangeResponse struct { } type Result struct { - QueryName string `json:"queryName"` - Series *Series `json:"series"` - List []*Row `json:"list"` + QueryName string `json:"queryName"` + Series []*Series `json:"series"` + List []*Row `json:"list"` } type Series struct { @@ -489,8 +493,14 @@ type Row struct { } type Point struct { - Timestamp int64 `json:"timestamp"` - Value float64 `json:"value"` + Timestamp int64 + Value float64 +} + +// MarshalJSON implements json.Marshaler. +func (p *Point) MarshalJSON() ([]byte, error) { + v := strconv.FormatFloat(p.Value, 'f', -1, 64) + return json.Marshal(map[string]interface{}{"timestamp": p.Timestamp, "value": v}) } // ExploreQuery is a query for the explore page diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go new file mode 100644 index 0000000000..af722a70d3 --- /dev/null +++ b/pkg/query-service/utils/format.go @@ -0,0 +1,47 @@ +package utils + +import ( + "fmt" + "reflect" + "strings" + + "go.uber.org/zap" +) + +// ClickHouseFormattedValue formats the value to be used in clickhouse query +func ClickHouseFormattedValue(v interface{}) string { + switch x := v.(type) { + case int: + return fmt.Sprintf("%d", x) + case float32, float64: + return fmt.Sprintf("%f", x) + case string: + return fmt.Sprintf("'%s'", x) + case bool: + return fmt.Sprintf("%v", x) + case []interface{}: + if len(x) == 0 { + return "" + } + switch x[0].(type) { + case string: + str := "[" + for idx, sVal := range x { + str += fmt.Sprintf("'%s'", sVal) + if idx != len(x)-1 { + str += "," + } + } + str += "]" + return str + case int, float32, float64, bool: + return strings.Join(strings.Fields(fmt.Sprint(x)), ",") + default: + zap.S().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0]))) + return "" + } + default: + zap.S().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x))) + return "" + } +} diff --git a/pkg/query-service/utils/queryTemplate/vars.go b/pkg/query-service/utils/queryTemplate/vars.go index 093977aa01..b63487ec4a 100644 --- a/pkg/query-service/utils/queryTemplate/vars.go +++ b/pkg/query-service/utils/queryTemplate/vars.go @@ -4,6 +4,7 @@ import ( "fmt" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) // AssignReservedVars assigns values for go template vars. assumes that @@ -22,3 +23,20 @@ func AssignReservedVars(metricsQueryRangeParams *model.QueryRangeParamsV2) { metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000) } + +// AssignReservedVars assigns values for go template vars. assumes that +// model.QueryRangeParamsV3.Start and End are Unix Nano timestamps +func AssignReservedVarsV3(metricsQueryRangeParams *v3.QueryRangeParamsV3) { + metricsQueryRangeParams.Variables["start_timestamp"] = metricsQueryRangeParams.Start / 1000 + metricsQueryRangeParams.Variables["end_timestamp"] = metricsQueryRangeParams.End / 1000 + + metricsQueryRangeParams.Variables["start_timestamp_ms"] = metricsQueryRangeParams.Start + metricsQueryRangeParams.Variables["end_timestamp_ms"] = metricsQueryRangeParams.End + + metricsQueryRangeParams.Variables["start_timestamp_nano"] = metricsQueryRangeParams.Start * 1e6 + metricsQueryRangeParams.Variables["end_timestamp_nano"] = metricsQueryRangeParams.End * 1e6 + + metricsQueryRangeParams.Variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.Start/1000) + metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000) + +} diff --git a/pkg/query-service/utils/time.go b/pkg/query-service/utils/time.go index 90e132aff2..ea644d7600 100644 --- a/pkg/query-service/utils/time.go +++ b/pkg/query-service/utils/time.go @@ -6,9 +6,9 @@ import ( "go.uber.org/zap" ) -func Elapsed(funcName string) func() { +func Elapsed(funcName string, args ...interface{}) func() { start := time.Now() return func() { - zap.S().Infof("%s took %v\n", funcName, time.Since(start)) + zap.S().Infof("func %s took %v with args %v", funcName, time.Since(start), args) } }