mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 06:39:03 +08:00
feat: add support for dashboard variables (#1557)
This commit is contained in:
parent
461a15d52d
commit
8556c87d46
@ -2813,7 +2813,7 @@ func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing query: ", err)
|
||||
return nil, fmt.Errorf("error in processing query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
@ -3239,3 +3239,39 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs
|
||||
|
||||
return &aggregateResponse, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) {
|
||||
var result model.DashboardVar
|
||||
rows, err := r.db.Query(ctx, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
columnTypes = rows.ColumnTypes()
|
||||
vars = make([]interface{}, len(columnTypes))
|
||||
)
|
||||
for i := range columnTypes {
|
||||
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(vars...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, v := range vars {
|
||||
switch v := v.(type) {
|
||||
case *string, *int8, *int16, *int32, *int64, *uint8, *uint16, *uint32, *uint64, *float32, *float64, *time.Time, *bool:
|
||||
result.VariableValues = append(result.VariableValues, reflect.ValueOf(v).Elem().Interface())
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported value type encountered")
|
||||
}
|
||||
}
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
@ -9,7 +9,9 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@ -320,6 +322,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/dashboards/{uuid}", ViewAccess(aH.getDashboard)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/dashboards/{uuid}", EditAccess(aH.updateDashboard)).Methods(http.MethodPut)
|
||||
router.HandleFunc("/api/v1/dashboards/{uuid}", EditAccess(aH.deleteDashboard)).Methods(http.MethodDelete)
|
||||
router.HandleFunc("/api/v1/variables/query", ViewAccess(aH.queryDashboardVars)).Methods(http.MethodGet)
|
||||
|
||||
router.HandleFunc("/api/v1/feedback", OpenAccess(aH.submitFeedback)).Methods(http.MethodPost)
|
||||
// router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet)
|
||||
@ -483,9 +486,11 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
type channelResult struct {
|
||||
Series []*model.Series
|
||||
Err error
|
||||
Name string
|
||||
Query string
|
||||
}
|
||||
|
||||
execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error) {
|
||||
execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error, map[string]string) {
|
||||
var seriesList []*model.Series
|
||||
ch := make(chan channelResult, len(queries))
|
||||
var wg sync.WaitGroup
|
||||
@ -500,7 +505,7 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)}
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
|
||||
return
|
||||
}
|
||||
ch <- channelResult{Series: seriesList}
|
||||
@ -511,21 +516,23 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
errQuriesByName := make(map[string]string)
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
errQuriesByName[r.Name] = r.Query
|
||||
continue
|
||||
}
|
||||
seriesList = append(seriesList, r.Series...)
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n"))
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
|
||||
}
|
||||
return seriesList, nil
|
||||
return seriesList, nil, nil
|
||||
}
|
||||
|
||||
execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error) {
|
||||
execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error, map[string]string) {
|
||||
var seriesList []*model.Series
|
||||
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries))
|
||||
var wg sync.WaitGroup
|
||||
@ -538,6 +545,19 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
go func(name string, query *model.PromQuery) {
|
||||
var seriesList []*model.Series
|
||||
defer wg.Done()
|
||||
tmpl := template.New("promql-query")
|
||||
tmpl, tmplErr := tmpl.Parse(query.Query)
|
||||
if tmplErr != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
|
||||
return
|
||||
}
|
||||
var queryBuf bytes.Buffer
|
||||
tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables)
|
||||
if tmplErr != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
|
||||
return
|
||||
}
|
||||
query.Query = queryBuf.String()
|
||||
queryModel := model.QueryRangeParams{
|
||||
Start: time.UnixMilli(metricsQueryRangeParams.Start),
|
||||
End: time.UnixMilli(metricsQueryRangeParams.End),
|
||||
@ -546,7 +566,7 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
promResult, _, err := (*aH.reader).GetQueryRangeResult(r.Context(), &queryModel)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)}
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query}
|
||||
return
|
||||
}
|
||||
matrix, _ := promResult.Matrix()
|
||||
@ -567,22 +587,25 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
errQuriesByName := make(map[string]string)
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
errQuriesByName[r.Name] = r.Query
|
||||
continue
|
||||
}
|
||||
seriesList = append(seriesList, r.Series...)
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n"))
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
|
||||
}
|
||||
return seriesList, nil
|
||||
return seriesList, nil, nil
|
||||
}
|
||||
|
||||
var seriesList []*model.Series
|
||||
var err error
|
||||
var errQuriesByName map[string]string
|
||||
switch metricsQueryRangeParams.CompositeMetricQuery.QueryType {
|
||||
case model.QUERY_BUILDER:
|
||||
runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
@ -590,7 +613,7 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: runQueries.Err}, nil)
|
||||
return
|
||||
}
|
||||
seriesList, err = execClickHouseQueries(runQueries.Queries)
|
||||
seriesList, err, errQuriesByName = execClickHouseQueries(runQueries.Queries)
|
||||
|
||||
case model.CLICKHOUSE:
|
||||
queries := make(map[string]string)
|
||||
@ -598,20 +621,32 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
if chQuery.Disabled {
|
||||
continue
|
||||
}
|
||||
queries[name] = chQuery.Query
|
||||
tmpl := template.New("clickhouse-query")
|
||||
tmpl, err := tmpl.Parse(chQuery.Query)
|
||||
if err != nil {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
var query bytes.Buffer
|
||||
err = tmpl.Execute(&query, metricsQueryRangeParams.Variables)
|
||||
if err != nil {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
queries[name] = query.String()
|
||||
}
|
||||
seriesList, err = execClickHouseQueries(queries)
|
||||
seriesList, err, errQuriesByName = execClickHouseQueries(queries)
|
||||
case model.PROM:
|
||||
seriesList, err = execPromQueries(metricsQueryRangeParams)
|
||||
seriesList, err, errQuriesByName = execPromQueries(metricsQueryRangeParams)
|
||||
default:
|
||||
err = fmt.Errorf("invalid query type")
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
respondError(w, apiErrObj, nil)
|
||||
respondError(w, apiErrObj, errQuriesByName)
|
||||
return
|
||||
}
|
||||
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE &&
|
||||
@ -707,6 +742,25 @@ func (aH *APIHandler) deleteDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) queryDashboardVars(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query := r.URL.Query().Get("query")
|
||||
if query == "" {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("query is required")}, nil)
|
||||
return
|
||||
}
|
||||
if strings.Contains(strings.ToLower(query), "alter table") {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("query shouldn't alter data")}, nil)
|
||||
return
|
||||
}
|
||||
dashboardVars, err := (*aH.reader).QueryDashboardVars(r.Context(), query)
|
||||
if err != nil {
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
aH.respond(w, dashboardVars)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) updateDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
uuid := mux.Vars(r)["uuid"]
|
||||
@ -1034,11 +1088,11 @@ func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request)
|
||||
if res.Err != nil {
|
||||
switch res.Err.(type) {
|
||||
case promql.ErrQueryCanceled:
|
||||
respondError(w, &model.ApiError{model.ErrorCanceled, res.Err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorCanceled, Err: res.Err}, nil)
|
||||
case promql.ErrQueryTimeout:
|
||||
respondError(w, &model.ApiError{model.ErrorTimeout, res.Err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorTimeout, Err: res.Err}, nil)
|
||||
}
|
||||
respondError(w, &model.ApiError{model.ErrorExec, res.Err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorExec, Err: res.Err}, nil)
|
||||
}
|
||||
|
||||
response_data := &model.QueryData{
|
||||
@ -1088,11 +1142,11 @@ func (aH *APIHandler) queryMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
if res.Err != nil {
|
||||
switch res.Err.(type) {
|
||||
case promql.ErrQueryCanceled:
|
||||
respondError(w, &model.ApiError{model.ErrorCanceled, res.Err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorCanceled, Err: res.Err}, nil)
|
||||
case promql.ErrQueryTimeout:
|
||||
respondError(w, &model.ApiError{model.ErrorTimeout, res.Err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorTimeout, Err: res.Err}, nil)
|
||||
}
|
||||
respondError(w, &model.ApiError{model.ErrorExec, res.Err}, nil)
|
||||
respondError(w, &model.ApiError{Typ: model.ErrorExec, Err: res.Err}, nil)
|
||||
}
|
||||
|
||||
response_data := &model.QueryData{
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"github.com/SigNoz/govaluate"
|
||||
"go.signoz.io/query-service/constants"
|
||||
"go.signoz.io/query-service/model"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type RunQueries struct {
|
||||
@ -50,8 +51,8 @@ func GoValuateFuncs() map[string]govaluate.ExpressionFunction {
|
||||
return GoValuateFuncs
|
||||
}
|
||||
|
||||
// formattedValue formats the value to be used in clickhouse query
|
||||
func formattedValue(v interface{}) string {
|
||||
// FormattedValue formats the value to be used in clickhouse query
|
||||
func FormattedValue(v interface{}) string {
|
||||
switch x := v.(type) {
|
||||
case int:
|
||||
return fmt.Sprintf("%d", x)
|
||||
@ -62,6 +63,9 @@ func formattedValue(v interface{}) string {
|
||||
case bool:
|
||||
return fmt.Sprintf("%v", x)
|
||||
case []interface{}:
|
||||
if len(x) == 0 {
|
||||
return ""
|
||||
}
|
||||
switch x[0].(type) {
|
||||
case string:
|
||||
str := "["
|
||||
@ -75,10 +79,12 @@ func formattedValue(v interface{}) string {
|
||||
return str
|
||||
case int, float32, float64, bool:
|
||||
return strings.Join(strings.Fields(fmt.Sprint(x)), ",")
|
||||
default:
|
||||
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0])))
|
||||
return ""
|
||||
}
|
||||
return ""
|
||||
default:
|
||||
// may be log the warning here?
|
||||
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x)))
|
||||
return ""
|
||||
}
|
||||
}
|
||||
@ -87,7 +93,7 @@ func formattedValue(v interface{}) string {
|
||||
// timeseries based on search criteria
|
||||
func BuildMetricsTimeSeriesFilterQuery(fs *model.FilterSet, groupTags []string, metricName string, aggregateOperator model.AggregateOperator) (string, error) {
|
||||
var conditions []string
|
||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", formattedValue(metricName)))
|
||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", FormattedValue(metricName)))
|
||||
if fs != nil && len(fs.Items) != 0 {
|
||||
for _, item := range fs.Items {
|
||||
toFormat := item.Value
|
||||
@ -102,7 +108,7 @@ func BuildMetricsTimeSeriesFilterQuery(fs *model.FilterSet, groupTags []string,
|
||||
toFormat = x[0]
|
||||
}
|
||||
}
|
||||
fmtVal := formattedValue(toFormat)
|
||||
fmtVal := FormattedValue(toFormat)
|
||||
switch op {
|
||||
case "eq":
|
||||
conditions = append(conditions, fmt.Sprintf("labels_object.%s = %s", item.Key, fmtVal))
|
||||
@ -152,7 +158,7 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table
|
||||
return "", err
|
||||
}
|
||||
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", formattedValue(mq.MetricName), qp.Start, qp.End)
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", FormattedValue(mq.MetricName), qp.Start, qp.End)
|
||||
|
||||
// Select the aggregate value for interval
|
||||
queryTmpl :=
|
||||
@ -419,3 +425,31 @@ func PrepareBuilderMetricQueries(qp *model.QueryRangeParamsV2, tableName string)
|
||||
}
|
||||
return &RunQueries{Queries: namedQueries}
|
||||
}
|
||||
|
||||
// PromFormattedValue formats the value to be used in promql
|
||||
func PromFormattedValue(v interface{}) string {
|
||||
switch x := v.(type) {
|
||||
case int:
|
||||
return fmt.Sprintf("%d", x)
|
||||
case float32, float64:
|
||||
return fmt.Sprintf("%f", x)
|
||||
case string:
|
||||
return fmt.Sprintf("%s", x)
|
||||
case bool:
|
||||
return fmt.Sprintf("%v", x)
|
||||
case []interface{}:
|
||||
if len(x) == 0 {
|
||||
return ""
|
||||
}
|
||||
switch x[0].(type) {
|
||||
case string, int, float32, float64, bool:
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(x)), "|"), "[]")
|
||||
default:
|
||||
zap.L().Error("invalid type for prom formatted value", zap.Any("type", reflect.TypeOf(x[0])))
|
||||
return ""
|
||||
}
|
||||
default:
|
||||
zap.L().Error("invalid type for prom formatted value", zap.Any("type", reflect.TypeOf(x)))
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"go.signoz.io/query-service/app/metrics"
|
||||
"go.signoz.io/query-service/model"
|
||||
@ -36,6 +37,44 @@ func ParseMetricQueryRangeParams(r *http.Request) (*model.QueryRangeParamsV2, *m
|
||||
if err := validateQueryRangeParamsV2(postData); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
// prepare the variables for the corrspnding query type
|
||||
formattedVars := make(map[string]interface{})
|
||||
for name, value := range postData.Variables {
|
||||
if postData.CompositeMetricQuery.QueryType == model.PROM {
|
||||
formattedVars[name] = metrics.PromFormattedValue(value)
|
||||
} else if postData.CompositeMetricQuery.QueryType == model.CLICKHOUSE {
|
||||
formattedVars[name] = metrics.FormattedValue(value)
|
||||
}
|
||||
}
|
||||
// replace the variables in metrics builder filter item with actual value
|
||||
if postData.CompositeMetricQuery.QueryType == model.QUERY_BUILDER {
|
||||
for _, query := range postData.CompositeMetricQuery.BuilderQueries {
|
||||
for idx := range query.TagFilters.Items {
|
||||
item := &query.TagFilters.Items[idx]
|
||||
value := item.Value
|
||||
if value != nil {
|
||||
switch x := value.(type) {
|
||||
case string:
|
||||
variableName := strings.Trim(x, "{{ . }}")
|
||||
if _, ok := postData.Variables[variableName]; ok {
|
||||
item.Value = postData.Variables[variableName]
|
||||
}
|
||||
case []interface{}:
|
||||
if len(x) > 0 {
|
||||
switch x[0].(type) {
|
||||
case string:
|
||||
variableName := strings.Trim(x[0].(string), "{{ . }}")
|
||||
if _, ok := postData.Variables[variableName]; ok {
|
||||
item.Value = postData.Variables[variableName]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
postData.Variables = formattedVars
|
||||
|
||||
return postData, nil
|
||||
}
|
||||
|
@ -70,4 +70,6 @@ type Reader interface {
|
||||
|
||||
// Connection needed for rules, not ideal but required
|
||||
GetConn() clickhouse.Conn
|
||||
|
||||
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
|
||||
}
|
||||
|
@ -118,11 +118,12 @@ const (
|
||||
)
|
||||
|
||||
type QueryRangeParamsV2 struct {
|
||||
DataSource DataSource `json:"dataSource"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
Step int64 `json:"step"`
|
||||
CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"`
|
||||
DataSource DataSource `json:"dataSource"`
|
||||
Start int64 `json:"start"`
|
||||
End int64 `json:"end"`
|
||||
Step int64 `json:"step"`
|
||||
CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"`
|
||||
Variables map[string]interface{} `json:"variables,omitempty"`
|
||||
}
|
||||
|
||||
// Metric auto complete types
|
||||
|
@ -492,3 +492,7 @@ func (s *ServiceItem) MarshalJSON() ([]byte, error) {
|
||||
Alias: (*Alias)(s),
|
||||
})
|
||||
}
|
||||
|
||||
type DashboardVar struct {
|
||||
VariableValues []interface{} `json:"variableValues"`
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user