mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 06:58:58 +08:00
feat(trace-details): query service changes for trace details (#6906)
* feat(trace-details): query service changes for trace details * feat(trace-detail): refactor query service trace apis * feat(trace-detail): minor bug fixes * feat(trace-detail): address review comments * feat(trace-detail): address review comments
This commit is contained in:
parent
390b04c015
commit
1e61e6c2f6
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
basechr "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
)
|
||||
@ -27,8 +28,10 @@ func NewDataConnector(
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool,
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickhouseReader {
|
||||
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema)
|
||||
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
return &ClickhouseReader{
|
||||
conn: ch.GetConn(),
|
||||
appdb: localDB,
|
||||
|
@ -79,6 +79,7 @@ type ServerOptions struct {
|
||||
DialTimeout time.Duration
|
||||
CacheConfigPath string
|
||||
FluxInterval string
|
||||
FluxIntervalForTraceDetail string
|
||||
Cluster string
|
||||
GatewayUrl string
|
||||
UseLogsNewSchema bool
|
||||
@ -145,6 +146,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
modelDao.SetFlagProvider(lm)
|
||||
readerReady := make(chan bool)
|
||||
|
||||
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var reader interfaces.DataConnector
|
||||
storage := os.Getenv("STORAGE")
|
||||
if storage == "clickhouse" {
|
||||
@ -159,6 +165,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
serverOptions.Cluster,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
fluxIntervalForTraceDetail,
|
||||
serverOptions.SigNoz.Cache,
|
||||
)
|
||||
go qb.Start(readerReady)
|
||||
reader = qb
|
||||
@ -250,7 +258,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
telemetry.GetInstance().SetSaasOperator(constants.SaasSegmentKey)
|
||||
|
||||
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -99,7 +99,7 @@ func main() {
|
||||
|
||||
var useLogsNewSchema bool
|
||||
var useTraceNewSchema bool
|
||||
var cacheConfigPath, fluxInterval string
|
||||
var cacheConfigPath, fluxInterval, fluxIntervalForTraceDetail string
|
||||
var enableQueryServiceLogOTLPExport bool
|
||||
var preferSpanMetrics bool
|
||||
|
||||
@ -121,6 +121,7 @@ func main() {
|
||||
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
||||
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
|
||||
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)")
|
||||
flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)")
|
||||
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
|
||||
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
|
||||
flag.StringVar(&gatewayUrl, "gateway-url", "", "(url to the gateway)")
|
||||
@ -165,6 +166,7 @@ func main() {
|
||||
DialTimeout: dialTimeout,
|
||||
CacheConfigPath: cacheConfigPath,
|
||||
FluxInterval: fluxInterval,
|
||||
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
||||
Cluster: cluster,
|
||||
GatewayUrl: gatewayUrl,
|
||||
UseLogsNewSchema: useLogsNewSchema,
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
|
||||
promModel "github.com/prometheus/common/model"
|
||||
"go.uber.org/zap"
|
||||
@ -41,6 +42,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logs"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/resource"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/services"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/traces/tracedetail"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
@ -156,6 +158,9 @@ type ClickHouseReader struct {
|
||||
traceLocalTableName string
|
||||
traceResourceTableV3 string
|
||||
traceSummaryTable string
|
||||
|
||||
fluxIntervalForTraceDetail time.Duration
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
// NewTraceReader returns a TraceReader for the database
|
||||
@ -169,6 +174,8 @@ func NewReader(
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool,
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickHouseReader {
|
||||
|
||||
datasource := os.Getenv("ClickHouseUrl")
|
||||
@ -179,7 +186,7 @@ func NewReader(
|
||||
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
|
||||
}
|
||||
|
||||
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema)
|
||||
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
|
||||
}
|
||||
|
||||
func NewReaderFromClickhouseConnection(
|
||||
@ -191,6 +198,8 @@ func NewReaderFromClickhouseConnection(
|
||||
cluster string,
|
||||
useLogsNewSchema bool,
|
||||
useTraceNewSchema bool,
|
||||
fluxIntervalForTraceDetail time.Duration,
|
||||
cache cache.Cache,
|
||||
) *ClickHouseReader {
|
||||
alertManager, err := am.New()
|
||||
if err != nil {
|
||||
@ -277,6 +286,9 @@ func NewReaderFromClickhouseConnection(
|
||||
traceTableName: traceTableName,
|
||||
traceResourceTableV3: options.primary.TraceResourceTableV3,
|
||||
traceSummaryTable: options.primary.TraceSummaryTable,
|
||||
|
||||
fluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1442,6 +1454,372 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
|
||||
return &searchSpansResult, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetSpansForTrace(ctx context.Context, traceID string, traceDetailsQuery string) ([]model.SpanItemV2, *model.ApiError) {
|
||||
var traceSummary model.TraceSummary
|
||||
summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable)
|
||||
err := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return []model.SpanItemV2{}, nil
|
||||
}
|
||||
zap.L().Error("Error in processing trace summary sql query", zap.Error(err))
|
||||
return nil, model.ExecutionError(fmt.Errorf("error in processing trace summary sql query: %w", err))
|
||||
}
|
||||
|
||||
var searchScanResponses []model.SpanItemV2
|
||||
queryStartTime := time.Now()
|
||||
err = r.db.Select(ctx, &searchScanResponses, traceDetailsQuery, traceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))
|
||||
zap.L().Info(traceDetailsQuery)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, model.ExecutionError(fmt.Errorf("error in processing trace data sql query: %w", err))
|
||||
}
|
||||
zap.L().Info("trace details query took: ", zap.Duration("duration", time.Since(queryStartTime)), zap.String("traceID", traceID))
|
||||
|
||||
return searchScanResponses, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadataCache(ctx context.Context, traceID string) (*model.GetWaterfallSpansForTraceWithMetadataCache, error) {
|
||||
cachedTraceData := new(model.GetWaterfallSpansForTraceWithMetadataCache)
|
||||
cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), cachedTraceData, false)
|
||||
if err != nil {
|
||||
zap.L().Debug("error in retrieving getWaterfallSpansForTraceWithMetadata cache", zap.Error(err), zap.String("traceID", traceID))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cacheStatus != cache.RetrieveStatusHit {
|
||||
return nil, errors.Errorf("cache status for getWaterfallSpansForTraceWithMetadata : %s, traceID: %s", cacheStatus, traceID)
|
||||
}
|
||||
|
||||
if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail {
|
||||
zap.L().Info("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache", zap.String("traceID", traceID))
|
||||
return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getWaterfallSpansForTraceWithMetadata cache, traceID: %s", traceID)
|
||||
}
|
||||
|
||||
zap.L().Info("cache is successfully hit, applying cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID))
|
||||
return cachedTraceData, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError) {
|
||||
response := new(model.GetWaterfallSpansForTraceWithMetadataResponse)
|
||||
var startTime, endTime, durationNano, totalErrorSpans, totalSpans uint64
|
||||
var spanIdToSpanNodeMap = map[string]*model.Span{}
|
||||
var traceRoots []*model.Span
|
||||
var serviceNameToTotalDurationMap = map[string]uint64{}
|
||||
var serviceNameIntervalMap = map[string][]tracedetail.Interval{}
|
||||
|
||||
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID)
|
||||
if err == nil {
|
||||
startTime = cachedTraceData.StartTime
|
||||
endTime = cachedTraceData.EndTime
|
||||
durationNano = cachedTraceData.DurationNano
|
||||
spanIdToSpanNodeMap = cachedTraceData.SpanIdToSpanNodeMap
|
||||
serviceNameToTotalDurationMap = cachedTraceData.ServiceNameToTotalDurationMap
|
||||
traceRoots = cachedTraceData.TraceRoots
|
||||
totalSpans = cachedTraceData.TotalSpans
|
||||
totalErrorSpans = cachedTraceData.TotalErrorSpans
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
zap.L().Info("cache miss for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID))
|
||||
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(searchScanResponses) == 0 {
|
||||
return response, nil
|
||||
}
|
||||
totalSpans = uint64(len(searchScanResponses))
|
||||
|
||||
processingBeforeCache := time.Now()
|
||||
for _, item := range searchScanResponses {
|
||||
ref := []model.OtelSpanRef{}
|
||||
err := json.Unmarshal([]byte(item.References), &ref)
|
||||
if err != nil {
|
||||
zap.L().Error("getWaterfallSpansForTraceWithMetadata: error unmarshalling references", zap.Error(err), zap.String("traceID", traceID))
|
||||
return nil, model.BadRequest(fmt.Errorf("getWaterfallSpansForTraceWithMetadata: error unmarshalling references %w", err))
|
||||
}
|
||||
|
||||
// merge attributes_number and attributes_bool to attributes_string
|
||||
for k, v := range item.Attributes_bool {
|
||||
item.Attributes_string[k] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
for k, v := range item.Attributes_number {
|
||||
item.Attributes_string[k] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
for k, v := range item.Resources_string {
|
||||
item.Attributes_string[k] = v
|
||||
}
|
||||
|
||||
jsonItem := model.Span{
|
||||
SpanID: item.SpanID,
|
||||
TraceID: item.TraceID,
|
||||
ServiceName: item.ServiceName,
|
||||
Name: item.Name,
|
||||
Kind: int32(item.Kind),
|
||||
DurationNano: item.DurationNano,
|
||||
HasError: item.HasError,
|
||||
StatusMessage: item.StatusMessage,
|
||||
StatusCodeString: item.StatusCodeString,
|
||||
SpanKind: item.SpanKind,
|
||||
References: ref,
|
||||
Events: item.Events,
|
||||
TagMap: item.Attributes_string,
|
||||
Children: make([]*model.Span, 0),
|
||||
}
|
||||
|
||||
// convert start timestamp to millis
|
||||
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
||||
|
||||
// collect the intervals for service for execution time calculation
|
||||
serviceNameIntervalMap[jsonItem.ServiceName] =
|
||||
append(serviceNameIntervalMap[jsonItem.ServiceName], tracedetail.Interval{StartTime: jsonItem.TimeUnixNano, Duration: jsonItem.DurationNano / 1000000, Service: jsonItem.ServiceName})
|
||||
|
||||
// append to the span node map
|
||||
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
||||
|
||||
// metadata calculation
|
||||
if startTime == 0 || jsonItem.TimeUnixNano < startTime {
|
||||
startTime = jsonItem.TimeUnixNano
|
||||
}
|
||||
if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime {
|
||||
endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000)
|
||||
}
|
||||
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
||||
durationNano = jsonItem.DurationNano
|
||||
}
|
||||
if jsonItem.HasError {
|
||||
totalErrorSpans = totalErrorSpans + 1
|
||||
}
|
||||
}
|
||||
|
||||
// traverse through the map and append each node to the children array of the parent node
|
||||
// and add the missing spans
|
||||
for _, spanNode := range spanIdToSpanNodeMap {
|
||||
hasParentSpanNode := false
|
||||
for _, reference := range spanNode.References {
|
||||
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
||||
hasParentSpanNode = true
|
||||
|
||||
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
||||
parentNode.Children = append(parentNode.Children, spanNode)
|
||||
} else {
|
||||
// insert the missing span
|
||||
missingSpan := model.Span{
|
||||
SpanID: reference.SpanId,
|
||||
TraceID: spanNode.TraceID,
|
||||
ServiceName: "",
|
||||
Name: "Missing Span",
|
||||
TimeUnixNano: spanNode.TimeUnixNano,
|
||||
Kind: 0,
|
||||
DurationNano: spanNode.DurationNano,
|
||||
HasError: false,
|
||||
StatusMessage: "",
|
||||
StatusCodeString: "",
|
||||
SpanKind: "",
|
||||
Children: make([]*model.Span, 0),
|
||||
}
|
||||
missingSpan.Children = append(missingSpan.Children, spanNode)
|
||||
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
||||
traceRoots = append(traceRoots, &missingSpan)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !hasParentSpanNode && !tracedetail.ContainsWaterfallSpan(traceRoots, spanNode) {
|
||||
traceRoots = append(traceRoots, spanNode)
|
||||
}
|
||||
}
|
||||
|
||||
// sort the trace roots to add missing spans at the right order
|
||||
sort.Slice(traceRoots, func(i, j int) bool {
|
||||
if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
|
||||
return traceRoots[i].Name < traceRoots[j].Name
|
||||
}
|
||||
return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
|
||||
})
|
||||
|
||||
serviceNameToTotalDurationMap = tracedetail.CalculateServiceTime(serviceNameIntervalMap)
|
||||
|
||||
traceCache := model.GetWaterfallSpansForTraceWithMetadataCache{
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
DurationNano: durationNano,
|
||||
TotalSpans: totalSpans,
|
||||
TotalErrorSpans: totalErrorSpans,
|
||||
SpanIdToSpanNodeMap: spanIdToSpanNodeMap,
|
||||
ServiceNameToTotalDurationMap: serviceNameToTotalDurationMap,
|
||||
TraceRoots: traceRoots,
|
||||
}
|
||||
|
||||
zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
|
||||
cacheErr := r.cache.Store(ctx, fmt.Sprintf("getWaterfallSpansForTraceWithMetadata-%v", traceID), &traceCache, time.Minute*5)
|
||||
if cacheErr != nil {
|
||||
zap.L().Debug("failed to store cache for getWaterfallSpansForTraceWithMetadata", zap.String("traceID", traceID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
processingPostCache := time.Now()
|
||||
selectedSpans, uncollapsedSpans, rootServiceName, rootServiceEntryPoint := tracedetail.GetSelectedSpans(req.UncollapsedSpans, req.SelectedSpanID, traceRoots, spanIdToSpanNodeMap, req.IsSelectedSpanIDUnCollapsed)
|
||||
zap.L().Info("getWaterfallSpansForTraceWithMetadata: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))
|
||||
|
||||
response.Spans = selectedSpans
|
||||
response.UncollapsedSpans = uncollapsedSpans
|
||||
response.StartTimestampMillis = startTime
|
||||
response.EndTimestampMillis = endTime
|
||||
response.TotalSpansCount = totalSpans
|
||||
response.TotalErrorSpansCount = totalErrorSpans
|
||||
response.RootServiceName = rootServiceName
|
||||
response.RootServiceEntryPoint = rootServiceEntryPoint
|
||||
response.ServiceNameToTotalDurationMap = serviceNameToTotalDurationMap
|
||||
response.HasMissingSpans = len(traceRoots) > 1
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetFlamegraphSpansForTraceCache(ctx context.Context, traceID string) (*model.GetFlamegraphSpansForTraceCache, error) {
|
||||
cachedTraceData := new(model.GetFlamegraphSpansForTraceCache)
|
||||
cacheStatus, err := r.cache.Retrieve(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), cachedTraceData, false)
|
||||
if err != nil {
|
||||
zap.L().Debug("error in retrieving getFlamegraphSpansForTrace cache", zap.Error(err), zap.String("traceID", traceID))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cacheStatus != cache.RetrieveStatusHit {
|
||||
return nil, errors.Errorf("cache status for getFlamegraphSpansForTrace : %s, traceID: %s", cacheStatus, traceID)
|
||||
}
|
||||
|
||||
if time.Since(time.UnixMilli(int64(cachedTraceData.EndTime))) < r.fluxIntervalForTraceDetail {
|
||||
zap.L().Info("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache", zap.String("traceID", traceID))
|
||||
return nil, errors.Errorf("the trace end time falls under the flux interval, skipping getFlamegraphSpansForTrace cache, traceID: %s", traceID)
|
||||
}
|
||||
|
||||
zap.L().Info("cache is successfully hit, applying cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID))
|
||||
return cachedTraceData, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) {
|
||||
trace := new(model.GetFlamegraphSpansForTraceResponse)
|
||||
var startTime, endTime, durationNano uint64
|
||||
var spanIdToSpanNodeMap = map[string]*model.FlamegraphSpan{}
|
||||
// map[traceID][level]span
|
||||
var selectedSpans = [][]*model.FlamegraphSpan{}
|
||||
var traceRoots []*model.FlamegraphSpan
|
||||
|
||||
// get the trace tree from cache!
|
||||
cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, traceID)
|
||||
|
||||
if err == nil {
|
||||
startTime = cachedTraceData.StartTime
|
||||
endTime = cachedTraceData.EndTime
|
||||
durationNano = cachedTraceData.DurationNano
|
||||
selectedSpans = cachedTraceData.SelectedSpans
|
||||
traceRoots = cachedTraceData.TraceRoots
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
zap.L().Info("cache miss for getFlamegraphSpansForTrace", zap.String("traceID", traceID))
|
||||
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,references, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(searchScanResponses) == 0 {
|
||||
return trace, nil
|
||||
}
|
||||
|
||||
processingBeforeCache := time.Now()
|
||||
for _, item := range searchScanResponses {
|
||||
ref := []model.OtelSpanRef{}
|
||||
err := json.Unmarshal([]byte(item.References), &ref)
|
||||
if err != nil {
|
||||
zap.L().Error("Error unmarshalling references", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error in unmarshalling references: %w", err)}
|
||||
}
|
||||
|
||||
jsonItem := model.FlamegraphSpan{
|
||||
SpanID: item.SpanID,
|
||||
TraceID: item.TraceID,
|
||||
ServiceName: item.ServiceName,
|
||||
Name: item.Name,
|
||||
DurationNano: item.DurationNano,
|
||||
HasError: item.HasError,
|
||||
References: ref,
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
|
||||
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
||||
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
||||
|
||||
// metadata calculation
|
||||
if startTime == 0 || jsonItem.TimeUnixNano < startTime {
|
||||
startTime = jsonItem.TimeUnixNano
|
||||
}
|
||||
if endTime == 0 || (jsonItem.TimeUnixNano+(jsonItem.DurationNano/1000000)) > endTime {
|
||||
endTime = jsonItem.TimeUnixNano + (jsonItem.DurationNano / 1000000)
|
||||
}
|
||||
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
||||
durationNano = jsonItem.DurationNano
|
||||
}
|
||||
}
|
||||
|
||||
// traverse through the map and append each node to the children array of the parent node
|
||||
// and add missing spans
|
||||
for _, spanNode := range spanIdToSpanNodeMap {
|
||||
hasParentSpanNode := false
|
||||
for _, reference := range spanNode.References {
|
||||
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
||||
hasParentSpanNode = true
|
||||
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
||||
parentNode.Children = append(parentNode.Children, spanNode)
|
||||
} else {
|
||||
// insert the missing spans
|
||||
missingSpan := model.FlamegraphSpan{
|
||||
SpanID: reference.SpanId,
|
||||
TraceID: spanNode.TraceID,
|
||||
ServiceName: "",
|
||||
Name: "Missing Span",
|
||||
TimeUnixNano: spanNode.TimeUnixNano,
|
||||
DurationNano: spanNode.DurationNano,
|
||||
HasError: false,
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
missingSpan.Children = append(missingSpan.Children, spanNode)
|
||||
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
||||
traceRoots = append(traceRoots, &missingSpan)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) {
|
||||
traceRoots = append(traceRoots, spanNode)
|
||||
}
|
||||
}
|
||||
|
||||
selectedSpans = tracedetail.GetSelectedSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
||||
traceCache := model.GetFlamegraphSpansForTraceCache{
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
DurationNano: durationNano,
|
||||
SelectedSpans: selectedSpans,
|
||||
TraceRoots: traceRoots,
|
||||
}
|
||||
|
||||
zap.L().Info("getFlamegraphSpansForTrace: processing pre cache", zap.Duration("duration", time.Since(processingBeforeCache)), zap.String("traceID", traceID))
|
||||
cacheErr := r.cache.Store(ctx, fmt.Sprintf("getFlamegraphSpansForTrace-%v", traceID), &traceCache, time.Minute*5)
|
||||
if cacheErr != nil {
|
||||
zap.L().Debug("failed to store cache for getFlamegraphSpansForTrace", zap.String("traceID", traceID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
processingPostCache := time.Now()
|
||||
selectedSpansForRequest := tracedetail.GetSelectedSpansForFlamegraphForRequest(req.SelectedSpanID, selectedSpans)
|
||||
zap.L().Info("getFlamegraphSpansForTrace: processing post cache", zap.Duration("duration", time.Since(processingPostCache)), zap.String("traceID", traceID))
|
||||
|
||||
trace.Spans = selectedSpansForRequest
|
||||
trace.StartTimestampMillis = startTime
|
||||
trace.EndTimestampMillis = endTime
|
||||
return trace, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
|
||||
response := []model.ServiceMapDependencyResponseItem{}
|
||||
|
@ -546,6 +546,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
|
||||
router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/traces/flamegraph/{traceId}", am.ViewAccess(aH.GetFlamegraphSpansForTrace)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/traces/waterfall/{traceId}", am.ViewAccess(aH.GetWaterfallSpansForTraceWithMetadata)).Methods(http.MethodPost)
|
||||
|
||||
router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet)
|
||||
@ -1777,6 +1779,52 @@ func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) GetWaterfallSpansForTraceWithMetadata(w http.ResponseWriter, r *http.Request) {
|
||||
traceID := mux.Vars(r)["traceId"]
|
||||
if traceID == "" {
|
||||
RespondError(w, model.BadRequest(errors.New("traceID is required")), nil)
|
||||
return
|
||||
}
|
||||
|
||||
req := new(model.GetWaterfallSpansForTraceWithMetadataParams)
|
||||
err := json.NewDecoder(r.Body).Decode(&req)
|
||||
if err != nil {
|
||||
RespondError(w, model.BadRequest(err), nil)
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetWaterfallSpansForTraceWithMetadata(r.Context(), traceID, req)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) GetFlamegraphSpansForTrace(w http.ResponseWriter, r *http.Request) {
|
||||
traceID := mux.Vars(r)["traceId"]
|
||||
if traceID == "" {
|
||||
RespondError(w, model.BadRequest(errors.New("traceID is required")), nil)
|
||||
return
|
||||
}
|
||||
|
||||
req := new(model.GetFlamegraphSpansForTraceParams)
|
||||
err := json.NewDecoder(r.Body).Decode(&req)
|
||||
if err != nil {
|
||||
RespondError(w, model.BadRequest(err), nil)
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetFlamegraphSpansForTrace(r.Context(), traceID, req)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) listErrors(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseListErrorsRequest(r)
|
||||
|
@ -1384,6 +1384,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
"",
|
||||
true,
|
||||
true,
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
)
|
||||
|
||||
q := &querier{
|
||||
|
@ -1438,6 +1438,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
||||
"",
|
||||
true,
|
||||
true,
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
)
|
||||
|
||||
q := &querier{
|
||||
|
@ -67,6 +67,7 @@ type ServerOptions struct {
|
||||
DialTimeout time.Duration
|
||||
CacheConfigPath string
|
||||
FluxInterval string
|
||||
FluxIntervalForTraceDetail string
|
||||
Cluster string
|
||||
UseLogsNewSchema bool
|
||||
UseTraceNewSchema bool
|
||||
@ -120,6 +121,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
readerReady := make(chan bool)
|
||||
|
||||
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var reader interfaces.Reader
|
||||
storage := os.Getenv("STORAGE")
|
||||
if storage == "clickhouse" {
|
||||
@ -134,6 +140,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
serverOptions.Cluster,
|
||||
serverOptions.UseLogsNewSchema,
|
||||
serverOptions.UseTraceNewSchema,
|
||||
fluxIntervalForTraceDetail,
|
||||
serverOptions.SigNoz.Cache,
|
||||
)
|
||||
go clickhouseReader.Start(readerReady)
|
||||
reader = clickhouseReader
|
||||
@ -148,6 +156,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var c cache.Cache
|
||||
if serverOptions.CacheConfigPath != "" {
|
||||
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
|
||||
|
116
pkg/query-service/app/traces/tracedetail/flamegraph.go
Normal file
116
pkg/query-service/app/traces/tracedetail/flamegraph.go
Normal file
@ -0,0 +1,116 @@
|
||||
package tracedetail
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
var (
|
||||
SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH float64 = 50
|
||||
)
|
||||
|
||||
func ContainsFlamegraphSpan(slice []*model.FlamegraphSpan, item *model.FlamegraphSpan) bool {
|
||||
for _, v := range slice {
|
||||
if v.SpanID == item.SpanID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func BfsTraversalForTrace(span *model.FlamegraphSpan, level int64) map[int64][]*model.FlamegraphSpan {
|
||||
bfs := map[int64][]*model.FlamegraphSpan{}
|
||||
bfs[level] = []*model.FlamegraphSpan{span}
|
||||
|
||||
for _, child := range span.Children {
|
||||
childBfsMap := BfsTraversalForTrace(child, level+1)
|
||||
for _level, nodes := range childBfsMap {
|
||||
bfs[_level] = append(bfs[_level], nodes...)
|
||||
}
|
||||
}
|
||||
span.Level = level
|
||||
span.Children = make([]*model.FlamegraphSpan, 0)
|
||||
|
||||
return bfs
|
||||
}
|
||||
|
||||
func FindIndexForSelectedSpan(spans [][]*model.FlamegraphSpan, selectedSpanId string) int {
|
||||
var selectedSpanLevel int = 0
|
||||
|
||||
for index, _spans := range spans {
|
||||
if len(_spans) > 0 && _spans[0].SpanID == selectedSpanId {
|
||||
selectedSpanLevel = index
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return selectedSpanLevel
|
||||
}
|
||||
|
||||
func GetSelectedSpansForFlamegraph(traceRoots []*model.FlamegraphSpan, spanIdToSpanNodeMap map[string]*model.FlamegraphSpan) [][]*model.FlamegraphSpan {
|
||||
|
||||
var traceIdLevelledFlamegraph = map[string]map[int64][]*model.FlamegraphSpan{}
|
||||
selectedSpans := [][]*model.FlamegraphSpan{}
|
||||
|
||||
// sort the trace roots to add missing spans at the right order
|
||||
sort.Slice(traceRoots, func(i, j int) bool {
|
||||
if traceRoots[i].TimeUnixNano == traceRoots[j].TimeUnixNano {
|
||||
return traceRoots[i].Name < traceRoots[j].Name
|
||||
}
|
||||
return traceRoots[i].TimeUnixNano < traceRoots[j].TimeUnixNano
|
||||
})
|
||||
|
||||
for _, rootSpanID := range traceRoots {
|
||||
if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists {
|
||||
bfsMapForTrace := BfsTraversalForTrace(rootNode, 0)
|
||||
traceIdLevelledFlamegraph[rootSpanID.SpanID] = bfsMapForTrace
|
||||
}
|
||||
}
|
||||
|
||||
for _, trace := range traceRoots {
|
||||
keys := make([]int64, 0, len(traceIdLevelledFlamegraph[trace.SpanID]))
|
||||
for key := range traceIdLevelledFlamegraph[trace.SpanID] {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
sort.Slice(keys, func(i, j int) bool {
|
||||
return keys[i] < keys[j]
|
||||
})
|
||||
|
||||
for _, level := range keys {
|
||||
if ok, exists := traceIdLevelledFlamegraph[trace.SpanID][level]; exists {
|
||||
selectedSpans = append(selectedSpans, ok)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return selectedSpans
|
||||
}
|
||||
|
||||
func GetSelectedSpansForFlamegraphForRequest(selectedSpanID string, selectedSpans [][]*model.FlamegraphSpan) [][]*model.FlamegraphSpan {
|
||||
var selectedIndex = 0
|
||||
|
||||
if selectedSpanID != "" {
|
||||
selectedIndex = FindIndexForSelectedSpan(selectedSpans, selectedSpanID)
|
||||
}
|
||||
|
||||
lowerLimit := selectedIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.4)
|
||||
upperLimit := selectedIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_FLAMEGRAPH*0.6)
|
||||
|
||||
if lowerLimit < 0 {
|
||||
upperLimit = upperLimit - lowerLimit
|
||||
lowerLimit = 0
|
||||
}
|
||||
|
||||
if upperLimit > len(selectedSpans) {
|
||||
lowerLimit = lowerLimit - (upperLimit - len(selectedSpans))
|
||||
upperLimit = len(selectedSpans)
|
||||
}
|
||||
|
||||
if lowerLimit < 0 {
|
||||
lowerLimit = 0
|
||||
}
|
||||
|
||||
return selectedSpans[lowerLimit:upperLimit]
|
||||
}
|
214
pkg/query-service/app/traces/tracedetail/waterfall.go
Normal file
214
pkg/query-service/app/traces/tracedetail/waterfall.go
Normal file
@ -0,0 +1,214 @@
|
||||
package tracedetail
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sort"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
var (
|
||||
SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL float64 = 500
|
||||
)
|
||||
|
||||
type Interval struct {
|
||||
StartTime uint64
|
||||
Duration uint64
|
||||
Service string
|
||||
}
|
||||
|
||||
func mergeIntervals(intervals []Interval) []Interval {
|
||||
if len(intervals) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var merged []Interval
|
||||
current := intervals[0]
|
||||
|
||||
for i := 1; i < len(intervals); i++ {
|
||||
next := intervals[i]
|
||||
if current.StartTime+current.Duration >= next.StartTime {
|
||||
endTime := max(current.StartTime+current.Duration, next.StartTime+next.Duration)
|
||||
current.Duration = endTime - current.StartTime
|
||||
} else {
|
||||
merged = append(merged, current)
|
||||
current = next
|
||||
}
|
||||
}
|
||||
// Add the last interval
|
||||
merged = append(merged, current)
|
||||
|
||||
return merged
|
||||
}
|
||||
|
||||
func ContainsWaterfallSpan(slice []*model.Span, item *model.Span) bool {
|
||||
for _, v := range slice {
|
||||
if v.SpanID == item.SpanID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func findIndexForSelectedSpanFromPreOrder(spans []*model.Span, selectedSpanId string) int {
|
||||
var selectedSpanIndex = -1
|
||||
|
||||
for index, span := range spans {
|
||||
if span.SpanID == selectedSpanId {
|
||||
selectedSpanIndex = index
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return selectedSpanIndex
|
||||
}
|
||||
|
||||
func getPathFromRootToSelectedSpanId(node *model.Span, selectedSpanId string, uncollapsedSpans []string, isSelectedSpanIDUnCollapsed bool) (bool, []string) {
|
||||
spansFromRootToNode := []string{}
|
||||
|
||||
if node.SpanID == selectedSpanId {
|
||||
if isSelectedSpanIDUnCollapsed {
|
||||
spansFromRootToNode = append(spansFromRootToNode, node.SpanID)
|
||||
}
|
||||
return true, spansFromRootToNode
|
||||
}
|
||||
|
||||
isPresentInSubtreeForTheNode := false
|
||||
for _, child := range node.Children {
|
||||
isPresentInThisSubtree, _spansFromRootToNode := getPathFromRootToSelectedSpanId(child, selectedSpanId, uncollapsedSpans, isSelectedSpanIDUnCollapsed)
|
||||
// if the interested node is present in the given subtree then add the span node to uncollapsed node list
|
||||
if isPresentInThisSubtree {
|
||||
if !slices.Contains(uncollapsedSpans, node.SpanID) {
|
||||
spansFromRootToNode = append(spansFromRootToNode, node.SpanID)
|
||||
}
|
||||
isPresentInSubtreeForTheNode = true
|
||||
spansFromRootToNode = append(spansFromRootToNode, _spansFromRootToNode...)
|
||||
}
|
||||
}
|
||||
return isPresentInSubtreeForTheNode, spansFromRootToNode
|
||||
}
|
||||
|
||||
func traverseTrace(span *model.Span, uncollapsedSpans []string, level uint64, isPartOfPreOrder bool, hasSibling bool, selectedSpanId string) []*model.Span {
|
||||
preOrderTraversal := []*model.Span{}
|
||||
|
||||
// sort the children to maintain the order across requests
|
||||
sort.Slice(span.Children, func(i, j int) bool {
|
||||
if span.Children[i].TimeUnixNano == span.Children[j].TimeUnixNano {
|
||||
return span.Children[i].Name < span.Children[j].Name
|
||||
}
|
||||
return span.Children[i].TimeUnixNano < span.Children[j].TimeUnixNano
|
||||
})
|
||||
|
||||
span.SubTreeNodeCount = 0
|
||||
nodeWithoutChildren := model.Span{
|
||||
SpanID: span.SpanID,
|
||||
TraceID: span.TraceID,
|
||||
ServiceName: span.ServiceName,
|
||||
TimeUnixNano: span.TimeUnixNano,
|
||||
Name: span.Name,
|
||||
Kind: int32(span.Kind),
|
||||
DurationNano: span.DurationNano,
|
||||
HasError: span.HasError,
|
||||
StatusMessage: span.StatusMessage,
|
||||
StatusCodeString: span.StatusCodeString,
|
||||
SpanKind: span.SpanKind,
|
||||
References: span.References,
|
||||
Events: span.Events,
|
||||
TagMap: span.TagMap,
|
||||
Children: make([]*model.Span, 0),
|
||||
HasChildren: len(span.Children) > 0,
|
||||
Level: level,
|
||||
HasSiblings: hasSibling,
|
||||
SubTreeNodeCount: 0,
|
||||
}
|
||||
|
||||
if isPartOfPreOrder {
|
||||
preOrderTraversal = append(preOrderTraversal, &nodeWithoutChildren)
|
||||
}
|
||||
|
||||
for index, child := range span.Children {
|
||||
_childTraversal := traverseTrace(child, uncollapsedSpans, level+1, isPartOfPreOrder && slices.Contains(uncollapsedSpans, span.SpanID), index != (len(span.Children)-1), selectedSpanId)
|
||||
preOrderTraversal = append(preOrderTraversal, _childTraversal...)
|
||||
nodeWithoutChildren.SubTreeNodeCount += child.SubTreeNodeCount + 1
|
||||
span.SubTreeNodeCount += child.SubTreeNodeCount + 1
|
||||
}
|
||||
|
||||
nodeWithoutChildren.SubTreeNodeCount += 1
|
||||
return preOrderTraversal
|
||||
|
||||
}
|
||||
|
||||
func CalculateServiceTime(serviceIntervals map[string][]Interval) map[string]uint64 {
|
||||
totalTimes := make(map[string]uint64)
|
||||
|
||||
for service, serviceIntervals := range serviceIntervals {
|
||||
sort.Slice(serviceIntervals, func(i, j int) bool {
|
||||
return serviceIntervals[i].StartTime < serviceIntervals[j].StartTime
|
||||
})
|
||||
mergedIntervals := mergeIntervals(serviceIntervals)
|
||||
totalTime := uint64(0)
|
||||
for _, interval := range mergedIntervals {
|
||||
totalTime += interval.Duration
|
||||
}
|
||||
totalTimes[service] = totalTime
|
||||
}
|
||||
|
||||
return totalTimes
|
||||
}
|
||||
|
||||
func GetSelectedSpans(uncollapsedSpans []string, selectedSpanID string, traceRoots []*model.Span, spanIdToSpanNodeMap map[string]*model.Span, isSelectedSpanIDUnCollapsed bool) ([]*model.Span, []string, string, string) {
|
||||
|
||||
var preOrderTraversal = []*model.Span{}
|
||||
var rootServiceName, rootServiceEntryPoint string
|
||||
updatedUncollapsedSpans := uncollapsedSpans
|
||||
|
||||
selectedSpanIndex := -1
|
||||
for _, rootSpanID := range traceRoots {
|
||||
if rootNode, exists := spanIdToSpanNodeMap[rootSpanID.SpanID]; exists {
|
||||
_, spansFromRootToNode := getPathFromRootToSelectedSpanId(rootNode, selectedSpanID, updatedUncollapsedSpans, isSelectedSpanIDUnCollapsed)
|
||||
updatedUncollapsedSpans = append(updatedUncollapsedSpans, spansFromRootToNode...)
|
||||
|
||||
_preOrderTraversal := traverseTrace(rootNode, updatedUncollapsedSpans, 0, true, false, selectedSpanID)
|
||||
_selectedSpanIndex := findIndexForSelectedSpanFromPreOrder(_preOrderTraversal, selectedSpanID)
|
||||
|
||||
if _selectedSpanIndex != -1 {
|
||||
selectedSpanIndex = _selectedSpanIndex + len(preOrderTraversal)
|
||||
}
|
||||
|
||||
preOrderTraversal = append(preOrderTraversal, _preOrderTraversal...)
|
||||
|
||||
if rootServiceName == "" {
|
||||
rootServiceName = rootNode.ServiceName
|
||||
}
|
||||
|
||||
if rootServiceEntryPoint == "" {
|
||||
rootServiceEntryPoint = rootNode.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if we couldn't find the selectedSpan in the trace then defaulting the selected index to 0
|
||||
if selectedSpanIndex == -1 && selectedSpanID != "" {
|
||||
selectedSpanIndex = 0
|
||||
}
|
||||
|
||||
// get the 0.4*[span limit] before the interested span index
|
||||
startIndex := selectedSpanIndex - int(SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL*0.4)
|
||||
// get the 0.6*[span limit] after the intrested span index
|
||||
endIndex := selectedSpanIndex + int(SPAN_LIMIT_PER_REQUEST_FOR_WATERFALL*0.6)
|
||||
|
||||
// adjust the sliding window according to the available left and right spaces.
|
||||
if startIndex < 0 {
|
||||
endIndex = endIndex - startIndex
|
||||
startIndex = 0
|
||||
}
|
||||
if endIndex > len(preOrderTraversal) {
|
||||
startIndex = startIndex - (endIndex - len(preOrderTraversal))
|
||||
endIndex = len(preOrderTraversal)
|
||||
}
|
||||
if startIndex < 0 {
|
||||
startIndex = 0
|
||||
}
|
||||
|
||||
return preOrderTraversal[startIndex:endIndex], updatedUncollapsedSpans, rootServiceName, rootServiceEntryPoint
|
||||
}
|
@ -41,6 +41,8 @@ type Reader interface {
|
||||
|
||||
// Search Interfaces
|
||||
SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error)
|
||||
GetWaterfallSpansForTraceWithMetadata(ctx context.Context, traceID string, req *model.GetWaterfallSpansForTraceWithMetadataParams) (*model.GetWaterfallSpansForTraceWithMetadataResponse, *model.ApiError)
|
||||
GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError)
|
||||
|
||||
// Setter Interfaces
|
||||
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||
|
@ -45,7 +45,7 @@ func main() {
|
||||
var useLogsNewSchema bool
|
||||
var useTraceNewSchema bool
|
||||
// the url used to build link in the alert messages in slack and other systems
|
||||
var ruleRepoURL, cacheConfigPath, fluxInterval string
|
||||
var ruleRepoURL, cacheConfigPath, fluxInterval, fluxIntervalForTraceDetail string
|
||||
var cluster string
|
||||
|
||||
var preferSpanMetrics bool
|
||||
@ -63,6 +63,7 @@ func main() {
|
||||
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
||||
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
|
||||
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(the interval to exclude data from being cached to avoid incorrect cache for data in motion)")
|
||||
flag.StringVar(&fluxIntervalForTraceDetail, "flux-interval-trace-detail", "2m", "(the interval to exclude data from being cached to avoid incorrect cache for trace data in motion)")
|
||||
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
|
||||
// Allow using the consistent naming with the signoz collector
|
||||
flag.StringVar(&cluster, "cluster-name", "cluster", "(cluster name - defaults to 'cluster')")
|
||||
@ -108,6 +109,7 @@ func main() {
|
||||
DialTimeout: dialTimeout,
|
||||
CacheConfigPath: cacheConfigPath,
|
||||
FluxInterval: fluxInterval,
|
||||
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
||||
Cluster: cluster,
|
||||
UseLogsNewSchema: useLogsNewSchema,
|
||||
UseTraceNewSchema: useTraceNewSchema,
|
||||
|
36
pkg/query-service/model/cacheable.go
Normal file
36
pkg/query-service/model/cacheable.go
Normal file
@ -0,0 +1,36 @@
|
||||
package model
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
type GetWaterfallSpansForTraceWithMetadataCache struct {
|
||||
StartTime uint64 `json:"startTime"`
|
||||
EndTime uint64 `json:"endTime"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
TotalSpans uint64 `json:"totalSpans"`
|
||||
TotalErrorSpans uint64 `json:"totalErrorSpans"`
|
||||
ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"`
|
||||
SpanIdToSpanNodeMap map[string]*Span `json:"spanIdToSpanNodeMap"`
|
||||
TraceRoots []*Span `json:"traceRoots"`
|
||||
}
|
||||
|
||||
func (c *GetWaterfallSpansForTraceWithMetadataCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
func (c *GetWaterfallSpansForTraceWithMetadataCache) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, c)
|
||||
}
|
||||
|
||||
type GetFlamegraphSpansForTraceCache struct {
|
||||
StartTime uint64 `json:"startTime"`
|
||||
EndTime uint64 `json:"endTime"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
SelectedSpans [][]*FlamegraphSpan `json:"selectedSpans"`
|
||||
TraceRoots []*FlamegraphSpan `json:"traceRoots"`
|
||||
}
|
||||
|
||||
func (c *GetFlamegraphSpansForTraceCache) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
func (c *GetFlamegraphSpansForTraceCache) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, c)
|
||||
}
|
@ -315,6 +315,16 @@ type SearchTracesParams struct {
|
||||
MaxSpansInTrace int `json:"maxSpansInTrace"`
|
||||
}
|
||||
|
||||
type GetWaterfallSpansForTraceWithMetadataParams struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
IsSelectedSpanIDUnCollapsed bool `json:"isSelectedSpanIDUnCollapsed"`
|
||||
UncollapsedSpans []string `json:"uncollapsedSpans"`
|
||||
}
|
||||
|
||||
type GetFlamegraphSpansForTraceParams struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
}
|
||||
|
||||
type SpanFilterParams struct {
|
||||
TraceID []string `json:"traceID"`
|
||||
Status []string `json:"status"`
|
||||
|
@ -118,6 +118,13 @@ func ForbiddenError(err error) *ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
func ExecutionError(err error) *ApiError {
|
||||
return &ApiError{
|
||||
Typ: ErrorExec,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func WrapApiError(err *ApiError, msg string) *ApiError {
|
||||
return &ApiError{
|
||||
Typ: err.Type(),
|
||||
@ -269,6 +276,67 @@ type SearchSpanResponseItem struct {
|
||||
SpanKind string `json:"spanKind"`
|
||||
}
|
||||
|
||||
type Span struct {
|
||||
TimeUnixNano uint64 `json:"timestamp"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
SpanID string `json:"spanId"`
|
||||
RootSpanID string `json:"rootSpanId"`
|
||||
TraceID string `json:"traceId"`
|
||||
HasError bool `json:"hasError"`
|
||||
Kind int32 `json:"kind"`
|
||||
ServiceName string `json:"serviceName"`
|
||||
Name string `json:"name"`
|
||||
References []OtelSpanRef `json:"references,omitempty"`
|
||||
TagMap map[string]string `json:"tagMap"`
|
||||
Events []string `json:"event"`
|
||||
RootName string `json:"rootName"`
|
||||
StatusMessage string `json:"statusMessage"`
|
||||
StatusCodeString string `json:"statusCodeString"`
|
||||
SpanKind string `json:"spanKind"`
|
||||
Children []*Span `json:"children"`
|
||||
|
||||
// the below two fields are for frontend to render the spans
|
||||
SubTreeNodeCount uint64 `json:"subTreeNodeCount"`
|
||||
HasChildren bool `json:"hasChildren"`
|
||||
HasSiblings bool `json:"hasSiblings"`
|
||||
Level uint64 `json:"level"`
|
||||
}
|
||||
|
||||
type FlamegraphSpan struct {
|
||||
TimeUnixNano uint64 `json:"timestamp"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
SpanID string `json:"spanId"`
|
||||
TraceID string `json:"traceId"`
|
||||
HasError bool `json:"hasError"`
|
||||
ServiceName string `json:"serviceName"`
|
||||
Name string `json:"name"`
|
||||
Level int64 `json:"level"`
|
||||
References []OtelSpanRef `json:"references,omitempty"`
|
||||
Children []*FlamegraphSpan `json:"children"`
|
||||
}
|
||||
|
||||
type GetWaterfallSpansForTraceWithMetadataResponse struct {
|
||||
StartTimestampMillis uint64 `json:"startTimestampMillis"`
|
||||
EndTimestampMillis uint64 `json:"endTimestampMillis"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
RootServiceName string `json:"rootServiceName"`
|
||||
RootServiceEntryPoint string `json:"rootServiceEntryPoint"`
|
||||
TotalSpansCount uint64 `json:"totalSpansCount"`
|
||||
TotalErrorSpansCount uint64 `json:"totalErrorSpansCount"`
|
||||
ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"`
|
||||
Spans []*Span `json:"spans"`
|
||||
HasMissingSpans bool `json:"hasMissingSpans"`
|
||||
// this is needed for frontend and query service sync
|
||||
UncollapsedSpans []string `json:"uncollapsedSpans"`
|
||||
}
|
||||
|
||||
type GetFlamegraphSpansForTraceResponse struct {
|
||||
StartTimestampMillis uint64 `json:"startTimestampMillis"`
|
||||
EndTimestampMillis uint64 `json:"endTimestampMillis"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
Spans [][]*FlamegraphSpan `json:"spans"`
|
||||
}
|
||||
|
||||
type OtelSpanRef struct {
|
||||
TraceId string `json:"traceId,omitempty"`
|
||||
SpanId string `json:"spanId,omitempty"`
|
||||
|
@ -20,6 +20,7 @@ type SpanItemV2 struct {
|
||||
StatusMessage string `ch:"status_message"`
|
||||
StatusCodeString string `ch:"status_code_string"`
|
||||
SpanKind string `ch:"kind_string"`
|
||||
ParentSpanId string `ch:"parent_span_id"`
|
||||
}
|
||||
|
||||
type TraceSummary struct {
|
||||
|
@ -1241,7 +1241,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
@ -1340,7 +1340,7 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
@ -1448,7 +1448,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
@ -1573,7 +1573,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
|
@ -47,6 +47,8 @@ func NewMockClickhouseReader(
|
||||
"",
|
||||
true,
|
||||
true,
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
)
|
||||
|
||||
return reader, mockDB
|
||||
|
Loading…
x
Reference in New Issue
Block a user