diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index ebbee2a7b8..71388ad1ee 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -28,6 +28,7 @@ const ( defaultSpansTable string = "distributed_signoz_spans" defaultDependencyGraphTable string = "distributed_dependency_graph_minutes_v2" defaultTopLevelOperationsTable string = "distributed_top_level_operations" + defaultSpanAttributeTable string = "distributed_span_attributes" defaultLogsDB string = "signoz_logs" defaultLogsTable string = "distributed_logs" defaultLogsLocalTable string = "logs" @@ -63,6 +64,7 @@ type namespaceConfig struct { UsageExplorerTable string SpansTable string ErrorTable string + SpanAttributeTable string DependencyGraphTable string TopLevelOperationsTable string LogsDB string @@ -132,6 +134,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s DurationTable: defaultDurationTable, UsageExplorerTable: defaultUsageExplorerTable, SpansTable: defaultSpansTable, + SpanAttributeTable: defaultSpanAttributeTable, DependencyGraphTable: defaultDependencyGraphTable, TopLevelOperationsTable: defaultTopLevelOperationsTable, LogsDB: defaultLogsDB, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 39c84b93b8..1b0b771139 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -95,6 +95,7 @@ type ClickHouseReader struct { errorTable string usageExplorerTable string SpansTable string + spanAttributeTable string dependencyGraphTable string topLevelOperationsTable string logsDB string @@ -145,6 +146,7 @@ func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.Featu usageExplorerTable: options.primary.UsageExplorerTable, durationTable: options.primary.DurationTable, SpansTable: options.primary.SpansTable, + spanAttributeTable: options.primary.SpanAttributeTable, dependencyGraphTable: options.primary.DependencyGraphTable, topLevelOperationsTable: options.primary.TopLevelOperationsTable, logsDB: options.primary.LogsDB, @@ -4218,3 +4220,163 @@ func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error { return nil } + +func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { + var query string + var err error + var rows driver.Rows + var response v3.AggregateAttributeResponse + where := "" + switch req.Operator { + case v3.AggregateOperatorCountDistinct: + where = "tagKey ILIKE $1" + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRate, + v3.AggregateOperatorRateMin, + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99, + v3.AggregateOperatorAvg, + v3.AggregateOperatorSum, + v3.AggregateOperatorMin, + v3.AggregateOperatorMax: + where = "tagKey ILIKE $1 AND dataType='float64'" + case + v3.AggregateOpeatorCount, + v3.AggregateOperatorNoOp: + return &v3.AggregateAttributeResponse{}, nil + default: + return nil, fmt.Errorf("unsupported aggregate operator") + } + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTable, where) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var tagKey string + var dataType string + var tagType string + var isColumn bool + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: tagKey, + DataType: v3.AttributeKeyDataType(dataType), + Type: v3.AttributeKeyType(tagType), + IsColumn: isColumn, + } + response.AttributeKeys = append(response.AttributeKeys, key) + } + return &response, nil +} + +func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + + var query string + var err error + var rows driver.Rows + var response v3.FilterAttributeKeyResponse + + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s WHERE tagKey ILIKE $1", r.TraceDB, r.spanAttributeTable) + + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var tagKey string + var dataType string + var tagType string + var isColumn bool + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: tagKey, + DataType: v3.AttributeKeyDataType(dataType), + Type: v3.AttributeKeyType(tagType), + IsColumn: isColumn, + } + response.AttributeKeys = append(response.AttributeKeys, key) + } + return &response, nil +} + +func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + + var query string + var err error + var rows driver.Rows + var attributeValues v3.FilterAttributeValueResponse + // if dataType or tagType is not present return empty response + if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 || req.FilterAttributeKey == "body" { + return &v3.FilterAttributeValueResponse{}, nil + } + switch req.FilterAttributeKeyDataType { + case v3.AttributeKeyDataTypeString: + query = fmt.Sprintf("SELECT DISTINCT stringTagValue from %s.%s WHERE tagKey = $1 AND stringTagValue ILIKE $2 AND tagType=$3 limit $4", r.TraceDB, r.spanAttributeTable) + rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText), req.TagType, req.Limit) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var strAttributeValue string + for rows.Next() { + if err := rows.Scan(&strAttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue) + } + case v3.AttributeKeyDataTypeFloat64, v3.AttributeKeyDataTypeInt64: + query = fmt.Sprintf("SELECT DISTINCT float64TagValue from %s.%s where tagKey = $1 AND toString(float64TagValue) ILIKE $2 AND tagType=$3 limit $4", r.TraceDB, r.spanAttributeTable) + rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText), req.TagType, req.Limit) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var numberAttributeValue sql.NullFloat64 + for rows.Next() { + if err := rows.Scan(&numberAttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if numberAttributeValue.Valid { + attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, numberAttributeValue.Float64) + } + } + case v3.AttributeKeyDataTypeBool: + attributeValues.BoolAttributeValues = []bool{true, false} + default: + return nil, fmt.Errorf("invalid data type") + } + + return &attributeValues, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 9310cc5ad8..3e0728cc09 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2370,7 +2370,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * case v3.DataSourceLogs: response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req) case v3.DataSourceTraces: - // TODO: implement + response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req) default: RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) return @@ -2399,7 +2399,7 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R case v3.DataSourceLogs: response, err = aH.reader.GetLogAttributeKeys(r.Context(), req) case v3.DataSourceTraces: - // TODO: implement + response, err = aH.reader.GetTraceAttributeKeys(r.Context(), req) default: RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) return @@ -2428,7 +2428,7 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http case v3.DataSourceLogs: response, err = aH.reader.GetLogAttributeValues(r.Context(), req) case v3.DataSourceTraces: - // TODO: implement + response, err = aH.reader.GetTraceAttributeValues(r.Context(), req) default: RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) return diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 78d4031724..1393e28406 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -35,6 +35,9 @@ type Reader interface { // clickhouse only. GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) + GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) + GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) + GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError) GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError) GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index c1dddc60f6..7c62275dc5 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -221,7 +221,7 @@ func (q AttributeKeyDataType) Validate() error { case AttributeKeyDataTypeString, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeBool: return nil default: - return fmt.Errorf("invalid tag type: %s", q) + return fmt.Errorf("invalid tag data type: %s", q) } }