mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 17:39:03 +08:00
feat: faster traceID based filtering (#5607)
* feat: faster traceID based filtering * chore: add error log
This commit is contained in:
parent
f300518d61
commit
aef935a817
@ -5001,3 +5001,27 @@ func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, tim
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error) {
|
||||||
|
var minTime, maxTime time.Time
|
||||||
|
|
||||||
|
query := fmt.Sprintf("SELECT min(timestamp), max(timestamp) FROM %s.%s WHERE traceID IN ('%s')",
|
||||||
|
r.TraceDB, r.SpansTable, strings.Join(traceID, "','"))
|
||||||
|
|
||||||
|
zap.L().Debug("GetMinAndMaxTimestampForTraceID", zap.String("query", query))
|
||||||
|
|
||||||
|
err := r.db.QueryRow(ctx, query).Scan(&minTime, &maxTime)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error while executing query", zap.Error(err))
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if minTime.IsZero() || maxTime.IsZero() {
|
||||||
|
zap.L().Debug("minTime or maxTime is zero")
|
||||||
|
return 0, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("GetMinAndMaxTimestampForTraceID", zap.Any("minTime", minTime), zap.Any("maxTime", maxTime))
|
||||||
|
|
||||||
|
return minTime.UnixNano(), maxTime.UnixNano(), nil
|
||||||
|
}
|
||||||
|
@ -3213,6 +3213,22 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WARN: Only works for AND operator in traces query
|
||||||
|
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||||
|
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
|
||||||
|
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
|
||||||
|
if isUsed == true && len(traceIDs) > 0 {
|
||||||
|
zap.L().Debug("traceID used as filter in traces query")
|
||||||
|
// query signoz_spans table with traceID to get min and max timestamp
|
||||||
|
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
|
||||||
|
if err == nil {
|
||||||
|
// add timestamp filter to queryRange params
|
||||||
|
tracesV3.AddTimestampFilters(min, max, queryRangeParams)
|
||||||
|
zap.L().Debug("post adding timestamp filter in traces query", zap.Any("queryRangeParams", queryRangeParams))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
result, errQuriesByName, err = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)
|
result, errQuriesByName, err = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -3482,6 +3498,22 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WARN: Only works for AND operator in traces query
|
||||||
|
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||||
|
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
|
||||||
|
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
|
||||||
|
if isUsed == true && len(traceIDs) > 0 {
|
||||||
|
zap.L().Debug("traceID used as filter in traces query")
|
||||||
|
// query signoz_spans table with traceID to get min and max timestamp
|
||||||
|
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
|
||||||
|
if err == nil {
|
||||||
|
// add timestamp filter to queryRange params
|
||||||
|
tracesV3.AddTimestampFilters(min, max, queryRangeParams)
|
||||||
|
zap.L().Debug("post adding timestamp filter in traces query", zap.Any("queryRangeParams", queryRangeParams))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
result, errQuriesByName, err = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
|
result, errQuriesByName, err = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
183
pkg/query-service/app/traces/v3/utils.go
Normal file
183
pkg/query-service/app/traces/v3/utils.go
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
package v3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// check if traceId filter is used in traces query and return the list of traceIds
|
||||||
|
func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string) {
|
||||||
|
compositeQuery := params.CompositeQuery
|
||||||
|
if compositeQuery == nil {
|
||||||
|
return false, []string{}
|
||||||
|
}
|
||||||
|
var traceIds []string
|
||||||
|
var traceIdFilterUsed bool
|
||||||
|
|
||||||
|
// Build queries for each builder query
|
||||||
|
for queryName, query := range compositeQuery.BuilderQueries {
|
||||||
|
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check filter attribute
|
||||||
|
if query.Filters != nil && len(query.Filters.Items) != 0 {
|
||||||
|
for _, item := range query.Filters.Items {
|
||||||
|
|
||||||
|
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
|
||||||
|
item.Operator == v3.FilterOperatorEqual) {
|
||||||
|
traceIdFilterUsed = true
|
||||||
|
// validate value
|
||||||
|
var err error
|
||||||
|
val := item.Value
|
||||||
|
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err))
|
||||||
|
return false, []string{}
|
||||||
|
}
|
||||||
|
if val != nil {
|
||||||
|
fmtVal := extractFormattedStringValues(val)
|
||||||
|
traceIds = append(traceIds, fmtVal...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("traceIds", zap.Any("traceIds", traceIds))
|
||||||
|
return traceIdFilterUsed, traceIds
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractFormattedStringValues(v interface{}) []string {
|
||||||
|
// if it's pointer convert it to a value
|
||||||
|
v = getPointerValue(v)
|
||||||
|
|
||||||
|
switch x := v.(type) {
|
||||||
|
case string:
|
||||||
|
return []string{x}
|
||||||
|
|
||||||
|
case []interface{}:
|
||||||
|
if len(x) == 0 {
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
switch x[0].(type) {
|
||||||
|
case string:
|
||||||
|
values := []string{}
|
||||||
|
for _, val := range x {
|
||||||
|
values = append(values, val.(string))
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
default:
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPointerValue(v interface{}) interface{} {
|
||||||
|
switch x := v.(type) {
|
||||||
|
case *uint8:
|
||||||
|
return *x
|
||||||
|
case *uint16:
|
||||||
|
return *x
|
||||||
|
case *uint32:
|
||||||
|
return *x
|
||||||
|
case *uint64:
|
||||||
|
return *x
|
||||||
|
case *int:
|
||||||
|
return *x
|
||||||
|
case *int8:
|
||||||
|
return *x
|
||||||
|
case *int16:
|
||||||
|
return *x
|
||||||
|
case *int32:
|
||||||
|
return *x
|
||||||
|
case *int64:
|
||||||
|
return *x
|
||||||
|
case *float32:
|
||||||
|
return *x
|
||||||
|
case *float64:
|
||||||
|
return *x
|
||||||
|
case *string:
|
||||||
|
return *x
|
||||||
|
case *bool:
|
||||||
|
return *x
|
||||||
|
case []interface{}:
|
||||||
|
values := []interface{}{}
|
||||||
|
for _, val := range x {
|
||||||
|
values = append(values, getPointerValue(val))
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
default:
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddTimestampFilters(minTime int64, maxTime int64, params *v3.QueryRangeParamsV3) {
|
||||||
|
if minTime == 0 && maxTime == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
compositeQuery := params.CompositeQuery
|
||||||
|
if compositeQuery == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Build queries for each builder query
|
||||||
|
for queryName, query := range compositeQuery.BuilderQueries {
|
||||||
|
if query.Expression != queryName && query.DataSource != v3.DataSourceTraces {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
addTimeStampFilter := false
|
||||||
|
|
||||||
|
// check filter attribute
|
||||||
|
if query.Filters != nil && len(query.Filters.Items) != 0 {
|
||||||
|
for _, item := range query.Filters.Items {
|
||||||
|
if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn ||
|
||||||
|
item.Operator == v3.FilterOperatorEqual) {
|
||||||
|
addTimeStampFilter = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add timestamp filter to query only if traceID filter along with equal/similar operator is used
|
||||||
|
if addTimeStampFilter {
|
||||||
|
timeFilters := []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "timestamp",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
Value: strconv.FormatUint(uint64(minTime), 10),
|
||||||
|
Operator: v3.FilterOperatorGreaterThanOrEq,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "timestamp",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
IsColumn: true,
|
||||||
|
},
|
||||||
|
Value: strconv.FormatUint(uint64(maxTime), 10),
|
||||||
|
Operator: v3.FilterOperatorLessThanOrEq,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// add new timestamp filter to query
|
||||||
|
if query.Filters == nil {
|
||||||
|
query.Filters = &v3.FilterSet{
|
||||||
|
Items: timeFilters,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
query.Filters.Items = append(query.Filters.Items, timeFilters...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -103,6 +103,8 @@ type Reader interface {
|
|||||||
CheckClickHouse(ctx context.Context) error
|
CheckClickHouse(ctx context.Context) error
|
||||||
|
|
||||||
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)
|
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)
|
||||||
|
|
||||||
|
GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Querier interface {
|
type Querier interface {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user