diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 67d5328416..8eaa1590b1 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -21,6 +21,7 @@ import ( "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/valuer" + "github.com/mailru/easyjson" "github.com/uptrace/bun" "github.com/google/uuid" @@ -40,6 +41,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/app/logs" "github.com/SigNoz/signoz/pkg/query-service/app/resource" "github.com/SigNoz/signoz/pkg/query-service/app/services" + "github.com/SigNoz/signoz/pkg/query-service/app/traces/smart" "github.com/SigNoz/signoz/pkg/query-service/app/traces/tracedetail" "github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/constants" @@ -6791,3 +6793,262 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metric return cachedMetadata, nil } + +func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { + searchSpansResult := []model.SearchSpansResult{ + { + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, + IsSubTree: false, + Events: make([][]interface{}, 0), + }, + } + + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return &searchSpansResult, nil + } + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + + if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) { + zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace), + zap.Uint64("Count", traceSummary.NumSpans)) + claims, ok := authtypes.ClaimsFromContext(ctx) + if ok { + data := map[string]interface{}{ + "traceSize": traceSummary.NumSpans, + "maxSpansInTraceLimit": params.MaxSpansInTrace, + "algo": "smart", + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, claims.Email, true, false) + } + return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details") + } + + claims, ok := authtypes.ClaimsFromContext(ctx) + if ok { + data := map[string]interface{}{ + "traceSize": traceSummary.NumSpans, + "algo": "smart", + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, claims.Email, true, false) + } + + var startTime, endTime, durationNano uint64 + var searchScanResponses []model.SpanItemV2 + + query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName) + + start := time.Now() + + err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + + zap.L().Info(query) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + end := time.Now() + zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start))) + + searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses)) + + searchSpanResponses := []model.SearchSpanResponseItem{} + start = time.Now() + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("Error unmarshalling references", zap.Error(err)) + return nil, err + } + + // merge attributes_number and attributes_bool to attributes_string + for k, v := range item.Attributes_bool { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Attributes_number { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Resources_string { + item.Attributes_string[k] = v + } + + jsonItem := model.SearchSpanResponseItem{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + Kind: int32(item.Kind), + DurationNano: int64(item.DurationNano), + HasError: item.HasError, + StatusMessage: item.StatusMessage, + StatusCodeString: item.StatusCodeString, + SpanKind: item.SpanKind, + References: ref, + Events: item.Events, + TagMap: item.Attributes_string, + } + + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + + searchSpanResponses = append(searchSpanResponses, jsonItem) + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || jsonItem.TimeUnixNano > endTime { + endTime = jsonItem.TimeUnixNano + } + if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { + durationNano = uint64(jsonItem.DurationNano) + } + } + end = time.Now() + zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start))) + + if len(searchScanResponses) > params.SpansRenderLimit { + start = time.Now() + searchSpansResult, err = smart.SmartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit) + if err != nil { + return nil, err + } + end = time.Now() + zap.L().Debug("smartTraceAlgo took: ", zap.Duration("duration", end.Sub(start))) + claims, ok := authtypes.ClaimsFromContext(ctx) + if ok { + data := map[string]interface{}{ + "traceSize": len(searchScanResponses), + "spansRenderLimit": params.SpansRenderLimit, + "algo": "smart", + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LARGE_TRACE_OPENED, data, claims.Email, true, false) + } + } else { + for i, item := range searchSpanResponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + } + + searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000) + searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000) + + return &searchSpansResult, nil +} + +func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { + + if r.useTraceNewSchema { + return r.SearchTracesV2(ctx, params) + } + + var countSpans uint64 + countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) + err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans) + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + + if countSpans > uint64(params.MaxSpansInTrace) { + zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace), + zap.Uint64("Count", countSpans)) + claims, ok := authtypes.ClaimsFromContext(ctx) + if ok { + data := map[string]interface{}{ + "traceSize": countSpans, + "maxSpansInTraceLimit": params.MaxSpansInTrace, + "algo": "smart", + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, claims.Email, true, false) + } + return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details") + } + + claims, ok := authtypes.ClaimsFromContext(ctx) + if ok { + data := map[string]interface{}{ + "traceSize": countSpans, + "algo": "smart", + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, claims.Email, true, false) + } + + var startTime, endTime, durationNano uint64 + var searchScanResponses []model.SearchSpanDBResponseItem + + query := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) + + start := time.Now() + + err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID) + + zap.L().Info(query) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + end := time.Now() + zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start))) + searchSpansResult := []model.SearchSpansResult{{ + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, + Events: make([][]interface{}, len(searchScanResponses)), + IsSubTree: false, + }, + } + + searchSpanResponses := []model.SearchSpanResponseItem{} + start = time.Now() + for _, item := range searchScanResponses { + var jsonItem model.SearchSpanResponseItem + easyjson.Unmarshal([]byte(item.Model), &jsonItem) + jsonItem.TimeUnixNano = uint64(item.Timestamp.UnixNano() / 1000000) + searchSpanResponses = append(searchSpanResponses, jsonItem) + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || jsonItem.TimeUnixNano > endTime { + endTime = jsonItem.TimeUnixNano + } + if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { + durationNano = uint64(jsonItem.DurationNano) + } + } + end = time.Now() + zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start))) + + if len(searchScanResponses) > params.SpansRenderLimit { + start = time.Now() + searchSpansResult, err = smart.SmartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit) + if err != nil { + return nil, err + } + end = time.Now() + zap.L().Debug("smartTraceAlgo took: ", zap.Duration("duration", end.Sub(start))) + claims, ok := authtypes.ClaimsFromContext(ctx) + if ok { + data := map[string]interface{}{ + "traceSize": len(searchScanResponses), + "spansRenderLimit": params.SpansRenderLimit, + "algo": "smart", + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LARGE_TRACE_OPENED, data, claims.Email, true, false) + } + } else { + for i, item := range searchSpanResponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + } + + searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000) + searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000) + + return &searchSpansResult, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 86b30e9ed7..2401e32828 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -549,6 +549,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/services/list", am.ViewAccess(aH.getServicesList)).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/top_operations", am.ViewAccess(aH.getTopOperations)).Methods(http.MethodPost) router.HandleFunc("/api/v1/service/top_level_operations", am.ViewAccess(aH.getServicesTopLevelOps)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(aH.SearchTraces)).Methods(http.MethodGet) router.HandleFunc("/api/v1/usage", am.ViewAccess(aH.getUsage)).Methods(http.MethodGet) router.HandleFunc("/api/v1/dependency_graph", am.ViewAccess(aH.dependencyGraph)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", am.AdminAccess(aH.setTTL)).Methods(http.MethodPost) @@ -1724,6 +1725,22 @@ func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) { } +func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) { + params, err := ParseSearchTracesParams(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params") + return + } + + result, err := aH.reader.SearchTraces(r.Context(), params) + if aH.HandleError(w, err, http.StatusBadRequest) { + return + } + + aH.WriteJSON(w, r, result) + +} + func (aH *APIHandler) GetWaterfallSpansForTraceWithMetadata(w http.ResponseWriter, r *http.Request) { traceID := mux.Vars(r)["traceId"] if traceID == "" { diff --git a/ee/query-service/model/trace.go b/pkg/query-service/app/traces/smart/model.go similarity index 86% rename from ee/query-service/model/trace.go rename to pkg/query-service/app/traces/smart/model.go index 708d6d1c5c..c121da5a79 100644 --- a/ee/query-service/model/trace.go +++ b/pkg/query-service/app/traces/smart/model.go @@ -1,4 +1,4 @@ -package model +package smart type SpanForTraceDetails struct { TimeUnixNano uint64 `json:"timestamp"` @@ -15,8 +15,3 @@ type SpanForTraceDetails struct { HasError bool `json:"hasError"` Children []*SpanForTraceDetails `json:"children"` } - -type GetSpansSubQueryDBResponse struct { - SpanID string `ch:"spanID"` - TraceID string `ch:"traceID"` -} diff --git a/ee/query-service/app/db/trace.go b/pkg/query-service/app/traces/smart/trace.go similarity index 87% rename from ee/query-service/app/db/trace.go rename to pkg/query-service/app/traces/smart/trace.go index 3a3420c306..3f38747eef 100644 --- a/ee/query-service/app/db/trace.go +++ b/pkg/query-service/app/traces/smart/trace.go @@ -1,17 +1,16 @@ -package db +package smart import ( "errors" "strconv" - "github.com/SigNoz/signoz/ee/query-service/model" basemodel "github.com/SigNoz/signoz/pkg/query-service/model" "go.uber.org/zap" ) // SmartTraceAlgorithm is an algorithm to find the target span and build a tree of spans around it with the given levelUp and levelDown parameters and the given spanLimit func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]basemodel.SearchSpansResult, error) { - var spans []*model.SpanForTraceDetails + var spans []*SpanForTraceDetails // if targetSpanId is null or not present then randomly select a span as targetSpanId if (targetSpanId == "" || targetSpanId == "null") && len(payload) > 0 { @@ -24,7 +23,7 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI if len(spanItem.References) > 0 && spanItem.References[0].RefType == "CHILD_OF" { parentID = spanItem.References[0].SpanId } - span := &model.SpanForTraceDetails{ + span := &SpanForTraceDetails{ TimeUnixNano: spanItem.TimeUnixNano, SpanID: spanItem.SpanID, TraceID: spanItem.TraceID, @@ -45,7 +44,7 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI if err != nil { return nil, err } - targetSpan := &model.SpanForTraceDetails{} + targetSpan := &SpanForTraceDetails{} // Find the target span in the span trees for _, root := range roots { @@ -65,7 +64,7 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI } // Build the final result - parents := []*model.SpanForTraceDetails{} + parents := []*SpanForTraceDetails{} // Get the parent spans of the target span up to the given levelUp parameter and spanLimit preParent := targetSpan @@ -90,11 +89,11 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI } // Get the child spans of the target span until the given levelDown and spanLimit - preParents := []*model.SpanForTraceDetails{targetSpan} - children := []*model.SpanForTraceDetails{} + preParents := []*SpanForTraceDetails{targetSpan} + children := []*SpanForTraceDetails{} for i := 0; i < levelDown && len(preParents) != 0 && spanLimit > 0; i++ { - parents := []*model.SpanForTraceDetails{} + parents := []*SpanForTraceDetails{} for _, parent := range preParents { if spanLimit-len(parent.Children) <= 0 { children = append(children, parent.Children[:spanLimit]...) @@ -108,7 +107,7 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI } // Store the final list of spans in the resultSpanSet map to avoid duplicates - resultSpansSet := make(map[*model.SpanForTraceDetails]struct{}) + resultSpansSet := make(map[*SpanForTraceDetails]struct{}) resultSpansSet[targetSpan] = struct{}{} for _, parent := range parents { resultSpansSet[parent] = struct{}{} @@ -169,12 +168,12 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI } // buildSpanTrees builds trees of spans from a list of spans. -func buildSpanTrees(spansPtr *[]*model.SpanForTraceDetails) ([]*model.SpanForTraceDetails, error) { +func buildSpanTrees(spansPtr *[]*SpanForTraceDetails) ([]*SpanForTraceDetails, error) { // Build a map of spanID to span for fast lookup - var roots []*model.SpanForTraceDetails + var roots []*SpanForTraceDetails spans := *spansPtr - mapOfSpans := make(map[string]*model.SpanForTraceDetails, len(spans)) + mapOfSpans := make(map[string]*SpanForTraceDetails, len(spans)) for _, span := range spans { if span.ParentID == "" { @@ -206,8 +205,8 @@ func buildSpanTrees(spansPtr *[]*model.SpanForTraceDetails) ([]*model.SpanForTra } // breadthFirstSearch performs a breadth-first search on the span tree to find the target span. -func breadthFirstSearch(spansPtr *model.SpanForTraceDetails, targetId string) (*model.SpanForTraceDetails, error) { - queue := []*model.SpanForTraceDetails{spansPtr} +func breadthFirstSearch(spansPtr *SpanForTraceDetails, targetId string) (*SpanForTraceDetails, error) { + queue := []*SpanForTraceDetails{spansPtr} visited := make(map[string]bool) for len(queue) > 0 { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 4d0af440e1..7379a9d281 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -39,6 +39,7 @@ type Reader interface { GetNextPrevErrorIDs(ctx context.Context, params *model.GetErrorParams) (*model.NextPrevErrorIDs, *model.ApiError) // Search Interfaces + SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError)