feat: add attrs filters autocomplete endpoints (#2264)

This commit is contained in:
Srikanth Chekuri 2023-03-10 11:22:34 +05:30 committed by GitHub
parent 59497ed53c
commit 9af991e424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 380 additions and 0 deletions

View File

@ -3700,6 +3700,73 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req
return &response, nil
}
func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.FilterAttributeKeyResponse
// skips the internal attributes i.e attributes starting with __
query = fmt.Sprintf("SELECT DISTINCT arrayJoin(tagKeys) as distinctTagKey from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1) WHERE distinctTagKey ILIKE $2 AND distinctTagKey NOT LIKE '\\_\\_%%'", signozMetricDBName, signozTSTableName)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText))
if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var attributeKey string
for rows.Next() {
if err := rows.Scan(&attributeKey); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: attributeKey,
DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
Type: v3.AttributeKeyTypeTag,
}
response.AttributeKeys = append(response.AttributeKeys, key)
}
return &response, nil
}
func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
var query string
var err error
var rows driver.Rows
var attributeValues v3.FilterAttributeValueResponse
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, $1)) from %s.%s WHERE metric_name=$2 AND JSONExtractString(labels, $3) ILIKE $4", signozMetricDBName, signozTSTableName)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.AggregateAttribute, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText))
if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var atrributeValue string
for rows.Next() {
if err := rows.Scan(&atrributeValue); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
// https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72
// this may change in future if we use OTLP as the data model
attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, atrributeValue)
}
return &attributeValues, nil
}
func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error {
rows, err := r.db.Query(ctx, "SELECT 1")
if err != nil {

View File

@ -242,6 +242,8 @@ func (aH *APIHandler) RegisterMetricsRoutes(router *mux.Router, am *AuthMiddlewa
func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v3").Subrouter()
subRouter.HandleFunc("/autocomplete/aggregate_attributes", am.ViewAccess(aH.autocompleteAggregateAttributes)).Methods(http.MethodGet)
subRouter.HandleFunc("/autocomplete/attribute_keys", am.ViewAccess(aH.autoCompleteAttributeKeys)).Methods(http.MethodGet)
subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValues)).Methods(http.MethodGet)
}
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
@ -2364,3 +2366,61 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
aH.Respond(w, response)
}
func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.Request) {
var response *v3.FilterAttributeKeyResponse
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req)
case v3.DataSourceLogs:
// TODO: implement
case v3.DataSourceTraces:
// TODO: implement
default:
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
return
}
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
aH.Respond(w, response)
}
func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http.Request) {
var response *v3.FilterAttributeValueResponse
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeValues(r.Context(), req)
case v3.DataSourceLogs:
// TODO: implement
case v3.DataSourceTraces:
// TODO: implement
default:
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
return
}
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
aH.Respond(w, response)
}

View File

@ -841,3 +841,64 @@ func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequ
}
return &req, nil
}
func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequest, error) {
var req v3.FilterAttributeKeyRequest
dataSource := v3.DataSource(r.URL.Query().Get("dataSource"))
aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator"))
aggregateAttribute := r.URL.Query().Get("aggregateAttribute")
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil {
limit = 50
}
if err := dataSource.Validate(); err != nil {
return nil, err
}
if err := aggregateOperator.Validate(); err != nil {
return nil, err
}
req = v3.FilterAttributeKeyRequest{
DataSource: dataSource,
AggregateOperator: aggregateOperator,
AggregateAttribute: aggregateAttribute,
Limit: limit,
SearchText: r.URL.Query().Get("searchText"),
}
return &req, nil
}
func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValueRequest, error) {
var req v3.FilterAttributeValueRequest
dataSource := v3.DataSource(r.URL.Query().Get("dataSource"))
aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator"))
aggregateAttribute := r.URL.Query().Get("aggregateAttribute")
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil {
limit = 50
}
if err := dataSource.Validate(); err != nil {
return nil, err
}
if err := aggregateOperator.Validate(); err != nil {
return nil, err
}
req = v3.FilterAttributeValueRequest{
DataSource: dataSource,
AggregateOperator: aggregateOperator,
AggregateAttribute: aggregateAttribute,
Limit: limit,
SearchText: r.URL.Query().Get("searchText"),
FilterAttributeKey: r.URL.Query().Get("attributeKey"),
}
return &req, nil
}

