diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index cf66408d8a..23e9f6a9fa 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3700,6 +3700,73 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req return &response, nil } +func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + + var query string + var err error + var rows driver.Rows + var response v3.FilterAttributeKeyResponse + + // skips the internal attributes i.e attributes starting with __ + query = fmt.Sprintf("SELECT DISTINCT arrayJoin(tagKeys) as distinctTagKey from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1) WHERE distinctTagKey ILIKE $2 AND distinctTagKey NOT LIKE '\\_\\_%%'", signozMetricDBName, signozTSTableName) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText)) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var attributeKey string + for rows.Next() { + if err := rows.Scan(&attributeKey); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: attributeKey, + DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72. + Type: v3.AttributeKeyTypeTag, + } + response.AttributeKeys = append(response.AttributeKeys, key) + } + + return &response, nil +} + +func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + + var query string + var err error + var rows driver.Rows + var attributeValues v3.FilterAttributeValueResponse + + query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, $1)) from %s.%s WHERE metric_name=$2 AND JSONExtractString(labels, $3) ILIKE $4", signozMetricDBName, signozTSTableName) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.AggregateAttribute, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText)) + + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var atrributeValue string + for rows.Next() { + if err := rows.Scan(&atrributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72 + // this may change in future if we use OTLP as the data model + attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, atrributeValue) + } + + return &attributeValues, nil +} + func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error { rows, err := r.db.Query(ctx, "SELECT 1") if err != nil { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index eb8409d239..62ebeeeedf 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -242,6 +242,8 @@ func (aH *APIHandler) RegisterMetricsRoutes(router *mux.Router, am *AuthMiddlewa func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMiddleware) { subRouter := router.PathPrefix("/api/v3").Subrouter() subRouter.HandleFunc("/autocomplete/aggregate_attributes", am.ViewAccess(aH.autocompleteAggregateAttributes)).Methods(http.MethodGet) + subRouter.HandleFunc("/autocomplete/attribute_keys", am.ViewAccess(aH.autoCompleteAttributeKeys)).Methods(http.MethodGet) + subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValues)).Methods(http.MethodGet) } func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) { @@ -2364,3 +2366,61 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * aH.Respond(w, response) } + +func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.Request) { + var response *v3.FilterAttributeKeyResponse + req, err := parseFilterAttributeKeyRequest(r) + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + switch req.DataSource { + case v3.DataSourceMetrics: + response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req) + case v3.DataSourceLogs: + // TODO: implement + case v3.DataSourceTraces: + // TODO: implement + default: + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) + return + } + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + aH.Respond(w, response) +} + +func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http.Request) { + var response *v3.FilterAttributeValueResponse + req, err := parseFilterAttributeValueRequest(r) + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + switch req.DataSource { + case v3.DataSourceMetrics: + response, err = aH.reader.GetMetricAttributeValues(r.Context(), req) + case v3.DataSourceLogs: + // TODO: implement + case v3.DataSourceTraces: + // TODO: implement + default: + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) + return + } + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + aH.Respond(w, response) +} diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index e75e41d7d0..608e5df8a6 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -841,3 +841,64 @@ func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequ } return &req, nil } + +func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequest, error) { + var req v3.FilterAttributeKeyRequest + + dataSource := v3.DataSource(r.URL.Query().Get("dataSource")) + aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator")) + aggregateAttribute := r.URL.Query().Get("aggregateAttribute") + limit, err := strconv.Atoi(r.URL.Query().Get("limit")) + if err != nil { + limit = 50 + } + + if err := dataSource.Validate(); err != nil { + return nil, err + } + + if err := aggregateOperator.Validate(); err != nil { + return nil, err + } + + req = v3.FilterAttributeKeyRequest{ + DataSource: dataSource, + AggregateOperator: aggregateOperator, + AggregateAttribute: aggregateAttribute, + Limit: limit, + SearchText: r.URL.Query().Get("searchText"), + } + return &req, nil +} + +func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValueRequest, error) { + + var req v3.FilterAttributeValueRequest + + dataSource := v3.DataSource(r.URL.Query().Get("dataSource")) + aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator")) + aggregateAttribute := r.URL.Query().Get("aggregateAttribute") + + limit, err := strconv.Atoi(r.URL.Query().Get("limit")) + if err != nil { + limit = 50 + } + + if err := dataSource.Validate(); err != nil { + return nil, err + } + + if err := aggregateOperator.Validate(); err != nil { + return nil, err + } + + req = v3.FilterAttributeValueRequest{ + DataSource: dataSource, + AggregateOperator: aggregateOperator, + AggregateAttribute: aggregateAttribute, + Limit: limit, + SearchText: r.URL.Query().Get("searchText"), + FilterAttributeKey: r.URL.Query().Get("attributeKey"), + } + return &req, nil +} diff --git a/pkg/query-service/app/parser_test.go b/pkg/query-service/app/parser_test.go index 3a29303510..222a1c9ee6 100644 --- a/pkg/query-service/app/parser_test.go +++ b/pkg/query-service/app/parser_test.go @@ -147,3 +147,193 @@ func TestParseAggregateAttrReques(t *testing.T) { assert.Equal(t, reqCase.expectedSearchText, aggregateAttrRequest.SearchText) } } + +func TestParseFilterAttributeKeyRequest(t *testing.T) { + reqCases := []struct { + desc string + queryString string + expectedOperator v3.AggregateOperator + expectedDataSource v3.DataSource + expectedAggAttr string + expectedLimit int + expectedSearchText string + expectErr bool + errMsg string + }{ + { + desc: "valid operator and data source", + queryString: "aggregateOperator=sum&dataSource=metrics&aggregateAttribute=metric_name&searchText=abc", + expectedOperator: v3.AggregateOperatorSum, + expectedDataSource: v3.DataSourceMetrics, + expectedAggAttr: "metric_name", + expectedLimit: 50, + expectedSearchText: "abc", + }, + { + desc: "different valid operator and data source as logs", + queryString: "aggregateOperator=avg&dataSource=logs&aggregateAttribute=bytes&searchText=abc", + expectedOperator: v3.AggregateOperatorAvg, + expectedDataSource: v3.DataSourceLogs, + expectedAggAttr: "bytes", + expectedLimit: 50, + expectedSearchText: "abc", + }, + { + desc: "different valid operator and with default search text and limit", + queryString: "aggregateOperator=avg&dataSource=metrics&aggregateAttribute=metric_name", + expectedOperator: v3.AggregateOperatorAvg, + expectedDataSource: v3.DataSourceMetrics, + expectedAggAttr: "metric_name", + expectedLimit: 50, + expectedSearchText: "", + }, + { + desc: "valid operator and data source with limit", + queryString: "aggregateOperator=avg&dataSource=traces&aggregateAttribute=http.req.duration&limit=10", + expectedOperator: v3.AggregateOperatorAvg, + expectedAggAttr: "http.req.duration", + expectedDataSource: v3.DataSourceTraces, + expectedLimit: 10, + expectedSearchText: "", + }, + { + desc: "invalid operator", + queryString: "aggregateOperator=avg1&dataSource=traces&limit=10", + expectErr: true, + errMsg: "invalid operator", + }, + { + desc: "invalid data source", + queryString: "aggregateOperator=avg&dataSource=traces1&limit=10", + expectErr: true, + errMsg: "invalid data source", + }, + { + desc: "invalid limit", + queryString: "aggregateOperator=avg&dataSource=traces&limit=abc", + expectedOperator: v3.AggregateOperatorAvg, + expectedDataSource: v3.DataSourceTraces, + expectedLimit: 50, + }, + } + + for _, reqCase := range reqCases { + r := httptest.NewRequest("GET", "/api/v3/autocomplete/filter_attributes?"+reqCase.queryString, nil) + filterAttrRequest, err := parseFilterAttributeKeyRequest(r) + if reqCase.expectErr { + if err == nil { + t.Errorf("expected error: %s", reqCase.errMsg) + } + if !strings.Contains(err.Error(), reqCase.errMsg) { + t.Errorf("expected error to contain: %s, got: %s", reqCase.errMsg, err.Error()) + } + continue + } + if err != nil { + t.Errorf("unexpected error: %v", err) + } + assert.Equal(t, reqCase.expectedOperator, filterAttrRequest.AggregateOperator) + assert.Equal(t, reqCase.expectedDataSource, filterAttrRequest.DataSource) + assert.Equal(t, reqCase.expectedAggAttr, filterAttrRequest.AggregateAttribute) + assert.Equal(t, reqCase.expectedLimit, filterAttrRequest.Limit) + assert.Equal(t, reqCase.expectedSearchText, filterAttrRequest.SearchText) + } +} + +func TestParseFilterAttributeValueRequest(t *testing.T) { + reqCases := []struct { + desc string + queryString string + expectedOperator v3.AggregateOperator + expectedDataSource v3.DataSource + expectedAggAttr string + expectedFilterAttr string + expectedLimit int + expectedSearchText string + expectErr bool + errMsg string + }{ + { + desc: "valid operator and data source", + queryString: "aggregateOperator=sum&dataSource=metrics&aggregateAttribute=metric_name&attributeKey=service_name&searchText=abc", + expectedOperator: v3.AggregateOperatorSum, + expectedDataSource: v3.DataSourceMetrics, + expectedAggAttr: "metric_name", + expectedFilterAttr: "service_name", + expectedLimit: 50, + expectedSearchText: "abc", + }, + { + desc: "different valid operator and data source as logs", + queryString: "aggregateOperator=avg&dataSource=logs&aggregateAttribute=bytes&attributeKey=service_name&searchText=abc", + expectedOperator: v3.AggregateOperatorAvg, + expectedDataSource: v3.DataSourceLogs, + expectedAggAttr: "bytes", + expectedFilterAttr: "service_name", + expectedLimit: 50, + expectedSearchText: "abc", + }, + { + desc: "different valid operator and with default search text and limit", + queryString: "aggregateOperator=avg&dataSource=metrics&aggregateAttribute=metric_name&attributeKey=service_name", + expectedOperator: v3.AggregateOperatorAvg, + expectedDataSource: v3.DataSourceMetrics, + expectedAggAttr: "metric_name", + expectedFilterAttr: "service_name", + expectedLimit: 50, + expectedSearchText: "", + }, + { + desc: "valid operator and data source with limit", + queryString: "aggregateOperator=avg&dataSource=traces&aggregateAttribute=http.req.duration&attributeKey=service_name&limit=10", + expectedOperator: v3.AggregateOperatorAvg, + expectedAggAttr: "http.req.duration", + expectedFilterAttr: "service_name", + expectedDataSource: v3.DataSourceTraces, + expectedLimit: 10, + expectedSearchText: "", + }, + { + desc: "invalid operator", + queryString: "aggregateOperator=avg1&dataSource=traces&limit=10", + expectErr: true, + errMsg: "invalid operator", + }, + { + desc: "invalid data source", + queryString: "aggregateOperator=avg&dataSource=traces1&limit=10", + expectErr: true, + errMsg: "invalid data source", + }, + { + desc: "invalid limit", + queryString: "aggregateOperator=avg&dataSource=traces&limit=abc", + expectedOperator: v3.AggregateOperatorAvg, + expectedDataSource: v3.DataSourceTraces, + expectedLimit: 50, + }, + } + + for _, reqCase := range reqCases { + r := httptest.NewRequest("GET", "/api/v3/autocomplete/filter_attribute_values?"+reqCase.queryString, nil) + filterAttrRequest, err := parseFilterAttributeValueRequest(r) + if reqCase.expectErr { + if err == nil { + t.Errorf("expected error: %s", reqCase.errMsg) + } + if !strings.Contains(err.Error(), reqCase.errMsg) { + t.Errorf("expected error to contain: %s, got: %s", reqCase.errMsg, err.Error()) + } + continue + } + if err != nil { + t.Errorf("unexpected error: %v", err) + } + assert.Equal(t, reqCase.expectedOperator, filterAttrRequest.AggregateOperator) + assert.Equal(t, reqCase.expectedDataSource, filterAttrRequest.DataSource) + assert.Equal(t, reqCase.expectedAggAttr, filterAttrRequest.AggregateAttribute) + assert.Equal(t, reqCase.expectedFilterAttr, filterAttrRequest.FilterAttributeKey) + assert.Equal(t, reqCase.expectedLimit, filterAttrRequest.Limit) + assert.Equal(t, reqCase.expectedSearchText, filterAttrRequest.SearchText) + } +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 6f0442d4ad..c8c66669d0 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -58,6 +58,8 @@ type Reader interface { GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) + GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) + GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) GetTotalSpans(ctx context.Context) (uint64, error) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)