diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 5022088f5a..ff615255d2 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -48,6 +48,9 @@ const ( defaultTraceLocalTableName string = "signoz_index_v3" defaultTraceResourceTableV3 string = "distributed_traces_v3_resource" defaultTraceSummaryTable string = "distributed_trace_summary" + + defaultMetadataDB string = "signoz_metadata" + defaultMetadataTable string = "distributed_attributes_metadata" ) // NamespaceConfig is Clickhouse's internal configuration data @@ -88,6 +91,9 @@ type namespaceConfig struct { TraceLocalTableNameV3 string TraceResourceTableV3 string TraceSummaryTable string + + MetadataDB string + MetadataTable string } // Connecto defines how to connect to the database @@ -141,6 +147,9 @@ func NewOptions( TraceLocalTableNameV3: defaultTraceLocalTableName, TraceResourceTableV3: defaultTraceResourceTableV3, TraceSummaryTable: defaultTraceSummaryTable, + + MetadataDB: defaultMetadataDB, + MetadataTable: defaultMetadataTable, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index e98dd0368c..7b9965e431 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -161,6 +161,8 @@ type ClickHouseReader struct { fluxIntervalForTraceDetail time.Duration cache cache.Cache + metadataDB string + metadataTable string } // NewTraceReader returns a TraceReader for the database @@ -256,6 +258,8 @@ func NewReaderFromClickhouseConnection( fluxIntervalForTraceDetail: fluxIntervalForTraceDetail, cache: cache, + metadataDB: options.primary.MetadataDB, + metadataTable: options.primary.MetadataTable, } } @@ -1454,7 +1458,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con var serviceNameIntervalMap = map[string][]tracedetail.Interval{} var hasMissingSpans bool - userEmail , emailErr := auth.GetEmailFromJwt(ctx) + userEmail, emailErr := auth.GetEmailFromJwt(ctx) cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID) if err == nil { startTime = cachedTraceData.StartTime @@ -1530,8 +1534,8 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con if startTime == 0 || startTimeUnixNano < startTime { startTime = startTimeUnixNano } - if endTime == 0 || (startTimeUnixNano + jsonItem.DurationNano ) > endTime { - endTime = (startTimeUnixNano + jsonItem.DurationNano ) + if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime { + endTime = (startTimeUnixNano + jsonItem.DurationNano) } if durationNano == 0 || jsonItem.DurationNano > durationNano { durationNano = jsonItem.DurationNano @@ -1708,12 +1712,12 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace } // metadata calculation - startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano()) + startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano()) if startTime == 0 || startTimeUnixNano < startTime { startTime = startTimeUnixNano } - if endTime == 0 || ( startTimeUnixNano + jsonItem.DurationNano ) > endTime { - endTime = (startTimeUnixNano + jsonItem.DurationNano ) + if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime { + endTime = (startTimeUnixNano + jsonItem.DurationNano) } if durationNano == 0 || jsonItem.DurationNano > durationNano { durationNano = jsonItem.DurationNano @@ -1777,7 +1781,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace trace.Spans = selectedSpansForRequest trace.StartTimestampMillis = startTime / 1000000 - trace.EndTimestampMillis = endTime / 1000000 + trace.EndTimestampMillis = endTime / 1000000 return trace, nil } @@ -4111,6 +4115,97 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt return &response, nil } +func (r *ClickHouseReader) FetchRelatedValues(ctx context.Context, req *v3.FilterAttributeValueRequest) ([]string, error) { + var andConditions []string + + andConditions = append(andConditions, fmt.Sprintf("unix_milli >= %d", req.StartTimeMillis)) + andConditions = append(andConditions, fmt.Sprintf("unix_milli <= %d", req.EndTimeMillis)) + + if len(req.ExistingFilterItems) != 0 { + for _, item := range req.ExistingFilterItems { + // we only support string for related values + if item.Key.DataType != v3.AttributeKeyDataTypeString { + continue + } + + var colName string + switch item.Key.Type { + case v3.AttributeKeyTypeResource: + colName = "resource_attributes" + case v3.AttributeKeyTypeTag: + colName = "attributes" + default: + // we only support resource and tag for related values as of now + continue + } + // IN doesn't make use of map value index, we convert it to = or != + operator := item.Operator + if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorIn { + operator = "=" + } else if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorNotIn { + operator = "!=" + } + addCondition := func(val string) { + andConditions = append(andConditions, fmt.Sprintf("mapContains(%s, '%s') AND %s['%s'] %s %s", colName, item.Key.Key, colName, item.Key.Key, operator, val)) + } + switch v := item.Value.(type) { + case string: + fmtVal := utils.ClickHouseFormattedValue(v) + addCondition(fmtVal) + case []string: + for _, val := range v { + fmtVal := utils.ClickHouseFormattedValue(val) + addCondition(fmtVal) + } + case []interface{}: + for _, val := range v { + fmtVal := utils.ClickHouseFormattedValue(val) + addCondition(fmtVal) + } + } + } + } + whereClause := strings.Join(andConditions, " AND ") + + var selectColumn string + switch req.TagType { + case v3.TagTypeResource: + selectColumn = "resource_attributes" + "['" + req.FilterAttributeKey + "']" + case v3.TagTypeTag: + selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']" + default: + selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']" + } + + filterSubQuery := fmt.Sprintf( + "SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT 100", + selectColumn, + r.metadataDB, + r.metadataTable, + whereClause, + ) + zap.L().Debug("filterSubQuery for related values", zap.String("query", filterSubQuery)) + + rows, err := r.db.Query(ctx, filterSubQuery) + if err != nil { + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var attributeValues []string + for rows.Next() { + var value string + if err := rows.Scan(&value); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if value != "" { + attributeValues = append(attributeValues, value) + } + } + + return attributeValues, nil +} + func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { var err error var filterValueColumn string @@ -4212,6 +4307,11 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi } } + relatedValues, _ := r.FetchRelatedValues(ctx, req) + attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{ + StringAttributeValues: relatedValues, + } + return &attributeValues, nil } @@ -4892,6 +4992,11 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. } } + relatedValues, _ := r.FetchRelatedValues(ctx, req) + attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{ + StringAttributeValues: relatedValues, + } + return &attributeValues, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6036e11f9f..ceec1725b8 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -380,6 +380,7 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeKeys))).Methods(http.MethodGet) subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess( withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeValues))).Methods(http.MethodGet) + subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValuesPost)).Methods(http.MethodPost) subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost) subRouter.HandleFunc("/query_range/format", am.ViewAccess(aH.QueryRangeV3Format)).Methods(http.MethodPost) @@ -4600,6 +4601,35 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http aH.Respond(w, response) } +func (aH *APIHandler) autoCompleteAttributeValuesPost(w http.ResponseWriter, r *http.Request) { + var response *v3.FilterAttributeValueResponse + req, err := parseFilterAttributeValueRequestBody(r) + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + switch req.DataSource { + case v3.DataSourceMetrics: + response, err = aH.reader.GetMetricAttributeValues(r.Context(), req) + case v3.DataSourceLogs: + response, err = aH.reader.GetLogAttributeValues(r.Context(), req) + case v3.DataSourceTraces: + response, err = aH.reader.GetTraceAttributeValues(r.Context(), req) + default: + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) + return + } + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + aH.Respond(w, response) +} + func (aH *APIHandler) getSpanKeysV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3) (map[string]v3.AttributeKey, error) { data := map[string]v3.AttributeKey{} for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 8cfc621662..78746478f6 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -769,6 +769,25 @@ func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValue return &req, nil } +func parseFilterAttributeValueRequestBody(r *http.Request) (*v3.FilterAttributeValueRequest, error) { + + var req v3.FilterAttributeValueRequest + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + + if err := req.Validate(); err != nil { + return nil, err + } + + // offset by two windows periods for start for better results + req.StartTimeMillis = req.StartTimeMillis - time.Hour.Milliseconds()*6*2 + req.EndTimeMillis = req.EndTimeMillis + time.Hour.Milliseconds()*6 + + return &req, nil +} + func validateQueryRangeParamsV3(qp *v3.QueryRangeParamsV3) error { err := qp.CompositeQuery.Validate() if err != nil { diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index b9570fc52e..b5eb4facad 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -297,6 +297,8 @@ func (q AttributeKeyDataType) String() string { // for a selected aggregate operator, aggregate attribute, filter attribute key // and search text. type FilterAttributeValueRequest struct { + StartTimeMillis int64 `json:"startTimeMillis"` + EndTimeMillis int64 `json:"endTimeMillis"` DataSource DataSource `json:"dataSource"` AggregateOperator AggregateOperator `json:"aggregateOperator"` AggregateAttribute string `json:"aggregateAttribute"` @@ -305,6 +307,50 @@ type FilterAttributeValueRequest struct { TagType TagType `json:"tagType"` SearchText string `json:"searchText"` Limit int `json:"limit"` + ExistingFilterItems []FilterItem `json:"existingFilterItems"` + MetricNames []string `json:"metricNames"` +} + +func (f *FilterAttributeValueRequest) Validate() error { + if f.FilterAttributeKey == "" { + return fmt.Errorf("filterAttributeKey is required") + } + + if f.StartTimeMillis == 0 { + return fmt.Errorf("startTimeMillis is required") + } + + if f.EndTimeMillis == 0 { + return fmt.Errorf("endTimeMillis is required") + } + + if f.Limit == 0 { + f.Limit = 100 + } + + if f.Limit > 1000 { + return fmt.Errorf("limit must be less than 1000") + } + + if f.ExistingFilterItems != nil { + for _, value := range f.ExistingFilterItems { + if value.Key.Key == "" { + return fmt.Errorf("existingFilterItems must contain a valid key") + } + } + } + + if err := f.DataSource.Validate(); err != nil { + return fmt.Errorf("invalid data source: %w", err) + } + + if f.DataSource != DataSourceMetrics { + if err := f.AggregateOperator.Validate(); err != nil { + return fmt.Errorf("invalid aggregate operator: %w", err) + } + } + + return nil } type AggregateAttributeResponse struct { @@ -369,6 +415,8 @@ type FilterAttributeValueResponse struct { StringAttributeValues []string `json:"stringAttributeValues"` NumberAttributeValues []interface{} `json:"numberAttributeValues"` BoolAttributeValues []bool `json:"boolAttributeValues"` + + RelatedValues *FilterAttributeValueResponse `json:"relatedValues"` } type QueryRangeParamsV3 struct {