From c46bef321c4088c5ef7bfc310c7b7832b605c6a6 Mon Sep 17 00:00:00 2001 From: Vishal Sharma Date: Wed, 25 Jan 2023 12:35:44 +0530 Subject: [PATCH] feat: tag filter backend changes (#2115) --- .../app/clickhouseReader/reader.go | 389 ++++++++++++------ pkg/query-service/app/parser.go | 58 ++- pkg/query-service/constants/constants.go | 24 ++ pkg/query-service/interfaces/interface.go | 4 +- pkg/query-service/model/queryParams.go | 245 ++++++++--- pkg/query-service/model/response.go | 8 +- 6 files changed, 525 insertions(+), 203 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 492e457c0f..38c3ce09e0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -772,7 +772,11 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G clickhouse.Named("serviceName", svc), clickhouse.Named("names", ops), ) - args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args) + // create TagQuery from TagQueryParams + tags := createTagQueryFromTagQueryParams(queryParams.Tags) + subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) if errStatus != nil { zap.S().Error("Error in processing sql query: ", errStatus) return @@ -791,8 +795,9 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G zap.S().Error("Error in processing sql query: ", err) return } - - args, errStatus = buildQueryWithTagParams(ctx, queryParams.Tags, &errorQuery, args) + subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) if err != nil { zap.S().Error("Error in processing sql query: ", err) @@ -849,7 +854,12 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams * ) args := []interface{}{} args = append(args, namedArgs...) - args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args) + + // create TagQuery from TagQueryParams + tags := createTagQueryFromTagQueryParams(queryParams.Tags) + subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) if errStatus != nil { return nil, errStatus } @@ -875,7 +885,9 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams * ) args = []interface{}{} args = append(args, namedArgs...) - args, errStatus = buildQueryWithTagParams(ctx, queryParams.Tags, &query, args) + subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) if errStatus != nil { return nil, errStatus } @@ -1357,7 +1369,11 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo args = append(args, clickhouse.Named("kind", queryParams.Kind)) } - args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args) + // create TagQuery from TagQueryParams + tags := createTagQueryFromTagQueryParams(queryParams.Tags) + subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) if errStatus != nil { return nil, errStatus } @@ -1428,6 +1444,22 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo return &getFilterSpansResponse, nil } +func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery { + tags := []model.TagQuery{} + for _, tag := range queryParams { + if len(tag.StringValues) > 0 { + tags = append(tags, model.NewTagQueryString(tag.Key, tag.StringValues, tag.Operator)) + } + if len(tag.NumberValues) > 0 { + tags = append(tags, model.NewTagQueryNumber(tag.Key, tag.NumberValues, tag.Operator)) + } + if len(tag.BoolValues) > 0 { + tags = append(tags, model.NewTagQueryBool(tag.Key, tag.BoolValues, tag.Operator)) + } + } + return tags +} + func StringWithCharset(length int, charset string) string { b := make([]byte, length) for i := range b { @@ -1440,49 +1472,143 @@ func String(length int) string { return StringWithCharset(length, charset) } -func buildQueryWithTagParams(ctx context.Context, tags []model.TagQuery, query *string, args []interface{}) ([]interface{}, *model.ApiError) { - +func buildQueryWithTagParams(ctx context.Context, tags []model.TagQuery) (string, []interface{}, *model.ApiError) { + subQuery := "" + var args []interface{} for _, item := range tags { - if item.Operator == "in" { - for i, value := range item.Values { - tagKey := "inTagKey" + String(5) - tagValue := "inTagValue" + String(5) - if i == 0 && i == len(item.Values)-1 { - *query += fmt.Sprintf(" AND tagMap[@%s] = @%s", tagKey, tagValue) - } else if i == 0 && i != len(item.Values)-1 { - *query += fmt.Sprintf(" AND (tagMap[@%s] = @%s", tagKey, tagValue) - } else if i != 0 && i == len(item.Values)-1 { - *query += fmt.Sprintf(" OR tagMap[@%s] = @%s)", tagKey, tagValue) - } else { - *query += fmt.Sprintf(" OR tagMap[@%s] = @%s", tagKey, tagValue) - } - args = append(args, clickhouse.Named(tagKey, item.Key)) - args = append(args, clickhouse.Named(tagValue, value)) - } - } else if item.Operator == "not in" { - for i, value := range item.Values { - tagKey := "notinTagKey" + String(5) - tagValue := "notinTagValue" + String(5) - if i == 0 && i == len(item.Values)-1 { - *query += fmt.Sprintf(" AND NOT tagMap[@%s] = @%s", tagKey, tagValue) - } else if i == 0 && i != len(item.Values)-1 { - *query += fmt.Sprintf(" AND NOT (tagMap[@%s] = @%s", tagKey, tagValue) - } else if i != 0 && i == len(item.Values)-1 { - *query += fmt.Sprintf(" OR tagMap[@%s] = @%s)", tagKey, tagValue) - } else { - *query += fmt.Sprintf(" OR tagMap[@%s] = @%s", tagKey, tagValue) - } - args = append(args, clickhouse.Named(tagKey, item.Key)) - args = append(args, clickhouse.Named(tagValue, value)) - } - } else { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Tag Operator %s not supported", item.Operator)} + tagMapType := "" + switch item.(type) { + case model.TagQueryString: + tagMapType = constants.StringTagMapCol + case model.TagQueryNumber: + tagMapType = constants.NumberTagMapCol + case model.TagQueryBool: + tagMapType = constants.BoolTagMapCol + default: + // type not supported error + return "", nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("type not supported")} + } + switch item.GetOperator() { + case model.EqualOperator: + subQuery, args = addArithmeticOperator(item, tagMapType, "=") + case model.NotEqualOperator: + subQuery, args = addArithmeticOperator(item, tagMapType, "!=") + case model.LessThanOperator: + subQuery, args = addArithmeticOperator(item, tagMapType, "<") + case model.GreaterThanOperator: + subQuery, args = addArithmeticOperator(item, tagMapType, ">") + case model.InOperator: + subQuery, args = addInOperator(item, tagMapType, false) + case model.NotInOperator: + subQuery, args = addInOperator(item, tagMapType, true) + case model.LessThanEqualOperator: + subQuery, args = addArithmeticOperator(item, tagMapType, "<=") + case model.GreaterThanEqualOperator: + subQuery, args = addArithmeticOperator(item, tagMapType, ">=") + case model.ContainsOperator: + subQuery, args = addContainsOperator(item, tagMapType, false) + case model.NotContainsOperator: + subQuery, args = addContainsOperator(item, tagMapType, true) + case model.StartsWithOperator: + subQuery, args = addStartsWithOperator(item, tagMapType, false) + case model.NotStartsWithOperator: + subQuery, args = addStartsWithOperator(item, tagMapType, true) + case model.ExistsOperator: + subQuery, args = addExistsOperator(item, tagMapType, false) + case model.NotExistsOperator: + subQuery, args = addExistsOperator(item, tagMapType, true) + default: + return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Tag Operator %s not supported", item.GetOperator())} } } - return args, nil + return subQuery, args, nil } -func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) { +func addInOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) { + values := item.GetValues() + args := []interface{}{} + notStr := "" + if not { + notStr = "NOT" + } + tagValuePair := []string{} + for _, value := range values { + tagKey := "inTagKey" + String(5) + tagValue := "inTagValue" + String(5) + tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] = @%s", tagMapType, tagKey, tagValue)) + args = append(args, clickhouse.Named(tagKey, item.GetKey())) + args = append(args, clickhouse.Named(tagValue, value)) + } + return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args +} + +func addContainsOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) { + values := item.GetValues() + args := []interface{}{} + notStr := "" + if not { + notStr = "NOT" + } + tagValuePair := []string{} + for _, value := range values { + tagKey := "containsTagKey" + String(5) + tagValue := "containsTagValue" + String(5) + tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] ILIKE @%s", tagMapType, tagKey, tagValue)) + args = append(args, clickhouse.Named(tagKey, item.GetKey())) + args = append(args, clickhouse.Named(tagValue, "%"+fmt.Sprintf("%v", value)+"%")) + } + return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args +} + +func addStartsWithOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) { + values := item.GetValues() + args := []interface{}{} + notStr := "" + if not { + notStr = "NOT" + } + tagValuePair := []string{} + for _, value := range values { + tagKey := "startsWithTagKey" + String(5) + tagValue := "startsWithTagValue" + String(5) + tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] ILIKE @%s", tagMapType, tagKey, tagValue)) + args = append(args, clickhouse.Named(tagKey, item.GetKey())) + args = append(args, clickhouse.Named(tagValue, "%"+fmt.Sprintf("%v", value)+"%")) + } + return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args +} + +func addArithmeticOperator(item model.TagQuery, tagMapType string, operator string) (string, []interface{}) { + values := item.GetValues() + args := []interface{}{} + tagValuePair := []string{} + for _, value := range values { + tagKey := "arithmeticTagKey" + String(5) + tagValue := "arithmeticTagValue" + String(5) + tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] %s @%s", tagMapType, tagKey, operator, tagValue)) + args = append(args, clickhouse.Named(tagKey, item.GetKey())) + args = append(args, clickhouse.Named(tagValue, value)) + } + return fmt.Sprintf(" AND (%s)", strings.Join(tagValuePair, " OR ")), args +} + +func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) { + values := item.GetValues() + notStr := "" + if not { + notStr = "NOT" + } + args := []interface{}{} + tagOperatorPair := []string{} + for range values { + tagKey := "existsTagKey" + String(5) + tagOperatorPair = append(tagOperatorPair, fmt.Sprintf("mapContains(%s, @%s)", tagMapType, tagKey)) + args = append(args, clickhouse.Named(tagKey, item.GetKey())) + } + return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args +} + +func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) { excludeMap := make(map[string]struct{}) for _, e := range queryParams.Exclude { @@ -1541,8 +1667,8 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model tagFilters := []model.TagFilters{} - finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - // Alternative query: SELECT groupUniqArrayArray(mapKeys(tagMap)) as tagKeys FROM signoz_index_v2 + // Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) finalQuery += query err := r.db.Select(ctx, &tagFilters, finalQuery, args...) @@ -1552,12 +1678,20 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model zap.S().Debug("Error in processing sql query: ", err) return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")} } - tagFilters = excludeTags(ctx, tagFilters) - - return &tagFilters, nil + tagFiltersResult := model.TagFilters{ + StringTagKeys: make([]string, 0), + NumberTagKeys: make([]string, 0), + BoolTagKeys: make([]string, 0), + } + if len(tagFilters) != 0 { + tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys) + tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys) + tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys) + } + return &tagFiltersResult, nil } -func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilters { +func excludeTags(ctx context.Context, tags []string) []string { excludedTagsMap := map[string]bool{ "http.code": true, "http.route": true, @@ -1571,9 +1705,9 @@ func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilter "error": true, "service.name": true, } - var newTags []model.TagFilters + newTags := make([]string, 0) for _, tag := range tags { - _, ok := excludedTagsMap[tag.TagKeys] + _, ok := excludedTagsMap[tag] if !ok { newTags = append(newTags, tag) } @@ -1581,7 +1715,21 @@ func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilter return newTags } -func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagValues, *model.ApiError) { +func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) { + + if queryParams.TagKey.Type == model.TagTypeNumber { + return &model.TagValues{ + NumberTagValues: make([]float64, 0), + StringTagValues: make([]string, 0), + BoolTagValues: make([]bool, 0), + }, nil + } else if queryParams.TagKey.Type == model.TagTypeBool { + return &model.TagValues{ + NumberTagValues: make([]float64, 0), + StringTagValues: make([]string, 0), + BoolTagValues: []bool{true, false}, + }, nil + } excludeMap := make(map[string]struct{}) for _, e := range queryParams.Exclude { @@ -1634,10 +1782,12 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model. tagValues := []model.TagValues{} - finalQuery := fmt.Sprintf(`SELECT tagMap[@key] as tagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) finalQuery += query - finalQuery += " GROUP BY tagMap[@key]" - args = append(args, clickhouse.Named("key", queryParams.TagKey)) + finalQuery += " LIMIT @limit" + + args = append(args, clickhouse.Named("key", queryParams.TagKey.Key)) + args = append(args, clickhouse.Named("limit", queryParams.Limit)) err := r.db.Select(ctx, &tagValues, finalQuery, args...) zap.S().Info(query) @@ -1647,10 +1797,17 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model. return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")} } - cleanedTagValues := []model.TagValues{} - for _, e := range tagValues { - if e.TagValues != "" { - cleanedTagValues = append(cleanedTagValues, e) + cleanedTagValues := model.TagValues{ + StringTagValues: []string{}, + NumberTagValues: []float64{}, + BoolTagValues: []bool{}, + } + if len(tagValues) == 0 { + return &cleanedTagValues, nil + } + for _, e := range tagValues[0].StringTagValues { + if e != "" { + cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e) } } return &cleanedTagValues, nil @@ -1679,7 +1836,11 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo ) args := []interface{}{} args = append(args, namedArgs...) - args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args) + // create TagQuery from TagQueryParams + tags := createTagQueryFromTagQueryParams(queryParams.Tags) + subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) if errStatus != nil { return nil, errStatus } @@ -1879,41 +2040,27 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} var query string - if queryParams.GroupBy != "" { - switch queryParams.GroupBy { - case constants.ServiceName: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, serviceName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.HttpCode: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.HttpMethod: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.HttpUrl: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpUrl as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.HttpRoute: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpRoute as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.HttpHost: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpHost as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.DBName: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.DBOperation: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.OperationRequest: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, name as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.MsgSystem: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.MsgOperation: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.DBSystem: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.Component: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, component as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.RPCMethod: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, rpcMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - case constants.ResponseStatusCode: - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, responseStatusCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - - default: - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)} + var customStr []string + _, columnExists := constants.GroupByColMap[queryParams.GroupBy] + // Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet: + // issue link: https://github.com/ClickHouse/clickhouse-go/issues/870 + if queryParams.GroupBy != "" && columnExists { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable) + args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy)) + } else if queryParams.GroupBy != "" { + customStr = strings.Split(queryParams.GroupBy, ".(") + if len(customStr) < 2 { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} + } + if customStr[1] == string(model.TagTypeString)+")" { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) + } else if customStr[1] == string(model.TagTypeNumber)+")" { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) + } else if customStr[1] == string(model.TagTypeBool)+")" { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) + } else { + // return error for unsupported group by + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} } } else { query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) @@ -1966,47 +2113,25 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query query = query + " AND kind = @kind" args = append(args, clickhouse.Named("kind", queryParams.Kind)) } + // create TagQuery from TagQueryParams + tags := createTagQueryFromTagQueryParams(queryParams.Tags) + subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) + query += subQuery + args = append(args, argsSubQuery...) - args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args) if errStatus != nil { return nil, errStatus } - if queryParams.GroupBy != "" { - switch queryParams.GroupBy { - case constants.ServiceName: - query = query + " GROUP BY time, serviceName as groupBy ORDER BY time" - case constants.HttpCode: - query = query + " GROUP BY time, httpCode as groupBy ORDER BY time" - case constants.HttpMethod: - query = query + " GROUP BY time, httpMethod as groupBy ORDER BY time" - case constants.HttpUrl: - query = query + " GROUP BY time, httpUrl as groupBy ORDER BY time" - case constants.HttpRoute: - query = query + " GROUP BY time, httpRoute as groupBy ORDER BY time" - case constants.HttpHost: - query = query + " GROUP BY time, httpHost as groupBy ORDER BY time" - case constants.DBName: - query = query + " GROUP BY time, dbName as groupBy ORDER BY time" - case constants.DBOperation: - query = query + " GROUP BY time, dbOperation as groupBy ORDER BY time" - case constants.OperationRequest: - query = query + " GROUP BY time, name as groupBy ORDER BY time" - case constants.MsgSystem: - query = query + " GROUP BY time, msgSystem as groupBy ORDER BY time" - case constants.MsgOperation: - query = query + " GROUP BY time, msgOperation as groupBy ORDER BY time" - case constants.DBSystem: - query = query + " GROUP BY time, dbSystem as groupBy ORDER BY time" - case constants.Component: - query = query + " GROUP BY time, component as groupBy ORDER BY time" - case constants.RPCMethod: - query = query + " GROUP BY time, rpcMethod as groupBy ORDER BY time" - case constants.ResponseStatusCode: - query = query + " GROUP BY time, responseStatusCode as groupBy ORDER BY time" - - default: - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)} + if queryParams.GroupBy != "" && columnExists { + query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy) + } else if queryParams.GroupBy != "" { + if customStr[1] == string(model.TagTypeString)+")" { + query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0]) + } else if customStr[1] == string(model.TagTypeNumber)+")" { + query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0]) + } else if customStr[1] == string(model.TagTypeBool)+")" { + query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0]) } } else { query = query + " GROUP BY time ORDER BY time" diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 67903594c7..769922e519 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -7,6 +7,7 @@ import ( "math" "net/http" "strconv" + "strings" "time" "github.com/gorilla/mux" @@ -314,7 +315,11 @@ func parseFilteredSpansRequest(r *http.Request, aH *APIHandler) (*model.GetFilte return nil, model.ErrFeatureUnavailable{Key: constants.TimestampSort} } } - + tags, err := extractTagKeys(postData.Tags) + if err != nil { + return nil, err + } + postData.Tags = tags return postData, nil } @@ -387,17 +392,42 @@ func parseFilteredSpanAggregatesRequest(r *http.Request) (*model.GetFilteredSpan postData.AggregationOption = aggregationOption postData.Dimension = dimension - // tags, err := parseTagsV2("tags", r) - // if err != nil { - // return nil, err - // } - // if len(*tags) != 0 { - // params.Tags = *tags - // } + tags, err := extractTagKeys(postData.Tags) + if err != nil { + return nil, err + } + postData.Tags = tags return postData, nil } +func extractTagKeys(tags []model.TagQueryParam) ([]model.TagQueryParam, error) { + newTags := make([]model.TagQueryParam, 0) + if len(tags) != 0 { + for _, tag := range tags { + customStr := strings.Split(tag.Key, ".(") + if len(customStr) < 2 { + return nil, fmt.Errorf("TagKey param is not valid in query") + } else { + tag.Key = customStr[0] + } + if tag.Operator == model.ExistsOperator || tag.Operator == model.NotExistsOperator { + if customStr[1] == string(model.TagTypeString) + ")" { + tag.StringValues = []string{" "} + } else if customStr[1] ==string(model.TagTypeBool) + ")" { + tag.BoolValues = []bool{true} + } else if customStr[1] == string(model.TagTypeNumber) + ")" { + tag.NumberValues = []float64{0} + } else { + return nil, fmt.Errorf("TagKey param is not valid in query") + } + } + newTags = append(newTags, tag) + } + } + return newTags, nil +} + func parseTagFilterRequest(r *http.Request) (*model.TagFilterParams, error) { var postData *model.TagFilterParams err := json.NewDecoder(r.Body).Decode(&postData) @@ -426,8 +456,16 @@ func parseTagValueRequest(r *http.Request) (*model.TagFilterParams, error) { if err != nil { return nil, err } - if postData.TagKey == "" { - return nil, fmt.Errorf("%s param missing in query", postData.TagKey) + if postData.TagKey == (model.TagKey{}) { + return nil, fmt.Errorf("TagKey param missing in query") + } + + if postData.TagKey.Type != model.TagTypeString && postData.TagKey.Type != model.TagTypeBool && postData.TagKey.Type != model.TagTypeNumber { + return nil, fmt.Errorf("tag keys type %s is not supported", postData.TagKey.Type) + } + + if postData.Limit == 0 { + postData.Limit = 100 } postData.Start, err = parseTimeStr(postData.StartStr, "start") diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 7a266cb2fc..5bdc147c60 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -109,6 +109,24 @@ const ( DefaultLogSkipIndexType = "bloom_filter(0.01)" DefaultLogSkipIndexGranularity = 64 ) + +var GroupByColMap = map[string]struct{}{ + ServiceName: {}, + HttpHost: {}, + HttpRoute: {}, + HttpUrl: {}, + HttpMethod: {}, + Component: {}, + OperationDB: {}, + DBName: {}, + DBOperation: {}, + DBSystem: {}, + MsgOperation: {}, + MsgSystem: {}, + RPCMethod: {}, + ResponseStatusCode: {}, +} + const ( SIGNOZ_METRIC_DBNAME = "signoz_metrics" SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2" @@ -195,3 +213,9 @@ const ( // written clickhouse query. The column alias indcate which value is // to be considered as final result (or target) var ReservedColumnTargetAliases = map[string]bool{"result": true, "res": true, "value": true} + +const ( + StringTagMapCol = "stringTagMap" + NumberTagMapCol = "numberTagMap" + BoolTagMapCol = "boolTagMap" +) diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index bcaf889ab6..a551e4b7f8 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -34,8 +34,8 @@ type Reader interface { // clickhouse only. GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) - GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) - GetTagValues(ctx context.Context, query *model.TagFilterParams) (*[]model.TagValues, *model.ApiError) + GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError) + GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError) GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 7262b238b7..1f7e2daed8 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -152,7 +152,7 @@ type GetTopOperationsParams struct { ServiceName string `json:"service"` Start *time.Time End *time.Time - Tags []TagQuery `json:"tags"` + Tags []TagQueryParam `json:"tags"` } type GetUsageParams struct { @@ -171,7 +171,7 @@ type GetServicesParams struct { Period int Start *time.Time End *time.Time - Tags []TagQuery `json:"tags"` + Tags []TagQueryParam `json:"tags"` } type GetServiceOverviewParams struct { @@ -180,70 +180,187 @@ type GetServiceOverviewParams struct { Period string Start *time.Time End *time.Time - Tags []TagQuery `json:"tags"` - ServiceName string `json:"service"` - StepSeconds int `json:"step"` + Tags []TagQueryParam `json:"tags"` + ServiceName string `json:"service"` + StepSeconds int `json:"step"` } -type TagQuery struct { - Key string - Values []string - Operator string +type TagQueryParam struct { + Key string `json:"key"` + StringValues []string `json:"stringValues"` + BoolValues []bool `json:"boolValues"` + NumberValues []float64 `json:"numberValues"` + Operator Operator `json:"operator"` +} + +type Operator string + +const ( + InOperator Operator = "In" + NotInOperator Operator = "NotIn" + EqualOperator Operator = "Equals" + NotEqualOperator Operator = "NotEquals" + ExistsOperator Operator = "Exists" + NotExistsOperator Operator = "NotExists" + ContainsOperator Operator = "Contains" + NotContainsOperator Operator = "NotContains" + LessThanOperator Operator = "LessThan" + GreaterThanOperator Operator = "GreaterThan" + LessThanEqualOperator Operator = "LessThanEquals" + GreaterThanEqualOperator Operator = "GreaterThanEquals" + StartsWithOperator Operator = "StartsWith" + NotStartsWithOperator Operator = "NotStartsWith" +) + +type TagQuery interface { + GetKey() string + GetValues() []interface{} + GetOperator() Operator +} + +type TagQueryString struct { + key string + values []string + operator Operator +} + +func NewTagQueryString(key string, values []string, operator Operator) TagQueryString { + return TagQueryString{ + key: key, + values: values, + operator: operator, + } +} + +func (tqn TagQueryNumber) GetKey() string { + return tqn.key +} + +func (tqs TagQueryString) GetValues() []interface{} { + values := make([]interface{}, len(tqs.values)) + for i, v := range tqs.values { + values[i] = v + } + return values +} + +func (tqs TagQueryString) GetOperator() Operator { + return tqs.operator +} + +type TagQueryBool struct { + key string + values []bool + operator Operator +} + +func NewTagQueryBool(key string, values []bool, operator Operator) TagQueryBool { + return TagQueryBool{ + key: key, + values: values, + operator: operator, + } +} + +func (tqb TagQueryBool) GetKey() string { + return tqb.key +} + +func (tqb TagQueryBool) GetValues() []interface{} { + values := make([]interface{}, len(tqb.values)) + for i, v := range tqb.values { + values[i] = v + } + return values +} + +func (tqb TagQueryBool) GetOperator() Operator { + return tqb.operator +} + +type TagQueryNumber struct { + key string + values []float64 + operator Operator +} + +func NewTagQueryNumber(key string, values []float64, operator Operator) TagQueryNumber { + return TagQueryNumber{ + key: key, + values: values, + operator: operator, + } +} + +func (tqs TagQueryString) GetKey() string { + return tqs.key +} + +func (tqn TagQueryNumber) GetValues() []interface{} { + values := make([]interface{}, len(tqn.values)) + for i, v := range tqn.values { + values[i] = v + } + return values +} + +func (tqn TagQueryNumber) GetOperator() Operator { + return tqn.operator } type GetFilteredSpansParams struct { - TraceID []string `json:"traceID"` - ServiceName []string `json:"serviceName"` - Operation []string `json:"operation"` - Kind string `json:"kind"` - Status []string `json:"status"` - HttpRoute []string `json:"httpRoute"` - HttpCode []string `json:"httpCode"` - HttpUrl []string `json:"httpUrl"` - HttpHost []string `json:"httpHost"` - HttpMethod []string `json:"httpMethod"` - Component []string `json:"component"` - RPCMethod []string `json:"rpcMethod"` - ResponseStatusCode []string `json:"responseStatusCode"` - StartStr string `json:"start"` - EndStr string `json:"end"` - MinDuration string `json:"minDuration"` - MaxDuration string `json:"maxDuration"` - Limit int64 `json:"limit"` - OrderParam string `json:"orderParam"` - Order string `json:"order"` - Offset int64 `json:"offset"` - Tags []TagQuery `json:"tags"` - Exclude []string `json:"exclude"` + TraceID []string `json:"traceID"` + ServiceName []string `json:"serviceName"` + Operation []string `json:"operation"` + Kind string `json:"kind"` + Status []string `json:"status"` + HttpRoute []string `json:"httpRoute"` + HttpCode []string `json:"httpCode"` + HttpUrl []string `json:"httpUrl"` + HttpHost []string `json:"httpHost"` + HttpMethod []string `json:"httpMethod"` + Component []string `json:"component"` + RPCMethod []string `json:"rpcMethod"` + ResponseStatusCode []string `json:"responseStatusCode"` + StartStr string `json:"start"` + EndStr string `json:"end"` + MinDuration string `json:"minDuration"` + MaxDuration string `json:"maxDuration"` + Limit int64 `json:"limit"` + OrderParam string `json:"orderParam"` + Order string `json:"order"` + Offset int64 `json:"offset"` + Tags []TagQueryParam `json:"tags"` + Exclude []string `json:"exclude"` Start *time.Time End *time.Time } type GetFilteredSpanAggregatesParams struct { - TraceID []string `json:"traceID"` - ServiceName []string `json:"serviceName"` - Operation []string `json:"operation"` - Kind string `json:"kind"` - Status []string `json:"status"` - HttpRoute []string `json:"httpRoute"` - HttpCode []string `json:"httpCode"` - HttpUrl []string `json:"httpUrl"` - HttpHost []string `json:"httpHost"` - HttpMethod []string `json:"httpMethod"` - Component []string `json:"component"` - RPCMethod []string `json:"rpcMethod"` - ResponseStatusCode []string `json:"responseStatusCode"` - MinDuration string `json:"minDuration"` - MaxDuration string `json:"maxDuration"` - Tags []TagQuery `json:"tags"` - StartStr string `json:"start"` - EndStr string `json:"end"` - StepSeconds int `json:"step"` - Dimension string `json:"dimension"` - AggregationOption string `json:"aggregationOption"` - GroupBy string `json:"groupBy"` - Function string `json:"function"` - Exclude []string `json:"exclude"` + TraceID []string `json:"traceID"` + ServiceName []string `json:"serviceName"` + Operation []string `json:"operation"` + Kind string `json:"kind"` + Status []string `json:"status"` + HttpRoute []string `json:"httpRoute"` + HttpCode []string `json:"httpCode"` + HttpUrl []string `json:"httpUrl"` + HttpHost []string `json:"httpHost"` + HttpMethod []string `json:"httpMethod"` + Component []string `json:"component"` + RPCMethod []string `json:"rpcMethod"` + ResponseStatusCode []string `json:"responseStatusCode"` + MinDuration string `json:"minDuration"` + MaxDuration string `json:"maxDuration"` + Tags []TagQueryParam `json:"tags"` + StartStr string `json:"start"` + EndStr string `json:"end"` + StepSeconds int `json:"step"` + Dimension string `json:"dimension"` + AggregationOption string `json:"aggregationOption"` + GroupBy string `json:"groupBy"` + Function string `json:"function"` + Exclude []string `json:"exclude"` Start *time.Time End *time.Time } @@ -289,11 +406,25 @@ type TagFilterParams struct { MaxDuration string `json:"maxDuration"` StartStr string `json:"start"` EndStr string `json:"end"` - TagKey string `json:"tagKey"` + TagKey TagKey `json:"tagKey"` + Limit int `json:"limit"` Start *time.Time End *time.Time } +type TagType string + +const ( + TagTypeString TagType = "string" + TagTypeNumber TagType = "number" + TagTypeBool TagType = "bool" +) + +type TagKey struct { + Key string `json:"key"` + Type TagType `json:"type"` +} + type TTLParams struct { Type string // It can be one of {traces, metrics}. ColdStorageVolume string // Name of the cold storage volume. diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index bcc85749fa..01afb950e4 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -275,11 +275,15 @@ type TopOperationsItem struct { } type TagFilters struct { - TagKeys string `json:"tagKeys" ch:"tagKeys"` + StringTagKeys []string `json:"stringTagKeys" ch:"stringTagKeys"` + NumberTagKeys []string `json:"numberTagKeys" ch:"numberTagKeys"` + BoolTagKeys []string `json:"boolTagKeys" ch:"boolTagKeys"` } type TagValues struct { - TagValues string `json:"tagValues" ch:"tagValues"` + StringTagValues []string `json:"stringTagValues" ch:"stringTagValues"` + BoolTagValues []bool `json:"boolTagValues" ch:"boolTagValues"` + NumberTagValues []float64 `json:"numberTagValues" ch:"numberTagValues"` } type ServiceMapDependencyResponseItem struct {