From 8f2e8cccb443bb77898e8cf4730219b766d568e0 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Sat, 8 Mar 2025 11:42:20 +0530 Subject: [PATCH] New autocomplete endpoint with filters (#7241) * feat: new autocomplete endpoint with filters Signed-off-by: Shivanshu Raj Shrivastava --- .../app/clickhouseReader/options.go | 7 ++ .../app/clickhouseReader/reader.go | 109 ++++++++++++++++++ pkg/query-service/app/http_handler.go | 35 ++++++ pkg/query-service/app/parser.go | 19 +++ pkg/query-service/model/v3/v3.go | 54 ++++++++- 5 files changed, 221 insertions(+), 3 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 5022088f5a..c8f649171c 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -48,6 +48,9 @@ const ( defaultTraceLocalTableName string = "signoz_index_v3" defaultTraceResourceTableV3 string = "distributed_traces_v3_resource" defaultTraceSummaryTable string = "distributed_trace_summary" + + defaultMetadataDB string = "signoz_metadata" + defaultMetadataTable string = "distributed_attributes_metadata" ) // NamespaceConfig is Clickhouse's internal configuration data @@ -88,6 +91,8 @@ type namespaceConfig struct { TraceLocalTableNameV3 string TraceResourceTableV3 string TraceSummaryTable string + MetadataDB string + MetadataTable string } // Connecto defines how to connect to the database @@ -141,6 +146,8 @@ func NewOptions( TraceLocalTableNameV3: defaultTraceLocalTableName, TraceResourceTableV3: defaultTraceResourceTableV3, TraceSummaryTable: defaultTraceSummaryTable, + MetadataDB: defaultMetadataDB, + MetadataTable: defaultMetadataTable, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index bfc65f62b2..f7af28296b 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -164,6 +164,8 @@ type ClickHouseReader struct { fluxIntervalForTraceDetail time.Duration cache cache.Cache + metadataDB string + metadataTable string } // NewTraceReader returns a TraceReader for the database @@ -259,6 +261,8 @@ func NewReaderFromClickhouseConnection( fluxIntervalForTraceDetail: fluxIntervalForTraceDetail, cache: cache, + metadataDB: options.primary.MetadataDB, + metadataTable: options.primary.MetadataTable, } } @@ -4126,6 +4130,97 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt return &response, nil } +func (r *ClickHouseReader) FetchRelatedValues(ctx context.Context, req *v3.FilterAttributeValueRequest) ([]string, error) { + var andConditions []string + + andConditions = append(andConditions, fmt.Sprintf("unix_milli >= %d", req.StartTimeMillis)) + andConditions = append(andConditions, fmt.Sprintf("unix_milli <= %d", req.EndTimeMillis)) + + if len(req.ExistingFilterItems) != 0 { + for _, item := range req.ExistingFilterItems { + // we only support string for related values + if item.Key.DataType != v3.AttributeKeyDataTypeString { + continue + } + + var colName string + switch item.Key.Type { + case v3.AttributeKeyTypeResource: + colName = "resource_attributes" + case v3.AttributeKeyTypeTag: + colName = "attributes" + default: + // we only support resource and tag for related values as of now + continue + } + // IN doesn't make use of map value index, we convert it to = or != + operator := item.Operator + if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorIn { + operator = "=" + } else if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorNotIn { + operator = "!=" + } + addCondition := func(val string) { + andConditions = append(andConditions, fmt.Sprintf("mapContains(%s, '%s') AND %s['%s'] %s %s", colName, item.Key.Key, colName, item.Key.Key, operator, val)) + } + switch v := item.Value.(type) { + case string: + fmtVal := utils.ClickHouseFormattedValue(v) + addCondition(fmtVal) + case []string: + for _, val := range v { + fmtVal := utils.ClickHouseFormattedValue(val) + addCondition(fmtVal) + } + case []interface{}: + for _, val := range v { + fmtVal := utils.ClickHouseFormattedValue(val) + addCondition(fmtVal) + } + } + } + } + whereClause := strings.Join(andConditions, " AND ") + + var selectColumn string + switch req.TagType { + case v3.TagTypeResource: + selectColumn = "resource_attributes" + "['" + req.FilterAttributeKey + "']" + case v3.TagTypeTag: + selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']" + default: + selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']" + } + + filterSubQuery := fmt.Sprintf( + "SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT 100", + selectColumn, + r.metadataDB, + r.metadataTable, + whereClause, + ) + zap.L().Debug("filterSubQuery for related values", zap.String("query", filterSubQuery)) + + rows, err := r.db.Query(ctx, filterSubQuery) + if err != nil { + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var attributeValues []string + for rows.Next() { + var value string + if err := rows.Scan(&value); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if value != "" { + attributeValues = append(attributeValues, value) + } + } + + return attributeValues, nil +} + func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { var err error var filterValueColumn string @@ -4227,6 +4322,13 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi } } + if req.IncludeRelated { + relatedValues, _ := r.FetchRelatedValues(ctx, req) + attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{ + StringAttributeValues: relatedValues, + } + } + return &attributeValues, nil } @@ -4907,6 +5009,13 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. } } + if req.IncludeRelated { + relatedValues, _ := r.FetchRelatedValues(ctx, req) + attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{ + StringAttributeValues: relatedValues, + } + } + return &attributeValues, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 066874966f..a85844241f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -395,6 +395,12 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeKeys))).Methods(http.MethodGet) subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess( withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeValues))).Methods(http.MethodGet) + + // autocomplete with filters using new endpoints + // Note: eventually all autocomplete APIs should be migrated to new endpoint with appropriate filters, deprecating the older ones + + subRouter.HandleFunc("/auto_complete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValuesPost)).Methods(http.MethodPost) + subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost) subRouter.HandleFunc("/query_range/format", am.ViewAccess(aH.QueryRangeV3Format)).Methods(http.MethodPost) @@ -4834,6 +4840,35 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http aH.Respond(w, response) } +func (aH *APIHandler) autoCompleteAttributeValuesPost(w http.ResponseWriter, r *http.Request) { + var response *v3.FilterAttributeValueResponse + req, err := parseFilterAttributeValueRequestBody(r) + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + switch req.DataSource { + case v3.DataSourceMetrics: + response, err = aH.reader.GetMetricAttributeValues(r.Context(), req) + case v3.DataSourceLogs: + response, err = aH.reader.GetLogAttributeValues(r.Context(), req) + case v3.DataSourceTraces: + response, err = aH.reader.GetTraceAttributeValues(r.Context(), req) + default: + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) + return + } + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + aH.Respond(w, response) +} + func (aH *APIHandler) getSpanKeysV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3) (map[string]v3.AttributeKey, error) { data := map[string]v3.AttributeKey{} for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 14029d5d94..5b8d847e59 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -741,6 +741,25 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ return &req, nil } +func parseFilterAttributeValueRequestBody(r *http.Request) (*v3.FilterAttributeValueRequest, error) { + + var req v3.FilterAttributeValueRequest + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + + if err := req.Validate(); err != nil { + return nil, err + } + + // offset by two windows periods for start for better results + req.StartTimeMillis = req.StartTimeMillis - time.Hour.Milliseconds()*6*2 + req.EndTimeMillis = req.EndTimeMillis + time.Hour.Milliseconds()*6 + + return &req, nil +} + func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValueRequest, error) { var req v3.FilterAttributeValueRequest diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 30877a3df9..d6a8695488 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -297,6 +297,8 @@ func (q AttributeKeyDataType) String() string { // for a selected aggregate operator, aggregate attribute, filter attribute key // and search text. type FilterAttributeValueRequest struct { + StartTimeMillis int64 `json:"startTimeMillis"` + EndTimeMillis int64 `json:"endTimeMillis"` DataSource DataSource `json:"dataSource"` AggregateOperator AggregateOperator `json:"aggregateOperator"` AggregateAttribute string `json:"aggregateAttribute"` @@ -305,6 +307,51 @@ type FilterAttributeValueRequest struct { TagType TagType `json:"tagType"` SearchText string `json:"searchText"` Limit int `json:"limit"` + ExistingFilterItems []FilterItem `json:"existingFilterItems"` + MetricNames []string `json:"metricNames"` + IncludeRelated bool `json:"includeRelated"` +} + +func (f *FilterAttributeValueRequest) Validate() error { + if f.FilterAttributeKey == "" { + return fmt.Errorf("filterAttributeKey is required") + } + + if f.StartTimeMillis == 0 { + return fmt.Errorf("startTimeMillis is required") + } + + if f.EndTimeMillis == 0 { + return fmt.Errorf("endTimeMillis is required") + } + + if f.Limit == 0 { + f.Limit = 100 + } + + if f.Limit > 1000 { + return fmt.Errorf("limit must be less than 1000") + } + + if f.ExistingFilterItems != nil { + for _, value := range f.ExistingFilterItems { + if value.Key.Key == "" { + return fmt.Errorf("existingFilterItems must contain a valid key") + } + } + } + + if err := f.DataSource.Validate(); err != nil { + return fmt.Errorf("invalid data source: %w", err) + } + + if f.DataSource != DataSourceMetrics { + if err := f.AggregateOperator.Validate(); err != nil { + return fmt.Errorf("invalid aggregate operator: %w", err) + } + } + + return nil } type AggregateAttributeResponse struct { @@ -366,9 +413,10 @@ func (a AttributeKey) Validate() error { } type FilterAttributeValueResponse struct { - StringAttributeValues []string `json:"stringAttributeValues"` - NumberAttributeValues []interface{} `json:"numberAttributeValues"` - BoolAttributeValues []bool `json:"boolAttributeValues"` + StringAttributeValues []string `json:"stringAttributeValues"` + NumberAttributeValues []interface{} `json:"numberAttributeValues"` + BoolAttributeValues []bool `json:"boolAttributeValues"` + RelatedValues *FilterAttributeValueResponse `json:"relatedValues,omitempty"` } type QueryRangeParamsV3 struct {