View File

@ -147,3 +147,193 @@ func TestParseAggregateAttrReques(t *testing.T) {
assert.Equal(t, reqCase.expectedSearchText, aggregateAttrRequest.SearchText)
}
}
func TestParseFilterAttributeKeyRequest(t *testing.T) {
reqCases := []struct {
desc string
queryString string
expectedOperator v3.AggregateOperator
expectedDataSource v3.DataSource
expectedAggAttr string
expectedLimit int
expectedSearchText string
expectErr bool
errMsg string
}{
{
desc: "valid operator and data source",
queryString: "aggregateOperator=sum&dataSource=metrics&aggregateAttribute=metric_name&searchText=abc",
expectedOperator: v3.AggregateOperatorSum,
expectedDataSource: v3.DataSourceMetrics,
expectedAggAttr: "metric_name",
expectedLimit: 50,
expectedSearchText: "abc",
},
{
desc: "different valid operator and data source as logs",
queryString: "aggregateOperator=avg&dataSource=logs&aggregateAttribute=bytes&searchText=abc",
expectedOperator: v3.AggregateOperatorAvg,
expectedDataSource: v3.DataSourceLogs,
expectedAggAttr: "bytes",
expectedLimit: 50,
expectedSearchText: "abc",
},
{
desc: "different valid operator and with default search text and limit",
queryString: "aggregateOperator=avg&dataSource=metrics&aggregateAttribute=metric_name",
expectedOperator: v3.AggregateOperatorAvg,
expectedDataSource: v3.DataSourceMetrics,
expectedAggAttr: "metric_name",
expectedLimit: 50,
expectedSearchText: "",
},
{
desc: "valid operator and data source with limit",
queryString: "aggregateOperator=avg&dataSource=traces&aggregateAttribute=http.req.duration&limit=10",
expectedOperator: v3.AggregateOperatorAvg,
expectedAggAttr: "http.req.duration",
expectedDataSource: v3.DataSourceTraces,
expectedLimit: 10,
expectedSearchText: "",
},
{
desc: "invalid operator",
queryString: "aggregateOperator=avg1&dataSource=traces&limit=10",
expectErr: true,
errMsg: "invalid operator",
},
{
desc: "invalid data source",
queryString: "aggregateOperator=avg&dataSource=traces1&limit=10",
expectErr: true,
errMsg: "invalid data source",
},
{
desc: "invalid limit",
queryString: "aggregateOperator=avg&dataSource=traces&limit=abc",
expectedOperator: v3.AggregateOperatorAvg,
expectedDataSource: v3.DataSourceTraces,
expectedLimit: 50,
},
}
for _, reqCase := range reqCases {
r := httptest.NewRequest("GET", "/api/v3/autocomplete/filter_attributes?"+reqCase.queryString, nil)
filterAttrRequest, err := parseFilterAttributeKeyRequest(r)
if reqCase.expectErr {
if err == nil {
t.Errorf("expected error: %s", reqCase.errMsg)
}
if !strings.Contains(err.Error(), reqCase.errMsg) {
t.Errorf("expected error to contain: %s, got: %s", reqCase.errMsg, err.Error())
}
continue
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
assert.Equal(t, reqCase.expectedOperator, filterAttrRequest.AggregateOperator)
assert.Equal(t, reqCase.expectedDataSource, filterAttrRequest.DataSource)
assert.Equal(t, reqCase.expectedAggAttr, filterAttrRequest.AggregateAttribute)
assert.Equal(t, reqCase.expectedLimit, filterAttrRequest.Limit)
assert.Equal(t, reqCase.expectedSearchText, filterAttrRequest.SearchText)
}
}
func TestParseFilterAttributeValueRequest(t *testing.T) {
reqCases := []struct {
desc string
queryString string
expectedOperator v3.AggregateOperator
expectedDataSource v3.DataSource
expectedAggAttr string
expectedFilterAttr string
expectedLimit int
expectedSearchText string
expectErr bool
errMsg string
}{
{
desc: "valid operator and data source",
queryString: "aggregateOperator=sum&dataSource=metrics&aggregateAttribute=metric_name&attributeKey=service_name&searchText=abc",
expectedOperator: v3.AggregateOperatorSum,
expectedDataSource: v3.DataSourceMetrics,
expectedAggAttr: "metric_name",
expectedFilterAttr: "service_name",
expectedLimit: 50,
expectedSearchText: "abc",
},
{
desc: "different valid operator and data source as logs",
queryString: "aggregateOperator=avg&dataSource=logs&aggregateAttribute=bytes&attributeKey=service_name&searchText=abc",
expectedOperator: v3.AggregateOperatorAvg,
expectedDataSource: v3.DataSourceLogs,
expectedAggAttr: "bytes",
expectedFilterAttr: "service_name",
expectedLimit: 50,
expectedSearchText: "abc",
},
{
desc: "different valid operator and with default search text and limit",
queryString: "aggregateOperator=avg&dataSource=metrics&aggregateAttribute=metric_name&attributeKey=service_name",
expectedOperator: v3.AggregateOperatorAvg,
expectedDataSource: v3.DataSourceMetrics,
expectedAggAttr: "metric_name",
expectedFilterAttr: "service_name",
expectedLimit: 50,
expectedSearchText: "",
},
{
desc: "valid operator and data source with limit",
queryString: "aggregateOperator=avg&dataSource=traces&aggregateAttribute=http.req.duration&attributeKey=service_name&limit=10",
expectedOperator: v3.AggregateOperatorAvg,
expectedAggAttr: "http.req.duration",
expectedFilterAttr: "service_name",
expectedDataSource: v3.DataSourceTraces,
expectedLimit: 10,
expectedSearchText: "",
},
{
desc: "invalid operator",
queryString: "aggregateOperator=avg1&dataSource=traces&limit=10",
expectErr: true,
errMsg: "invalid operator",
},
{
desc: "invalid data source",
queryString: "aggregateOperator=avg&dataSource=traces1&limit=10",
expectErr: true,
errMsg: "invalid data source",
},
{
desc: "invalid limit",
queryString: "aggregateOperator=avg&dataSource=traces&limit=abc",
expectedOperator: v3.AggregateOperatorAvg,
expectedDataSource: v3.DataSourceTraces,
expectedLimit: 50,
},
}
for _, reqCase := range reqCases {
r := httptest.NewRequest("GET", "/api/v3/autocomplete/filter_attribute_values?"+reqCase.queryString, nil)
filterAttrRequest, err := parseFilterAttributeValueRequest(r)
if reqCase.expectErr {
if err == nil {
t.Errorf("expected error: %s", reqCase.errMsg)
}
if !strings.Contains(err.Error(), reqCase.errMsg) {
t.Errorf("expected error to contain: %s, got: %s", reqCase.errMsg, err.Error())
}
continue
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
assert.Equal(t, reqCase.expectedOperator, filterAttrRequest.AggregateOperator)
assert.Equal(t, reqCase.expectedDataSource, filterAttrRequest.DataSource)
assert.Equal(t, reqCase.expectedAggAttr, filterAttrRequest.AggregateAttribute)
assert.Equal(t, reqCase.expectedFilterAttr, filterAttrRequest.FilterAttributeKey)
assert.Equal(t, reqCase.expectedLimit, filterAttrRequest.Limit)
assert.Equal(t, reqCase.expectedSearchText, filterAttrRequest.SearchText)
}
}

View File

@ -58,6 +58,8 @@ type Reader interface {
GetMetricResult(ctx context.Context, query string) ([]*model.Series, error)
GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error)
GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
GetTotalSpans(ctx context.Context) (uint64, error)
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)