From 983ca1ec6a3812eee99724e2e2f55e0978ffaa92 Mon Sep 17 00:00:00 2001 From: Vishal Sharma Date: Thu, 24 Nov 2022 18:18:19 +0530 Subject: [PATCH] feat: introduce getSubTreeSpans function in clickhouse query builder & introduce smart trace detail algorithm (#1648) * perf: introduce smart trace detail algorithm * fix: remove hardcoded levels and handle null levels * feat: add support for broken trees * feat: use spanLimit env variable * fix: handle missing root span * add root spanId and root name * use permanent table * add kind, events and tagmap support * fix query formation * increase context timeout to 600s * perf improvement * handle error * return tableName as response to query * support multiple queries tableName * perf: improve memory and latency * feat: add getSubTree custom func and smart trace detail algo to ee * fix: create new functions for ee * chore: refactor codebase * chore: refactor frontend code Co-authored-by: Ankit Nayan Co-authored-by: Palash Gupta --- ee/query-service/app/api/api.go | 3 + ee/query-service/app/api/metrics.go | 236 +++++++++++ ee/query-service/app/api/traces.go | 39 ++ ee/query-service/app/db/metrics.go | 374 ++++++++++++++++++ ee/query-service/app/db/reader.go | 5 +- ee/query-service/app/db/trace.go | 222 +++++++++++ ee/query-service/app/server.go | 2 +- ee/query-service/constants/constants.go | 2 + ee/query-service/model/plans.go | 12 +- ee/query-service/model/trace.go | 22 ++ frontend/src/api/trace/getTraceItem.ts | 11 +- .../src/container/Trace/TraceTable/index.tsx | 7 +- frontend/src/container/TraceDetail/index.tsx | 17 +- frontend/src/container/TraceDetail/utils.ts | 25 ++ frontend/src/pages/TraceDetail/index.tsx | 14 +- frontend/src/types/api/trace/getTraceItem.ts | 7 + go.mod | 7 +- go.sum | 14 +- .../app/clickhouseReader/reader.go | 182 +++++---- pkg/query-service/app/http_handler.go | 17 +- pkg/query-service/app/parser.go | 24 ++ pkg/query-service/app/server.go | 2 +- pkg/query-service/interfaces/interface.go | 3 +- pkg/query-service/model/featureSet.go | 2 + pkg/query-service/model/response.go | 23 +- pkg/query-service/model/response_easyjson.go | 328 +++++++++++++++ 26 files changed, 1480 insertions(+), 120 deletions(-) create mode 100644 ee/query-service/app/api/metrics.go create mode 100644 ee/query-service/app/api/traces.go create mode 100644 ee/query-service/app/db/metrics.go create mode 100644 ee/query-service/app/db/trace.go create mode 100644 ee/query-service/model/trace.go create mode 100644 pkg/query-service/model/response_easyjson.go diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index a6497b615e..85bec52122 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -114,6 +114,9 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/invite/{token}", baseapp.OpenAccess(ah.getInvite)).Methods(http.MethodGet) router.HandleFunc("/api/v1/register", baseapp.OpenAccess(ah.registerUser)).Methods(http.MethodPost) router.HandleFunc("/api/v1/login", baseapp.OpenAccess(ah.loginUser)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/traces/{traceId}", baseapp.ViewAccess(ah.searchTraces)).Methods(http.MethodGet) + router.HandleFunc("/api/v2/metrics/query_range", baseapp.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost) + ah.APIHandler.RegisterRoutes(router) } diff --git a/ee/query-service/app/api/metrics.go b/ee/query-service/app/api/metrics.go new file mode 100644 index 0000000000..4b1a8e49dd --- /dev/null +++ b/ee/query-service/app/api/metrics.go @@ -0,0 +1,236 @@ +package api + +import ( + "bytes" + "fmt" + "net/http" + "sync" + "text/template" + "time" + + "go.signoz.io/signoz/pkg/query-service/app/metrics" + "go.signoz.io/signoz/pkg/query-service/app/parser" + "go.signoz.io/signoz/pkg/query-service/constants" + basemodel "go.signoz.io/signoz/pkg/query-service/model" + querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" + "go.uber.org/zap" +) + +func (ah *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { + if !ah.CheckFeature(basemodel.CustomMetricsFunction) { + zap.S().Info("CustomMetricsFunction feature is not enabled in this plan") + ah.APIHandler.QueryRangeMetricsV2(w, r) + return + } + metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r) + + if apiErrorObj != nil { + zap.S().Errorf(apiErrorObj.Err.Error()) + RespondError(w, apiErrorObj, nil) + return + } + + // prometheus instant query needs same timestamp + if metricsQueryRangeParams.CompositeMetricQuery.PanelType == basemodel.QUERY_VALUE && + metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.PROM { + metricsQueryRangeParams.Start = metricsQueryRangeParams.End + } + + // round up the end to nearest multiple + if metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.QUERY_BUILDER { + end := (metricsQueryRangeParams.End) / 1000 + step := metricsQueryRangeParams.Step + metricsQueryRangeParams.End = (end / step * step) * 1000 + } + + type channelResult struct { + Series []*basemodel.Series + TableName string + Err error + Name string + Query string + } + + execClickHouseQueries := func(queries map[string]string) ([]*basemodel.Series, []string, error, map[string]string) { + var seriesList []*basemodel.Series + var tableName []string + ch := make(chan channelResult, len(queries)) + var wg sync.WaitGroup + + for name, query := range queries { + wg.Add(1) + go func(name, query string) { + defer wg.Done() + seriesList, tableName, err := ah.opts.DataConnector.GetMetricResultEE(r.Context(), query) + for _, series := range seriesList { + series.QueryName = name + } + + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query} + return + } + ch <- channelResult{Series: seriesList, TableName: tableName} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + seriesList = append(seriesList, r.Series...) + tableName = append(tableName, r.TableName) + } + if len(errs) != 0 { + return nil, nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName + } + return seriesList, tableName, nil, nil + } + + execPromQueries := func(metricsQueryRangeParams *basemodel.QueryRangeParamsV2) ([]*basemodel.Series, error, map[string]string) { + var seriesList []*basemodel.Series + ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries)) + var wg sync.WaitGroup + + for name, query := range metricsQueryRangeParams.CompositeMetricQuery.PromQueries { + if query.Disabled { + continue + } + wg.Add(1) + go func(name string, query *basemodel.PromQuery) { + var seriesList []*basemodel.Series + defer wg.Done() + tmpl := template.New("promql-query") + tmpl, tmplErr := tmpl.Parse(query.Query) + if tmplErr != nil { + ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query} + return + } + var queryBuf bytes.Buffer + tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables) + if tmplErr != nil { + ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query} + return + } + query.Query = queryBuf.String() + queryModel := basemodel.QueryRangeParams{ + Start: time.UnixMilli(metricsQueryRangeParams.Start), + End: time.UnixMilli(metricsQueryRangeParams.End), + Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)), + Query: query.Query, + } + promResult, _, err := ah.opts.DataConnector.GetQueryRangeResult(r.Context(), &queryModel) + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query} + return + } + matrix, _ := promResult.Matrix() + for _, v := range matrix { + var s basemodel.Series + s.QueryName = name + s.Labels = v.Metric.Copy().Map() + for _, p := range v.Points { + s.Points = append(s.Points, basemodel.MetricPoint{Timestamp: p.T, Value: p.V}) + } + seriesList = append(seriesList, &s) + } + ch <- channelResult{Series: seriesList} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + seriesList = append(seriesList, r.Series...) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName + } + return seriesList, nil, nil + } + + var seriesList []*basemodel.Series + var tableName []string + var err error + var errQuriesByName map[string]string + switch metricsQueryRangeParams.CompositeMetricQuery.QueryType { + case basemodel.QUERY_BUILDER: + runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME) + if runQueries.Err != nil { + RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: runQueries.Err}, nil) + return + } + seriesList, tableName, err, errQuriesByName = execClickHouseQueries(runQueries.Queries) + + case basemodel.CLICKHOUSE: + queries := make(map[string]string) + + for name, chQuery := range metricsQueryRangeParams.CompositeMetricQuery.ClickHouseQueries { + if chQuery.Disabled { + continue + } + tmpl := template.New("clickhouse-query") + tmpl, err := tmpl.Parse(chQuery.Query) + if err != nil { + RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil) + return + } + var query bytes.Buffer + + // replace go template variables + querytemplate.AssignReservedVars(metricsQueryRangeParams) + + err = tmpl.Execute(&query, metricsQueryRangeParams.Variables) + if err != nil { + RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil) + return + } + queries[name] = query.String() + } + seriesList, tableName, err, errQuriesByName = execClickHouseQueries(queries) + case basemodel.PROM: + seriesList, err, errQuriesByName = execPromQueries(metricsQueryRangeParams) + default: + err = fmt.Errorf("invalid query type") + RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, errQuriesByName) + return + } + + if err != nil { + apiErrObj := &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + if metricsQueryRangeParams.CompositeMetricQuery.PanelType == basemodel.QUERY_VALUE && + len(seriesList) > 1 && + (metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.QUERY_BUILDER || + metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.CLICKHOUSE) { + RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: fmt.Errorf("invalid: query resulted in more than one series for value type")}, nil) + return + } + + type ResponseFormat struct { + ResultType string `json:"resultType"` + Result []*basemodel.Series `json:"result"` + TableName []string `json:"tableName"` + } + resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName} + ah.Respond(w, resp) +} diff --git a/ee/query-service/app/api/traces.go b/ee/query-service/app/api/traces.go new file mode 100644 index 0000000000..22d66f7a82 --- /dev/null +++ b/ee/query-service/app/api/traces.go @@ -0,0 +1,39 @@ +package api + +import ( + "net/http" + "strconv" + + "go.signoz.io/signoz/ee/query-service/app/db" + "go.signoz.io/signoz/ee/query-service/constants" + "go.signoz.io/signoz/ee/query-service/model" + baseapp "go.signoz.io/signoz/pkg/query-service/app" + basemodel "go.signoz.io/signoz/pkg/query-service/model" + "go.uber.org/zap" +) + +func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { + + if !ah.CheckFeature(basemodel.SmartTraceDetail) { + zap.S().Info("SmartTraceDetail feature is not enabled in this plan") + ah.APIHandler.SearchTraces(w, r) + return + } + traceId, spanId, levelUpInt, levelDownInt, err := baseapp.ParseSearchTracesParams(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params") + return + } + spanLimit, err := strconv.Atoi(constants.SpanLimitStr) + if err != nil { + zap.S().Error("Error during strconv.Atoi() on SPAN_LIMIT env variable: ", err) + return + } + result, err := ah.opts.DataConnector.SearchTraces(r.Context(), traceId, spanId, levelUpInt, levelDownInt, spanLimit, db.SmartTraceAlgorithm) + if ah.HandleError(w, err, http.StatusBadRequest) { + return + } + + ah.WriteJSON(w, r, result) + +} diff --git a/ee/query-service/app/db/metrics.go b/ee/query-service/app/db/metrics.go new file mode 100644 index 0000000000..77e7d50c9b --- /dev/null +++ b/ee/query-service/app/db/metrics.go @@ -0,0 +1,374 @@ +package db + +import ( + "context" + "crypto/md5" + "encoding/json" + "fmt" + "reflect" + "regexp" + "sort" + "strings" + "time" + + "go.signoz.io/signoz/ee/query-service/model" + basemodel "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/utils" + "go.uber.org/zap" +) + +// GetMetricResultEE runs the query and returns list of time series +func (r *ClickhouseReader) GetMetricResultEE(ctx context.Context, query string) ([]*basemodel.Series, string, error) { + + defer utils.Elapsed("GetMetricResult")() + zap.S().Infof("Executing metric result query: %s", query) + + var hash string + // If getSubTreeSpans function is used in the clickhouse query + if strings.Index(query, "getSubTreeSpans(") != -1 { + var err error + query, hash, err = r.getSubTreeSpansCustomFunction(ctx, query, hash) + if err == fmt.Errorf("No spans found for the given query") { + return nil, "", nil + } + if err != nil { + return nil, "", err + } + } + + rows, err := r.conn.Query(ctx, query) + zap.S().Debug(query) + if err != nil { + zap.S().Debug("Error in processing query: ", err) + return nil, "", fmt.Errorf("error in processing query") + } + + var ( + columnTypes = rows.ColumnTypes() + columnNames = rows.Columns() + vars = make([]interface{}, len(columnTypes)) + ) + for i := range columnTypes { + vars[i] = reflect.New(columnTypes[i].ScanType()).Interface() + } + // when group by is applied, each combination of cartesian product + // of attributes is separate series. each item in metricPointsMap + // represent a unique series. + metricPointsMap := make(map[string][]basemodel.MetricPoint) + // attribute key-value pairs for each group selection + attributesMap := make(map[string]map[string]string) + + defer rows.Close() + for rows.Next() { + if err := rows.Scan(vars...); err != nil { + return nil, "", err + } + var groupBy []string + var metricPoint basemodel.MetricPoint + groupAttributes := make(map[string]string) + // Assuming that the end result row contains a timestamp, value and option labels + // Label key and value are both strings. + for idx, v := range vars { + colName := columnNames[idx] + switch v := v.(type) { + case *string: + // special case for returning all labels + if colName == "fullLabels" { + var metric map[string]string + err := json.Unmarshal([]byte(*v), &metric) + if err != nil { + return nil, "", err + } + for key, val := range metric { + groupBy = append(groupBy, val) + groupAttributes[key] = val + } + } else { + groupBy = append(groupBy, *v) + groupAttributes[colName] = *v + } + case *time.Time: + metricPoint.Timestamp = v.UnixMilli() + case *float64: + metricPoint.Value = *v + } + } + sort.Strings(groupBy) + key := strings.Join(groupBy, "") + attributesMap[key] = groupAttributes + metricPointsMap[key] = append(metricPointsMap[key], metricPoint) + } + + var seriesList []*basemodel.Series + for key := range metricPointsMap { + points := metricPointsMap[key] + // first point in each series could be invalid since the + // aggregations are applied with point from prev series + if len(points) != 0 && len(points) > 1 { + points = points[1:] + } + attributes := attributesMap[key] + series := basemodel.Series{Labels: attributes, Points: points} + seriesList = append(seriesList, &series) + } + // err = r.conn.Exec(ctx, "DROP TEMPORARY TABLE IF EXISTS getSubTreeSpans"+hash) + // if err != nil { + // zap.S().Error("Error in dropping temporary table: ", err) + // return nil, err + // } + if hash == "" { + return seriesList, hash, nil + } else { + return seriesList, "getSubTreeSpans" + hash, nil + } +} + +func (r *ClickhouseReader) getSubTreeSpansCustomFunction(ctx context.Context, query string, hash string) (string, string, error) { + + zap.S().Debugf("Executing getSubTreeSpans function") + + // str1 := `select fromUnixTimestamp64Milli(intDiv( toUnixTimestamp64Milli ( timestamp ), 100) * 100) AS interval, toFloat64(count()) as count from (select timestamp, spanId, parentSpanId, durationNano from getSubTreeSpans(select * from signoz_traces.signoz_index_v2 where serviceName='frontend' and name='/driver.DriverService/FindNearest' and traceID='00000000000000004b0a863cb5ed7681') where name='FindDriverIDs' group by interval order by interval asc;` + + // process the query to fetch subTree query + var subtreeInput string + query, subtreeInput, hash = processQuery(query, hash) + + err := r.conn.Exec(ctx, "DROP TABLE IF EXISTS getSubTreeSpans"+hash) + if err != nil { + zap.S().Error("Error in dropping temporary table: ", err) + return query, hash, err + } + + // Create temporary table to store the getSubTreeSpans() results + zap.S().Debugf("Creating temporary table getSubTreeSpans%s", hash) + err = r.conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS "+"getSubTreeSpans"+hash+" (timestamp DateTime64(9) CODEC(DoubleDelta, LZ4), traceID FixedString(32) CODEC(ZSTD(1)), spanID String CODEC(ZSTD(1)), parentSpanID String CODEC(ZSTD(1)), rootSpanID String CODEC(ZSTD(1)), serviceName LowCardinality(String) CODEC(ZSTD(1)), name LowCardinality(String) CODEC(ZSTD(1)), rootName LowCardinality(String) CODEC(ZSTD(1)), durationNano UInt64 CODEC(T64, ZSTD(1)), kind Int8 CODEC(T64, ZSTD(1)), tagMap Map(LowCardinality(String), String) CODEC(ZSTD(1)), events Array(String) CODEC(ZSTD(2))) ENGINE = MergeTree() ORDER BY (timestamp)") + if err != nil { + zap.S().Error("Error in creating temporary table: ", err) + return query, hash, err + } + + var getSpansSubQueryDBResponses []model.GetSpansSubQueryDBResponse + getSpansSubQuery := subtreeInput + // Execute the subTree query + zap.S().Debugf("Executing subTree query: %s", getSpansSubQuery) + err = r.conn.Select(ctx, &getSpansSubQueryDBResponses, getSpansSubQuery) + + // zap.S().Info(getSpansSubQuery) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return query, hash, fmt.Errorf("Error in processing sql query") + } + + var searchScanResponses []basemodel.SearchSpanDBResponseItem + + // TODO : @ankit: I think the algorithm does not need to assume that subtrees are from the same TraceID. We can take this as an improvement later. + // Fetch all the spans from of same TraceID so that we can build subtree + modelQuery := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) + + if len(getSpansSubQueryDBResponses) == 0 { + return query, hash, fmt.Errorf("No spans found for the given query") + } + zap.S().Debugf("Executing query to fetch all the spans from the same TraceID: %s", modelQuery) + err = r.conn.Select(ctx, &searchScanResponses, modelQuery, getSpansSubQueryDBResponses[0].TraceID) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return query, hash, fmt.Errorf("Error in processing sql query") + } + + // Process model to fetch the spans + zap.S().Debugf("Processing model to fetch the spans") + searchSpanResponses := []basemodel.SearchSpanResponseItem{} + for _, item := range searchScanResponses { + var jsonItem basemodel.SearchSpanResponseItem + json.Unmarshal([]byte(item.Model), &jsonItem) + jsonItem.TimeUnixNano = uint64(item.Timestamp.UnixNano()) + if jsonItem.Events == nil { + jsonItem.Events = []string{} + } + searchSpanResponses = append(searchSpanResponses, jsonItem) + } + // Build the subtree and store all the subtree spans in temporary table getSubTreeSpans+hash + // Use map to store pointer to the spans to avoid duplicates and save memory + zap.S().Debugf("Building the subtree to store all the subtree spans in temporary table getSubTreeSpans%s", hash) + + treeSearchResponse, err := getSubTreeAlgorithm(searchSpanResponses, getSpansSubQueryDBResponses) + if err != nil { + zap.S().Error("Error in getSubTreeAlgorithm function: ", err) + return query, hash, err + } + zap.S().Debugf("Preparing batch to store subtree spans in temporary table getSubTreeSpans%s", hash) + statement, err := r.conn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO getSubTreeSpans"+hash)) + if err != nil { + zap.S().Error("Error in preparing batch statement: ", err) + return query, hash, err + } + for _, span := range treeSearchResponse { + var parentID string + if len(span.References) > 0 && span.References[0].RefType == "CHILD_OF" { + parentID = span.References[0].SpanId + } + err = statement.Append( + time.Unix(0, int64(span.TimeUnixNano)), + span.TraceID, + span.SpanID, + parentID, + span.RootSpanID, + span.ServiceName, + span.Name, + span.RootName, + uint64(span.DurationNano), + int8(span.Kind), + span.TagMap, + span.Events, + ) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return query, hash, err + } + } + zap.S().Debugf("Inserting the subtree spans in temporary table getSubTreeSpans%s", hash) + err = statement.Send() + if err != nil { + zap.S().Error("Error in sending statement: ", err) + return query, hash, err + } + return query, hash, nil +} + +func processQuery(query string, hash string) (string, string, string) { + re3 := regexp.MustCompile(`getSubTreeSpans`) + + submatchall3 := re3.FindAllStringIndex(query, -1) + getSubtreeSpansMatchIndex := submatchall3[0][1] + + query2countParenthesis := query[getSubtreeSpansMatchIndex:] + + sqlCompleteIndex := 0 + countParenthesisImbalance := 0 + for i, char := range query2countParenthesis { + + if string(char) == "(" { + countParenthesisImbalance += 1 + } + if string(char) == ")" { + countParenthesisImbalance -= 1 + } + if countParenthesisImbalance == 0 { + sqlCompleteIndex = i + break + } + } + subtreeInput := query2countParenthesis[1:sqlCompleteIndex] + + // hash the subtreeInput + hmd5 := md5.Sum([]byte(subtreeInput)) + hash = fmt.Sprintf("%x", hmd5) + + // Reformat the query to use the getSubTreeSpans function + query = query[:getSubtreeSpansMatchIndex] + hash + " " + query2countParenthesis[sqlCompleteIndex+1:] + return query, subtreeInput, hash +} + +// getSubTreeAlgorithm is an algorithm to build the subtrees of the spans and return the list of spans +func getSubTreeAlgorithm(payload []basemodel.SearchSpanResponseItem, getSpansSubQueryDBResponses []model.GetSpansSubQueryDBResponse) (map[string]*basemodel.SearchSpanResponseItem, error) { + + var spans []*model.SpanForTraceDetails + for _, spanItem := range payload { + var parentID string + if len(spanItem.References) > 0 && spanItem.References[0].RefType == "CHILD_OF" { + parentID = spanItem.References[0].SpanId + } + span := &model.SpanForTraceDetails{ + TimeUnixNano: spanItem.TimeUnixNano, + SpanID: spanItem.SpanID, + TraceID: spanItem.TraceID, + ServiceName: spanItem.ServiceName, + Name: spanItem.Name, + Kind: spanItem.Kind, + DurationNano: spanItem.DurationNano, + TagMap: spanItem.TagMap, + ParentID: parentID, + Events: spanItem.Events, + HasError: spanItem.HasError, + } + spans = append(spans, span) + } + + zap.S().Debug("Building Tree") + roots, err := buildSpanTrees(&spans) + if err != nil { + return nil, err + } + searchSpansResult := make(map[string]*basemodel.SearchSpanResponseItem) + // Every span which was fetched from getSubTree Input SQL query is considered root + // For each root, get the subtree spans + for _, getSpansSubQueryDBResponse := range getSpansSubQueryDBResponses { + targetSpan := &model.SpanForTraceDetails{} + // zap.S().Debug("Building tree for span id: " + getSpansSubQueryDBResponse.SpanID + " " + strconv.Itoa(i+1) + " of " + strconv.Itoa(len(getSpansSubQueryDBResponses))) + // Search target span object in the tree + for _, root := range roots { + targetSpan, err = breadthFirstSearch(root, getSpansSubQueryDBResponse.SpanID) + if targetSpan != nil { + break + } + if err != nil { + zap.S().Error("Error during BreadthFirstSearch(): ", err) + return nil, err + } + } + if targetSpan == nil { + return nil, nil + } + // Build subtree for the target span + // Mark the target span as root by setting parent ID as empty string + targetSpan.ParentID = "" + preParents := []*model.SpanForTraceDetails{targetSpan} + children := []*model.SpanForTraceDetails{} + + // Get the subtree child spans + for i := 0; len(preParents) != 0; i++ { + parents := []*model.SpanForTraceDetails{} + for _, parent := range preParents { + children = append(children, parent.Children...) + parents = append(parents, parent.Children...) + } + preParents = parents + } + + resultSpans := children + // Add the target span to the result spans + resultSpans = append(resultSpans, targetSpan) + + for _, item := range resultSpans { + references := []basemodel.OtelSpanRef{ + { + TraceId: item.TraceID, + SpanId: item.ParentID, + RefType: "CHILD_OF", + }, + } + + if item.Events == nil { + item.Events = []string{} + } + searchSpansResult[item.SpanID] = &basemodel.SearchSpanResponseItem{ + TimeUnixNano: item.TimeUnixNano, + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + Kind: item.Kind, + References: references, + DurationNano: item.DurationNano, + TagMap: item.TagMap, + Events: item.Events, + HasError: item.HasError, + RootSpanID: getSpansSubQueryDBResponse.SpanID, + RootName: targetSpan.Name, + } + } + } + return searchSpansResult, nil +} diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index e948ee430b..fc26ec3ce2 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -6,6 +6,7 @@ import ( "github.com/jmoiron/sqlx" basechr "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" + "go.signoz.io/signoz/pkg/query-service/interfaces" ) type ClickhouseReader struct { @@ -14,8 +15,8 @@ type ClickhouseReader struct { *basechr.ClickHouseReader } -func NewDataConnector(localDB *sqlx.DB, promConfigPath string) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath) +func NewDataConnector(localDB *sqlx.DB, promConfigPath string, lm interfaces.FeatureLookup) *ClickhouseReader { + ch := basechr.NewReader(localDB, promConfigPath, lm) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/db/trace.go b/ee/query-service/app/db/trace.go new file mode 100644 index 0000000000..529a9a93fd --- /dev/null +++ b/ee/query-service/app/db/trace.go @@ -0,0 +1,222 @@ +package db + +import ( + "errors" + "strconv" + + "go.signoz.io/signoz/ee/query-service/model" + basemodel "go.signoz.io/signoz/pkg/query-service/model" + "go.uber.org/zap" +) + +// SmartTraceAlgorithm is an algorithm to find the target span and build a tree of spans around it with the given levelUp and levelDown parameters and the given spanLimit +func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]basemodel.SearchSpansResult, error) { + var spans []*model.SpanForTraceDetails + + // Build a slice of spans from the payload + for _, spanItem := range payload { + var parentID string + if len(spanItem.References) > 0 && spanItem.References[0].RefType == "CHILD_OF" { + parentID = spanItem.References[0].SpanId + } + span := &model.SpanForTraceDetails{ + TimeUnixNano: spanItem.TimeUnixNano, + SpanID: spanItem.SpanID, + TraceID: spanItem.TraceID, + ServiceName: spanItem.ServiceName, + Name: spanItem.Name, + Kind: spanItem.Kind, + DurationNano: spanItem.DurationNano, + TagMap: spanItem.TagMap, + ParentID: parentID, + Events: spanItem.Events, + HasError: spanItem.HasError, + } + spans = append(spans, span) + } + + // Build span trees from the spans + roots, err := buildSpanTrees(&spans) + if err != nil { + return nil, err + } + targetSpan := &model.SpanForTraceDetails{} + + // Find the target span in the span trees + for _, root := range roots { + targetSpan, err = breadthFirstSearch(root, targetSpanId) + if targetSpan != nil { + break + } + if err != nil { + zap.S().Error("Error during BreadthFirstSearch(): ", err) + return nil, err + } + } + + // If the target span is not found, return span not found error + if targetSpan == nil { + return nil, errors.New("Span not found") + } + + // Build the final result + parents := []*model.SpanForTraceDetails{} + + // Get the parent spans of the target span up to the given levelUp parameter and spanLimit + preParent := targetSpan + for i := 0; i < levelUp+1; i++ { + if i == levelUp { + preParent.ParentID = "" + } + if spanLimit-len(preParent.Children) <= 0 { + parents = append(parents, preParent) + parents = append(parents, preParent.Children[:spanLimit]...) + spanLimit -= (len(preParent.Children[:spanLimit]) + 1) + preParent.ParentID = "" + break + } + parents = append(parents, preParent) + parents = append(parents, preParent.Children...) + spanLimit -= (len(preParent.Children) + 1) + preParent = preParent.ParentSpan + if preParent == nil { + break + } + } + + // Get the child spans of the target span until the given levelDown and spanLimit + preParents := []*model.SpanForTraceDetails{targetSpan} + children := []*model.SpanForTraceDetails{} + + for i := 0; i < levelDown && len(preParents) != 0 && spanLimit > 0; i++ { + parents := []*model.SpanForTraceDetails{} + for _, parent := range preParents { + if spanLimit-len(parent.Children) <= 0 { + children = append(children, parent.Children[:spanLimit]...) + spanLimit -= len(parent.Children[:spanLimit]) + break + } + children = append(children, parent.Children...) + parents = append(parents, parent.Children...) + } + preParents = parents + } + + // Store the final list of spans in the resultSpanSet map to avoid duplicates + resultSpansSet := make(map[*model.SpanForTraceDetails]struct{}) + resultSpansSet[targetSpan] = struct{}{} + for _, parent := range parents { + resultSpansSet[parent] = struct{}{} + } + for _, child := range children { + resultSpansSet[child] = struct{}{} + } + + searchSpansResult := []basemodel.SearchSpansResult{{ + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError"}, + Events: make([][]interface{}, len(resultSpansSet)), + }, + } + + // Convert the resultSpansSet map to searchSpansResult + i := 0 // index for spans + for item := range resultSpansSet { + references := []basemodel.OtelSpanRef{ + { + TraceId: item.TraceID, + SpanId: item.ParentID, + RefType: "CHILD_OF", + }, + } + + referencesStringArray := []string{} + for _, item := range references { + referencesStringArray = append(referencesStringArray, item.ToString()) + } + keys := make([]string, 0, len(item.TagMap)) + values := make([]string, 0, len(item.TagMap)) + + for k, v := range item.TagMap { + keys = append(keys, k) + values = append(values, v) + } + if item.Events == nil { + item.Events = []string{} + } + searchSpansResult[0].Events[i] = []interface{}{ + item.TimeUnixNano, + item.SpanID, + item.TraceID, + item.ServiceName, + item.Name, + strconv.Itoa(int(item.Kind)), + strconv.FormatInt(item.DurationNano, 10), + keys, + values, + referencesStringArray, + item.Events, + item.HasError, + } + i++ // increment index + } + return searchSpansResult, nil +} + +// buildSpanTrees builds trees of spans from a list of spans. +func buildSpanTrees(spansPtr *[]*model.SpanForTraceDetails) ([]*model.SpanForTraceDetails, error) { + + // Build a map of spanID to span for fast lookup + var roots []*model.SpanForTraceDetails + spans := *spansPtr + mapOfSpans := make(map[string]*model.SpanForTraceDetails, len(spans)) + + for _, span := range spans { + if span.ParentID == "" { + roots = append(roots, span) + } + mapOfSpans[span.SpanID] = span + } + + // Build the span tree by adding children to the parent spans + for _, span := range spans { + if span.ParentID == "" { + continue + } + parent := mapOfSpans[span.ParentID] + + // If the parent span is not found, add current span to list of roots + if parent == nil { + // zap.S().Debug("Parent Span not found parent_id: ", span.ParentID) + roots = append(roots, span) + span.ParentID = "" + continue + } + + span.ParentSpan = parent + parent.Children = append(parent.Children, span) + } + + return roots, nil +} + +// breadthFirstSearch performs a breadth-first search on the span tree to find the target span. +func breadthFirstSearch(spansPtr *model.SpanForTraceDetails, targetId string) (*model.SpanForTraceDetails, error) { + queue := []*model.SpanForTraceDetails{spansPtr} + visited := make(map[string]bool) + + for len(queue) > 0 { + current := queue[0] + visited[current.SpanID] = true + queue = queue[1:] + if current.SpanID == targetId { + return current, nil + } + + for _, child := range current.Children { + if ok, _ := visited[child.SpanID]; !ok { + queue = append(queue, child) + } + } + } + return nil, nil +} diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 7002af3f41..501ad96aa9 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -98,7 +98,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") - qb := db.NewDataConnector(localDB, serverOptions.PromConfigPath) + qb := db.NewDataConnector(localDB, serverOptions.PromConfigPath, lm) go qb.Start(readerReady) reader = qb } else { diff --git a/ee/query-service/constants/constants.go b/ee/query-service/constants/constants.go index ba9bb141a5..45fad74da6 100644 --- a/ee/query-service/constants/constants.go +++ b/ee/query-service/constants/constants.go @@ -10,6 +10,8 @@ const ( var LicenseSignozIo = "https://license.signoz.io/api/v1" +var SpanLimitStr = GetOrDefaultEnv("SPAN_LIMIT", "5000") + func GetOrDefaultEnv(key string, fallback string) string { v := os.Getenv(key) if len(v) == 0 { diff --git a/ee/query-service/model/plans.go b/ee/query-service/model/plans.go index e68217730a..c42712f693 100644 --- a/ee/query-service/model/plans.go +++ b/ee/query-service/model/plans.go @@ -17,11 +17,15 @@ var BasicPlan = basemodel.FeatureSet{ } var ProPlan = basemodel.FeatureSet{ - Pro: true, - SSO: true, + Pro: true, + SSO: true, + basemodel.SmartTraceDetail: true, + basemodel.CustomMetricsFunction: true, } var EnterprisePlan = basemodel.FeatureSet{ - Enterprise: true, - SSO: true, + Enterprise: true, + SSO: true, + basemodel.SmartTraceDetail: true, + basemodel.CustomMetricsFunction: true, } diff --git a/ee/query-service/model/trace.go b/ee/query-service/model/trace.go new file mode 100644 index 0000000000..708d6d1c5c --- /dev/null +++ b/ee/query-service/model/trace.go @@ -0,0 +1,22 @@ +package model + +type SpanForTraceDetails struct { + TimeUnixNano uint64 `json:"timestamp"` + SpanID string `json:"spanID"` + TraceID string `json:"traceID"` + ParentID string `json:"parentID"` + ParentSpan *SpanForTraceDetails `json:"parentSpan"` + ServiceName string `json:"serviceName"` + Name string `json:"name"` + Kind int32 `json:"kind"` + DurationNano int64 `json:"durationNano"` + TagMap map[string]string `json:"tagMap"` + Events []string `json:"event"` + HasError bool `json:"hasError"` + Children []*SpanForTraceDetails `json:"children"` +} + +type GetSpansSubQueryDBResponse struct { + SpanID string `ch:"spanID"` + TraceID string `ch:"traceID"` +} diff --git a/frontend/src/api/trace/getTraceItem.ts b/frontend/src/api/trace/getTraceItem.ts index bf93269669..054c809b33 100644 --- a/frontend/src/api/trace/getTraceItem.ts +++ b/frontend/src/api/trace/getTraceItem.ts @@ -1,15 +1,20 @@ import axios from 'api'; import { ErrorResponseHandler } from 'api/ErrorResponseHandler'; import { AxiosError } from 'axios'; +import { formUrlParams } from 'container/TraceDetail/utils'; import { ErrorResponse, SuccessResponse } from 'types/api'; -import { PayloadProps, Props } from 'types/api/trace/getTraceItem'; +import { GetTraceItemProps, PayloadProps } from 'types/api/trace/getTraceItem'; const getTraceItem = async ( - props: Props, + props: GetTraceItemProps, ): Promise | ErrorResponse> => { try { const response = await axios.request({ - url: `/traces/${props.id}`, + url: `/traces/${props.id}${formUrlParams({ + spanId: props.spanId, + levelUp: props.levelUp, + levelDown: props.levelDown, + })}`, method: 'get', }); diff --git a/frontend/src/container/Trace/TraceTable/index.tsx b/frontend/src/container/Trace/TraceTable/index.tsx index aa9fc1cf6b..b7d202bdcf 100644 --- a/frontend/src/container/Trace/TraceTable/index.tsx +++ b/frontend/src/container/Trace/TraceTable/index.tsx @@ -5,6 +5,7 @@ import { getSpanOrder, getSpanOrderParam, } from 'container/Trace/TraceTable/util'; +import { formUrlParams } from 'container/TraceDetail/utils'; import dayjs from 'dayjs'; import duration from 'dayjs/plugin/duration'; import history from 'lib/history'; @@ -48,7 +49,11 @@ function TraceTable(): JSX.Element { type TableType = FlatArray; const getLink = (record: TableType): string => { - return `${ROUTES.TRACE}/${record.traceID}?spanId=${record.spanID}`; + return `${ROUTES.TRACE}/${record.traceID}${formUrlParams({ + spanId: record.spanID, + levelUp: 0, + levelDown: 0, + })}`; }; const getValue = (value: string): JSX.Element => { diff --git a/frontend/src/container/TraceDetail/index.tsx b/frontend/src/container/TraceDetail/index.tsx index 816f59ec2f..debe73d3a1 100644 --- a/frontend/src/container/TraceDetail/index.tsx +++ b/frontend/src/container/TraceDetail/index.tsx @@ -29,6 +29,7 @@ import SelectedSpanDetails from './SelectedSpanDetails'; import * as styles from './styles'; import { FlameGraphMissingSpansContainer, GanttChartWrapper } from './styles'; import { + formUrlParams, getSortedData, getTreeLevelsCount, IIntervalUnit, @@ -50,7 +51,13 @@ function TraceDetail({ response }: TraceDetailProps): JSX.Element { // const [searchSpanString, setSearchSpanString] = useState(''); const [activeHoverId, setActiveHoverId] = useState(''); const [activeSelectedId, setActiveSelectedId] = useState(spanId || ''); - + const { levelDown, levelUp } = useMemo( + () => ({ + levelDown: urlQuery.get('levelDown'), + levelUp: urlQuery.get('levelUp'), + }), + [urlQuery], + ); const [treesData, setTreesData] = useState( spanToTreeUtil(response[0].events), ); @@ -77,10 +84,14 @@ function TraceDetail({ response }: TraceDetailProps): JSX.Element { if (activeSelectedId) { history.replace({ pathname: history.location.pathname, - search: `?spanId=${activeSelectedId}`, + search: `${formUrlParams({ + spanId: activeSelectedId, + levelUp, + levelDown, + })}`, }); } - }, [activeSelectedId]); + }, [activeSelectedId, levelDown, levelUp]); const getSelectedNode = useMemo(() => { return getNodeById(activeSelectedId, treesData); diff --git a/frontend/src/container/TraceDetail/utils.ts b/frontend/src/container/TraceDetail/utils.ts index 2541cf9bf0..898ea7e510 100644 --- a/frontend/src/container/TraceDetail/utils.ts +++ b/frontend/src/container/TraceDetail/utils.ts @@ -98,3 +98,28 @@ export const getTreeLevelsCount = (tree: ITraceTree): number => { return levels; }; + +export const formUrlParams = (params: Record): string => { + let urlParams = ''; + Object.entries(params).forEach(([key, value], index) => { + let encodedValue: string; + try { + encodedValue = decodeURIComponent(value); + encodedValue = encodeURIComponent(encodedValue); + } catch (error) { + encodedValue = ''; + } + if (index === 0) { + if (encodedValue) { + urlParams = `?${key}=${encodedValue}`; + } else { + urlParams = `?${key}=`; + } + } else if (encodedValue) { + urlParams = `${urlParams}&${key}=${encodedValue}`; + } else { + urlParams = `${urlParams}&${key}=`; + } + }); + return urlParams; +}; diff --git a/frontend/src/pages/TraceDetail/index.tsx b/frontend/src/pages/TraceDetail/index.tsx index d8f6634888..16316aa729 100644 --- a/frontend/src/pages/TraceDetail/index.tsx +++ b/frontend/src/pages/TraceDetail/index.tsx @@ -2,16 +2,26 @@ import { Typography } from 'antd'; import getTraceItem from 'api/trace/getTraceItem'; import Spinner from 'components/Spinner'; import TraceDetailContainer from 'container/TraceDetail'; -import React from 'react'; +import useUrlQuery from 'hooks/useUrlQuery'; +import React, { useMemo } from 'react'; import { useQuery } from 'react-query'; import { useParams } from 'react-router-dom'; import { Props as TraceDetailProps } from 'types/api/trace/getTraceItem'; function TraceDetail(): JSX.Element { const { id } = useParams(); + const urlQuery = useUrlQuery(); + const { spanId, levelUp, levelDown } = useMemo( + () => ({ + spanId: urlQuery.get('spanId'), + levelUp: urlQuery.get('levelUp'), + levelDown: urlQuery.get('levelDown'), + }), + [urlQuery], + ); const { data: traceDetailResponse, error, isLoading, isError } = useQuery( `getTraceItem/${id}`, - () => getTraceItem({ id }), + () => getTraceItem({ id, spanId, levelUp, levelDown }), { cacheTime: 3000, }, diff --git a/frontend/src/types/api/trace/getTraceItem.ts b/frontend/src/types/api/trace/getTraceItem.ts index 4b12d15b2f..9d8887bc56 100644 --- a/frontend/src/types/api/trace/getTraceItem.ts +++ b/frontend/src/types/api/trace/getTraceItem.ts @@ -2,6 +2,13 @@ export interface Props { id: string; } +export interface GetTraceItemProps { + id: string; + spanId: string | null; + levelUp: string | null; + levelDown: string | null; +} + export interface PayloadProps { [id: string]: { events: Span[]; diff --git a/go.mod b/go.mod index 8666887ad6..b25925512c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/gosimple/slug v1.10.0 github.com/jmoiron/sqlx v1.3.4 github.com/json-iterator/go v1.1.12 + github.com/mailru/easyjson v0.7.7 github.com/mattn/go-sqlite3 v1.14.8 github.com/minio/minio-go/v6 v6.0.57 github.com/mitchellh/mapstructure v1.5.0 @@ -34,7 +35,9 @@ require ( require ( github.com/beevik/etree v1.1.0 // indirect github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect + github.com/google/go-cmp v0.5.8 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/cpuid v1.2.3 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect github.com/minio/md5-simd v1.1.0 // indirect @@ -68,7 +71,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect - github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.0 // indirect @@ -126,7 +129,7 @@ require ( go.uber.org/atomic v1.6.0 // indirect go.uber.org/multierr v1.5.0 // indirect golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 - golang.org/x/net v0.0.0-20211013171255-e13a2654a71e + golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211110154304-99a53858aa08 // indirect diff --git a/go.sum b/go.sum index ba38dc1069..0782dacd7d 100644 --- a/go.sum +++ b/go.sum @@ -151,8 +151,9 @@ github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= @@ -200,8 +201,9 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -296,6 +298,8 @@ github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w= github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -323,6 +327,8 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattermost/xml-roundtrip-validator v0.1.0 h1:RXbVD2UAl7A7nOTR4u7E3ILa4IbtvKBHw64LDsmu9hU= github.com/mattermost/xml-roundtrip-validator v0.1.0/go.mod h1:qccnGMcpgwcNaBnxqpJpWWUiPNr5H3O8eDgGV9gT5To= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -566,8 +572,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211013171255-e13a2654a71e h1:Xj+JO91noE97IN6F/7WZxzC5QE6yENAQPrwIYhW3bsA= -golang.org/x/net v0.0.0-20211013171255-e13a2654a71e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 0aa037490e..0ccb5cb73c 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/mailru/easyjson" "github.com/oklog/oklog/pkg/group" "github.com/pkg/errors" "github.com/prometheus/common/promlog" @@ -42,6 +43,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/logs" "go.signoz.io/signoz/pkg/query-service/constants" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" + "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/utils" "go.uber.org/zap" @@ -79,13 +81,13 @@ var ( type ClickHouseReader struct { db clickhouse.Conn localDB *sqlx.DB - traceDB string + TraceDB string operationsTable string durationTable string indexTable string errorTable string usageExplorerTable string - spansTable string + SpansTable string dependencyGraphTable string topLevelOperationsTable string logsDB string @@ -99,12 +101,13 @@ type ClickHouseReader struct { promConfigFile string promConfig *config.Config alertManager am.Manager + featureFlags interfaces.FeatureLookup liveTailRefreshSeconds int } // NewTraceReader returns a TraceReader for the database -func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader { +func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.FeatureLookup) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") options := NewOptions(datasource, primaryNamespace, archiveNamespace) @@ -125,14 +128,14 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader { return &ClickHouseReader{ db: db, localDB: localDB, - traceDB: options.primary.TraceDB, + TraceDB: options.primary.TraceDB, alertManager: alertManager, operationsTable: options.primary.OperationsTable, indexTable: options.primary.IndexTable, errorTable: options.primary.ErrorTable, usageExplorerTable: options.primary.UsageExplorerTable, durationTable: options.primary.DurationTable, - spansTable: options.primary.SpansTable, + SpansTable: options.primary.SpansTable, dependencyGraphTable: options.primary.DependencyGraphTable, topLevelOperationsTable: options.primary.TopLevelOperationsTable, logsDB: options.primary.LogsDB, @@ -141,6 +144,7 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader { logsResourceKeys: options.primary.LogsResourceKeysTable, liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, promConfigFile: configFile, + featureFlags: featureFlag, } } @@ -665,7 +669,7 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) { services := []string{} - query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.traceDB, r.indexTable) + query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.indexTable) rows, err := r.db.Query(ctx, query) @@ -690,7 +694,7 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context) (*map[string][]string, *model.ApiError) { operations := map[string][]string{} - query := fmt.Sprintf(`SELECT DISTINCT name, serviceName FROM %s.%s`, r.traceDB, r.topLevelOperationsTable) + query := fmt.Sprintf(`SELECT DISTINCT name, serviceName FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable) rows, err := r.db.Query(ctx, query) @@ -745,14 +749,14 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G count(*) as numCalls FROM %s.%s WHERE serviceName = @serviceName AND name In [@names] AND timestamp>= @start AND timestamp<= @end`, - r.traceDB, r.indexTable, + r.TraceDB, r.indexTable, ) errorQuery := fmt.Sprintf( `SELECT count(*) as numErrors FROM %s.%s WHERE serviceName = @serviceName AND name In [@names] AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.traceDB, r.indexTable, + r.TraceDB, r.indexTable, ) args := []interface{}{} @@ -835,7 +839,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams * count(*) as numCalls FROM %s.%s WHERE serviceName = @serviceName AND name In [@names] AND timestamp>= @start AND timestamp<= @end`, - r.traceDB, r.indexTable, + r.TraceDB, r.indexTable, ) args := []interface{}{} args = append(args, namedArgs...) @@ -861,7 +865,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams * count(*) as numErrors FROM %s.%s WHERE serviceName = @serviceName AND name In [@names] AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.traceDB, r.indexTable, + r.TraceDB, r.indexTable, ) args = []interface{}{} args = append(args, namedArgs...) @@ -1001,7 +1005,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode case constants.TraceID: continue case constants.ServiceName: - finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY serviceName" var dBResponse []model.DBResponseServiceName @@ -1018,7 +1022,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpCode: - finalQuery := fmt.Sprintf("SELECT httpCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY httpCode" var dBResponse []model.DBResponseHttpCode @@ -1035,7 +1039,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpRoute: - finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY httpRoute" var dBResponse []model.DBResponseHttpRoute @@ -1052,7 +1056,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpUrl: - finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY httpUrl" var dBResponse []model.DBResponseHttpUrl @@ -1069,7 +1073,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpMethod: - finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY httpMethod" var dBResponse []model.DBResponseHttpMethod @@ -1086,7 +1090,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpHost: - finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY httpHost" var dBResponse []model.DBResponseHttpHost @@ -1103,7 +1107,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.OperationRequest: - finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY name" var dBResponse []model.DBResponseOperation @@ -1120,7 +1124,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.Component: - finalQuery := fmt.Sprintf("SELECT component, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT component, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY component" var dBResponse []model.DBResponseComponent @@ -1137,7 +1141,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.Status: - finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable) finalQuery += query var dBResponse []model.DBResponseTotal err := r.db.Select(ctx, &dBResponse, finalQuery, args...) @@ -1148,7 +1152,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)} } - finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.traceDB, r.indexTable) + finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable) finalQuery2 += query var dBResponse2 []model.DBResponseTotal err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...) @@ -1168,7 +1172,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": 0} } case constants.Duration: - finalQuery := fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.durationTable) + finalQuery := fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable) finalQuery += query finalQuery += " ORDER BY durationNano LIMIT 1" var dBResponse []model.DBResponseTotal @@ -1179,7 +1183,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode zap.S().Debug("Error in processing sql query: ", err) return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)} } - finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.durationTable) + finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable) finalQuery += query finalQuery += " ORDER BY durationNano DESC LIMIT 1" var dBResponse2 []model.DBResponseTotal @@ -1197,7 +1201,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode traceFilterReponse.Duration["maxDuration"] = dBResponse2[0].NumTotal } case constants.RPCMethod: - finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY rpcMethod" var dBResponse []model.DBResponseRPCMethod @@ -1215,7 +1219,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } case constants.ResponseStatusCode: - finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY responseStatusCode" var dBResponse []model.DBResponseStatusCodeMethod @@ -1263,7 +1267,7 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) { - queryTable := fmt.Sprintf("%s.%s", r.traceDB, r.indexTable) + queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable) excludeMap := make(map[string]struct{}) for _, e := range queryParams.Exclude { @@ -1333,7 +1337,7 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo if len(queryParams.OrderParam) != 0 { if queryParams.OrderParam == constants.Duration { - queryTable = fmt.Sprintf("%s.%s", r.traceDB, r.durationTable) + queryTable = fmt.Sprintf("%s.%s", r.TraceDB, r.durationTable) if queryParams.Order == constants.Descending { query = query + " ORDER BY durationNano DESC" } @@ -1515,7 +1519,7 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model tagFilters := []model.TagFilters{} - finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) // Alternative query: SELECT groupUniqArrayArray(mapKeys(tagMap)) as tagKeys FROM signoz_index_v2 finalQuery += query err := r.db.Select(ctx, &tagFilters, finalQuery, args...) @@ -1608,7 +1612,7 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model. tagValues := []model.TagValues{} - finalQuery := fmt.Sprintf(`SELECT tagMap[@key] as tagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.traceDB, r.indexTable) + finalQuery := fmt.Sprintf(`SELECT tagMap[@key] as tagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) finalQuery += query finalQuery += " GROUP BY tagMap[@key]" args = append(args, clickhouse.Named("key", queryParams.TagKey)) @@ -1649,7 +1653,7 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo name FROM %s.%s WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`, - r.traceDB, r.indexTable, + r.TraceDB, r.indexTable, ) args := []interface{}{} args = append(args, namedArgs...) @@ -1685,9 +1689,9 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU var query string if len(queryParams.ServiceName) != 0 { namedArgs = append(namedArgs, clickhouse.Named("serviceName", queryParams.ServiceName)) - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL @interval HOUR) as time, sum(count) as count FROM %s.%s WHERE service_name=@serviceName AND timestamp>=@start AND timestamp<=@end GROUP BY time ORDER BY time ASC", r.traceDB, r.usageExplorerTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL @interval HOUR) as time, sum(count) as count FROM %s.%s WHERE service_name=@serviceName AND timestamp>=@start AND timestamp<=@end GROUP BY time ORDER BY time ASC", r.TraceDB, r.usageExplorerTable) } else { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL @interval HOUR) as time, sum(count) as count FROM %s.%s WHERE timestamp>=@start AND timestamp<=@end GROUP BY time ORDER BY time ASC", r.traceDB, r.usageExplorerTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL @interval HOUR) as time, sum(count) as count FROM %s.%s WHERE timestamp>=@start AND timestamp<=@end GROUP BY time ORDER BY time ASC", r.TraceDB, r.usageExplorerTable) } err := r.db.Select(ctx, &usageItems, query, namedArgs...) @@ -1710,13 +1714,15 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU return &usageItems, nil } -func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) { +func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string, spanId string, levelUp int, levelDown int, spanLimit int, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { - var searchScanReponses []model.SearchSpanDBReponseItem + var searchScanResponses []model.SearchSpanDBResponseItem - query := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.traceDB, r.spansTable) + query := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) - err := r.db.Select(ctx, &searchScanReponses, query, traceId) + start := time.Now() + + err := r.db.Select(ctx, &searchScanResponses, query, traceId) zap.S().Info(query) @@ -1724,30 +1730,43 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[ zap.S().Debug("Error in processing sql query: ", err) return nil, fmt.Errorf("Error in processing sql query") } - + end := time.Now() + zap.S().Debug("getTraceSQLQuery took: ", end.Sub(start)) searchSpansResult := []model.SearchSpansResult{{ Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError"}, - Events: make([][]interface{}, len(searchScanReponses)), + Events: make([][]interface{}, len(searchScanResponses)), }, } - for i, item := range searchScanReponses { - var jsonItem model.SearchSpanReponseItem - json.Unmarshal([]byte(item.Model), &jsonItem) + searchSpanResponses := []model.SearchSpanResponseItem{} + start = time.Now() + for _, item := range searchScanResponses { + var jsonItem model.SearchSpanResponseItem + easyjson.Unmarshal([]byte(item.Model), &jsonItem) jsonItem.TimeUnixNano = uint64(item.Timestamp.UnixNano() / 1000000) - spanEvents := jsonItem.GetValues() - searchSpansResult[0].Events[i] = spanEvents + searchSpanResponses = append(searchSpanResponses, jsonItem) + } + end = time.Now() + zap.S().Debug("getTraceSQLQuery unmarshal took: ", end.Sub(start)) + + err = r.featureFlags.CheckFeature(model.SmartTraceDetail) + smartAlgoEnabled := err == nil + if len(searchScanResponses) > spanLimit && spanId != "" && smartAlgoEnabled { + start = time.Now() + searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, spanId, levelUp, levelDown, spanLimit) + if err != nil { + return nil, err + } + end = time.Now() + zap.S().Debug("smartTraceAlgo took: ", end.Sub(start)) + } else { + for i, item := range searchSpanResponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } } return &searchSpansResult, nil - -} -func interfaceArrayToStringArray(array []interface{}) []string { - var strArray []string - for _, item := range array { - strArray = append(strArray, item.(string)) - } - return strArray } func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { @@ -1781,7 +1800,7 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams * GROUP BY src, dest`, - r.traceDB, r.dependencyGraphTable, + r.TraceDB, r.dependencyGraphTable, ) zap.S().Debug(query, args) @@ -1841,41 +1860,41 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query if queryParams.GroupBy != "" { switch queryParams.GroupBy { case constants.ServiceName: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, serviceName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, serviceName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.HttpCode: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.HttpMethod: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.HttpUrl: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpUrl as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpUrl as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.HttpRoute: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpRoute as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpRoute as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.HttpHost: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpHost as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpHost as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.DBName: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.DBOperation: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.OperationRequest: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, name as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, name as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.MsgSystem: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.MsgOperation: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.DBSystem: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.Component: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, component as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, component as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.RPCMethod: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, rpcMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, rpcMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) case constants.ResponseStatusCode: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, responseStatusCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, responseStatusCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) default: return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)} } } else { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.traceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) } if len(queryParams.TraceID) > 0 { @@ -2458,7 +2477,7 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li var getErrorResponses []model.Error - query := fmt.Sprintf("SELECT any(exceptionType) as exceptionType, any(exceptionMessage) as exceptionMessage, count() AS exceptionCount, min(timestamp) as firstSeen, max(timestamp) as lastSeen, any(serviceName) as serviceName, groupID FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU GROUP BY groupID", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT any(exceptionType) as exceptionType, any(exceptionMessage) as exceptionMessage, count() AS exceptionCount, min(timestamp) as firstSeen, max(timestamp) as lastSeen, any(serviceName) as serviceName, groupID FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU GROUP BY groupID", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} if len(queryParams.OrderParam) != 0 { if queryParams.Order == constants.Descending { @@ -2492,7 +2511,7 @@ func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.C var errorCount uint64 - query := fmt.Sprintf("SELECT count(distinct(groupID)) FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT count(distinct(groupID)) FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} err := r.db.QueryRow(ctx, query, args...).Scan(&errorCount) @@ -2514,7 +2533,7 @@ func (r *ClickHouseReader) GetErrorFromErrorID(ctx context.Context, queryParams } var getErrorWithSpanReponse []model.ErrorWithSpan - query := fmt.Sprintf("SELECT * FROM %s.%s WHERE timestamp = @timestamp AND groupID = @groupID AND errorID = @errorID LIMIT 1", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE timestamp = @timestamp AND groupID = @groupID AND errorID = @errorID LIMIT 1", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getErrorWithSpanReponse, query, args...) @@ -2537,7 +2556,7 @@ func (r *ClickHouseReader) GetErrorFromGroupID(ctx context.Context, queryParams var getErrorWithSpanReponse []model.ErrorWithSpan - query := fmt.Sprintf("SELECT * FROM %s.%s WHERE timestamp = @timestamp AND groupID = @groupID LIMIT 1", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE timestamp = @timestamp AND groupID = @groupID LIMIT 1", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getErrorWithSpanReponse, query, args...) @@ -2585,7 +2604,7 @@ func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *mode var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse - query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp >= @timestamp AND errorID != @errorID ORDER BY timestamp ASC LIMIT 2", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp >= @timestamp AND errorID != @errorID ORDER BY timestamp ASC LIMIT 2", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getNextErrorIDReponse, query, args...) @@ -2606,7 +2625,7 @@ func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *mode if getNextErrorIDReponse[0].Timestamp.UnixNano() == getNextErrorIDReponse[1].Timestamp.UnixNano() { var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse - query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp = @timestamp AND errorID > @errorID ORDER BY errorID ASC LIMIT 1", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp = @timestamp AND errorID > @errorID ORDER BY errorID ASC LIMIT 1", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getNextErrorIDReponse, query, args...) @@ -2620,7 +2639,7 @@ func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *mode if len(getNextErrorIDReponse) == 0 { var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse - query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp > @timestamp ORDER BY timestamp ASC LIMIT 1", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT errorID as nextErrorID, timestamp as nextTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp > @timestamp ORDER BY timestamp ASC LIMIT 1", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getNextErrorIDReponse, query, args...) @@ -2654,7 +2673,7 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse - query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp <= @timestamp AND errorID != @errorID ORDER BY timestamp DESC LIMIT 2", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp <= @timestamp AND errorID != @errorID ORDER BY timestamp DESC LIMIT 2", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getPrevErrorIDReponse, query, args...) @@ -2675,7 +2694,7 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode if getPrevErrorIDReponse[0].Timestamp.UnixNano() == getPrevErrorIDReponse[1].Timestamp.UnixNano() { var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse - query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp = @timestamp AND errorID < @errorID ORDER BY errorID DESC LIMIT 1", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp = @timestamp AND errorID < @errorID ORDER BY errorID DESC LIMIT 1", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getPrevErrorIDReponse, query, args...) @@ -2689,7 +2708,7 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode if len(getPrevErrorIDReponse) == 0 { var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse - query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp < @timestamp ORDER BY timestamp DESC LIMIT 1", r.traceDB, r.errorTable) + query := fmt.Sprintf("SELECT errorID as prevErrorID, timestamp as prevTimestamp FROM %s.%s WHERE groupID = @groupID AND timestamp < @timestamp ORDER BY timestamp DESC LIMIT 1", r.TraceDB, r.errorTable) args := []interface{}{clickhouse.Named("errorID", queryParams.ErrorID), clickhouse.Named("groupID", queryParams.GroupID), clickhouse.Named("timestamp", strconv.FormatInt(queryParams.Timestamp.UnixNano(), 10))} err := r.db.Select(ctx, &getPrevErrorIDReponse, query, args...) @@ -2830,6 +2849,11 @@ func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, } +func (r *ClickHouseReader) GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) { + zap.S().Error("GetMetricResultEE is not implemented for opensource version") + return nil, "", fmt.Errorf("GetMetricResultEE is not implemented for opensource version") +} + // GetMetricResult runs the query and returns list of time series func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) { @@ -3001,7 +3025,7 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context) (*model.TagsInfo, error) { - queryStr := fmt.Sprintf("select tagMap['service.name'] as serviceName, tagMap['deployment.environment'] as env, tagMap['telemetry.sdk.language'] as language from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", r.traceDB, r.indexTable, 1) + queryStr := fmt.Sprintf("select tagMap['service.name'] as serviceName, tagMap['deployment.environment'] as env, tagMap['telemetry.sdk.language'] as language from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", r.TraceDB, r.indexTable, 1) tagTelemetryDataList := []model.TagTelemetryData{} err := r.db.Select(ctx, &tagTelemetryDataList, queryStr) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index fcf25b93f0..60dbbf6987 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -213,7 +213,7 @@ func writeHttpResponse(w http.ResponseWriter, data interface{}) { func (aH *APIHandler) RegisterMetricsRoutes(router *mux.Router) { subRouter := router.PathPrefix("/api/v2/metrics").Subrouter() - subRouter.HandleFunc("/query_range", ViewAccess(aH.queryRangeMetricsV2)).Methods(http.MethodPost) + subRouter.HandleFunc("/query_range", ViewAccess(aH.QueryRangeMetricsV2)).Methods(http.MethodPost) subRouter.HandleFunc("/autocomplete/list", ViewAccess(aH.metricAutocompleteMetricName)).Methods(http.MethodGet) subRouter.HandleFunc("/autocomplete/tagKey", ViewAccess(aH.metricAutocompleteTagKey)).Methods(http.MethodGet) subRouter.HandleFunc("/autocomplete/tagValue", ViewAccess(aH.metricAutocompleteTagValue)).Methods(http.MethodGet) @@ -354,7 +354,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/service/overview", ViewAccess(aH.getServiceOverview)).Methods(http.MethodPost) router.HandleFunc("/api/v1/service/top_operations", ViewAccess(aH.getTopOperations)).Methods(http.MethodPost) router.HandleFunc("/api/v1/service/top_level_operations", ViewAccess(aH.getServicesTopLevelOps)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/traces/{traceId}", ViewAccess(aH.searchTraces)).Methods(http.MethodGet) + router.HandleFunc("/api/v1/traces/{traceId}", ViewAccess(aH.SearchTraces)).Methods(http.MethodGet) router.HandleFunc("/api/v1/usage", ViewAccess(aH.getUsage)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dependency_graph", ViewAccess(aH.dependencyGraph)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", AdminAccess(aH.setTTL)).Methods(http.MethodPost) @@ -486,7 +486,7 @@ func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http. aH.Respond(w, tagValueList) } -func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) { metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r) if apiErrorObj != nil { @@ -1360,12 +1360,15 @@ func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) { } -func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - traceId := vars["traceId"] + traceId, spanId, levelUpInt, levelDownInt, err := ParseSearchTracesParams(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params") + return + } - result, err := aH.reader.SearchTraces(r.Context(), traceId) + result, err := aH.reader.SearchTraces(r.Context(), traceId, spanId, levelUpInt, levelDownInt, 0, nil) if aH.HandleError(w, err, http.StatusBadRequest) { return } diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index bfd4042d22..fc3154c9fc 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -225,6 +225,30 @@ func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error) return postData, nil } +func ParseSearchTracesParams(r *http.Request) (string, string, int, int, error) { + vars := mux.Vars(r) + traceId := vars["traceId"] + spanId := r.URL.Query().Get("spanId") + levelUp := r.URL.Query().Get("levelUp") + levelDown := r.URL.Query().Get("levelDown") + if levelUp == "" || levelUp == "null" { + levelUp = "0" + } + if levelDown == "" || levelDown == "null" { + levelDown = "0" + } + + levelUpInt, err := strconv.Atoi(levelUp) + if err != nil { + return "", "", 0, 0, err + } + levelDownInt, err := strconv.Atoi(levelDown) + if err != nil { + return "", "", 0, 0, err + } + return traceId, spanId, levelUpInt, levelDownInt, nil +} + func DoesExistInSlice(item string, list []string) bool { for _, element := range list { if item == element { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index cbb8a807fa..b79b7d011f 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -88,7 +88,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") - clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath) + clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath, fm) go clickhouseReader.Start(readerReady) reader = clickhouseReader } else { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 4a83b16c80..3200b10c2f 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -46,7 +46,7 @@ type Reader interface { GetNextPrevErrorIDs(ctx context.Context, params *model.GetErrorParams) (*model.NextPrevErrorIDs, *model.ApiError) // Search Interfaces - SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error) + SearchTraces(ctx context.Context, traceID string, spanId string, levelUp int, levelDown int, spanLimit int, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) @@ -55,6 +55,7 @@ type Reader interface { GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) + GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) GetTotalSpans(ctx context.Context) (uint64, error) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) diff --git a/pkg/query-service/model/featureSet.go b/pkg/query-service/model/featureSet.go index 1b59450a44..bba153e861 100644 --- a/pkg/query-service/model/featureSet.go +++ b/pkg/query-service/model/featureSet.go @@ -3,6 +3,8 @@ package model type FeatureSet map[string]bool const Basic = "BASIC_PLAN" +const SmartTraceDetail = "SMART_TRACE_DETAIL" +const CustomMetricsFunction = "CUSTOM_METRICS_FUNCTION" var BasicPlan = FeatureSet{ Basic: true, diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 23654fb899..27b466b9ea 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -187,7 +187,7 @@ type GetFilterSpansResponse struct { TotalSpans uint64 `json:"totalSpans"` } -type SearchSpanDBReponseItem struct { +type SearchSpanDBResponseItem struct { Timestamp time.Time `ch:"timestamp"` TraceID string `ch:"traceID"` Model string `ch:"model"` @@ -200,18 +200,21 @@ type Event struct { IsError bool `json:"isError,omitempty"` } -type SearchSpanReponseItem struct { +//easyjson:json +type SearchSpanResponseItem struct { TimeUnixNano uint64 `json:"timestamp"` - SpanID string `json:"spanID"` - TraceID string `json:"traceID"` + DurationNano int64 `json:"durationNano"` + SpanID string `json:"spanId"` + RootSpanID string `json:"rootSpanId"` + TraceID string `json:"traceId"` + HasError bool `json:"hasError"` + Kind int32 `json:"kind"` ServiceName string `json:"serviceName"` Name string `json:"name"` - Kind int32 `json:"kind"` References []OtelSpanRef `json:"references,omitempty"` - DurationNano int64 `json:"durationNano"` TagMap map[string]string `json:"tagMap"` Events []string `json:"event"` - HasError bool `json:"hasError"` + RootName string `json:"rootName"` } type OtelSpanRef struct { @@ -220,14 +223,14 @@ type OtelSpanRef struct { RefType string `json:"refType,omitempty"` } -func (ref *OtelSpanRef) toString() string { +func (ref *OtelSpanRef) ToString() string { retString := fmt.Sprintf(`{TraceId=%s, SpanId=%s, RefType=%s}`, ref.TraceId, ref.SpanId, ref.RefType) return retString } -func (item *SearchSpanReponseItem) GetValues() []interface{} { +func (item *SearchSpanResponseItem) GetValues() []interface{} { references := []OtelSpanRef{} jsonbody, _ := json.Marshal(item.References) @@ -235,7 +238,7 @@ func (item *SearchSpanReponseItem) GetValues() []interface{} { referencesStringArray := []string{} for _, item := range references { - referencesStringArray = append(referencesStringArray, item.toString()) + referencesStringArray = append(referencesStringArray, item.ToString()) } if item.Events == nil { diff --git a/pkg/query-service/model/response_easyjson.go b/pkg/query-service/model/response_easyjson.go new file mode 100644 index 0000000000..8c00b2a80b --- /dev/null +++ b/pkg/query-service/model/response_easyjson.go @@ -0,0 +1,328 @@ +// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. + +package model + +import ( + json "encoding/json" + easyjson "github.com/mailru/easyjson" + jlexer "github.com/mailru/easyjson/jlexer" + jwriter "github.com/mailru/easyjson/jwriter" +) + +// suppress unused package warning +var ( + _ *json.RawMessage + _ *jlexer.Lexer + _ *jwriter.Writer + _ easyjson.Marshaler +) + +func easyjson6ff3ac1dDecodeGoSignozIoSignozPkgQueryServiceModel(in *jlexer.Lexer, out *SearchSpanResponseItem) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "timestamp": + out.TimeUnixNano = uint64(in.Uint64()) + case "durationNano": + out.DurationNano = int64(in.Int64()) + case "spanId": + out.SpanID = string(in.String()) + case "rootSpanId": + out.RootSpanID = string(in.String()) + case "traceId": + out.TraceID = string(in.String()) + case "hasError": + out.HasError = bool(in.Bool()) + case "kind": + out.Kind = int32(in.Int32()) + case "serviceName": + out.ServiceName = string(in.String()) + case "name": + out.Name = string(in.String()) + case "references": + if in.IsNull() { + in.Skip() + out.References = nil + } else { + in.Delim('[') + if out.References == nil { + if !in.IsDelim(']') { + out.References = make([]OtelSpanRef, 0, 1) + } else { + out.References = []OtelSpanRef{} + } + } else { + out.References = (out.References)[:0] + } + for !in.IsDelim(']') { + var v1 OtelSpanRef + easyjson6ff3ac1dDecodeGoSignozIoSignozPkgQueryServiceModel1(in, &v1) + out.References = append(out.References, v1) + in.WantComma() + } + in.Delim(']') + } + case "tagMap": + if in.IsNull() { + in.Skip() + } else { + in.Delim('{') + out.TagMap = make(map[string]string) + for !in.IsDelim('}') { + key := string(in.String()) + in.WantColon() + var v2 string + v2 = string(in.String()) + (out.TagMap)[key] = v2 + in.WantComma() + } + in.Delim('}') + } + case "event": + if in.IsNull() { + in.Skip() + out.Events = nil + } else { + in.Delim('[') + if out.Events == nil { + if !in.IsDelim(']') { + out.Events = make([]string, 0, 4) + } else { + out.Events = []string{} + } + } else { + out.Events = (out.Events)[:0] + } + for !in.IsDelim(']') { + var v3 string + v3 = string(in.String()) + out.Events = append(out.Events, v3) + in.WantComma() + } + in.Delim(']') + } + case "rootName": + out.RootName = string(in.String()) + default: + in.SkipRecursive() + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} +func easyjson6ff3ac1dEncodeGoSignozIoSignozPkgQueryServiceModel(out *jwriter.Writer, in SearchSpanResponseItem) { + out.RawByte('{') + first := true + _ = first + { + const prefix string = ",\"timestamp\":" + out.RawString(prefix[1:]) + out.Uint64(uint64(in.TimeUnixNano)) + } + { + const prefix string = ",\"durationNano\":" + out.RawString(prefix) + out.Int64(int64(in.DurationNano)) + } + { + const prefix string = ",\"spanId\":" + out.RawString(prefix) + out.String(string(in.SpanID)) + } + { + const prefix string = ",\"rootSpanId\":" + out.RawString(prefix) + out.String(string(in.RootSpanID)) + } + { + const prefix string = ",\"traceId\":" + out.RawString(prefix) + out.String(string(in.TraceID)) + } + { + const prefix string = ",\"hasError\":" + out.RawString(prefix) + out.Bool(bool(in.HasError)) + } + { + const prefix string = ",\"kind\":" + out.RawString(prefix) + out.Int32(int32(in.Kind)) + } + { + const prefix string = ",\"serviceName\":" + out.RawString(prefix) + out.String(string(in.ServiceName)) + } + { + const prefix string = ",\"name\":" + out.RawString(prefix) + out.String(string(in.Name)) + } + if len(in.References) != 0 { + const prefix string = ",\"references\":" + out.RawString(prefix) + { + out.RawByte('[') + for v4, v5 := range in.References { + if v4 > 0 { + out.RawByte(',') + } + easyjson6ff3ac1dEncodeGoSignozIoSignozPkgQueryServiceModel1(out, v5) + } + out.RawByte(']') + } + } + { + const prefix string = ",\"tagMap\":" + out.RawString(prefix) + if in.TagMap == nil && (out.Flags&jwriter.NilMapAsEmpty) == 0 { + out.RawString(`null`) + } else { + out.RawByte('{') + v6First := true + for v6Name, v6Value := range in.TagMap { + if v6First { + v6First = false + } else { + out.RawByte(',') + } + out.String(string(v6Name)) + out.RawByte(':') + out.String(string(v6Value)) + } + out.RawByte('}') + } + } + { + const prefix string = ",\"event\":" + out.RawString(prefix) + if in.Events == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { + out.RawString("null") + } else { + out.RawByte('[') + for v7, v8 := range in.Events { + if v7 > 0 { + out.RawByte(',') + } + out.String(string(v8)) + } + out.RawByte(']') + } + } + { + const prefix string = ",\"rootName\":" + out.RawString(prefix) + out.String(string(in.RootName)) + } + out.RawByte('}') +} + +// MarshalJSON supports json.Marshaler interface +func (v SearchSpanResponseItem) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + easyjson6ff3ac1dEncodeGoSignozIoSignozPkgQueryServiceModel(&w, v) + return w.Buffer.BuildBytes(), w.Error +} + +// MarshalEasyJSON supports easyjson.Marshaler interface +func (v SearchSpanResponseItem) MarshalEasyJSON(w *jwriter.Writer) { + easyjson6ff3ac1dEncodeGoSignozIoSignozPkgQueryServiceModel(w, v) +} + +// UnmarshalJSON supports json.Unmarshaler interface +func (v *SearchSpanResponseItem) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + easyjson6ff3ac1dDecodeGoSignozIoSignozPkgQueryServiceModel(&r, v) + return r.Error() +} + +// UnmarshalEasyJSON supports easyjson.Unmarshaler interface +func (v *SearchSpanResponseItem) UnmarshalEasyJSON(l *jlexer.Lexer) { + easyjson6ff3ac1dDecodeGoSignozIoSignozPkgQueryServiceModel(l, v) +} +func easyjson6ff3ac1dDecodeGoSignozIoSignozPkgQueryServiceModel1(in *jlexer.Lexer, out *OtelSpanRef) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "traceId": + out.TraceId = string(in.String()) + case "spanId": + out.SpanId = string(in.String()) + case "refType": + out.RefType = string(in.String()) + default: + in.SkipRecursive() + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} +func easyjson6ff3ac1dEncodeGoSignozIoSignozPkgQueryServiceModel1(out *jwriter.Writer, in OtelSpanRef) { + out.RawByte('{') + first := true + _ = first + if in.TraceId != "" { + const prefix string = ",\"traceId\":" + first = false + out.RawString(prefix[1:]) + out.String(string(in.TraceId)) + } + if in.SpanId != "" { + const prefix string = ",\"spanId\":" + if first { + first = false + out.RawString(prefix[1:]) + } else { + out.RawString(prefix) + } + out.String(string(in.SpanId)) + } + if in.RefType != "" { + const prefix string = ",\"refType\":" + if first { + first = false + out.RawString(prefix[1:]) + } else { + out.RawString(prefix) + } + out.String(string(in.RefType)) + } + out.RawByte('}') +}