feat: tag filter backend changes (#2115)

This commit is contained in:
Vishal Sharma 2023-01-25 12:35:44 +05:30 committed by GitHub
parent ba8f804b26
commit c46bef321c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 525 additions and 203 deletions

View File

@ -772,7 +772,11 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
clickhouse.Named("serviceName", svc),
clickhouse.Named("names", ops),
)
args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args)
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
if errStatus != nil {
zap.S().Error("Error in processing sql query: ", errStatus)
return
@ -791,8 +795,9 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
zap.S().Error("Error in processing sql query: ", err)
return
}
args, errStatus = buildQueryWithTagParams(ctx, queryParams.Tags, &errorQuery, args)
subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
if err != nil {
zap.S().Error("Error in processing sql query: ", err)
@ -849,7 +854,12 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
)
args := []interface{}{}
args = append(args, namedArgs...)
args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args)
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
if errStatus != nil {
return nil, errStatus
}
@ -875,7 +885,9 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
)
args = []interface{}{}
args = append(args, namedArgs...)
args, errStatus = buildQueryWithTagParams(ctx, queryParams.Tags, &query, args)
subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
if errStatus != nil {
return nil, errStatus
}
@ -1357,7 +1369,11 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo
args = append(args, clickhouse.Named("kind", queryParams.Kind))
}
args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args)
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
if errStatus != nil {
return nil, errStatus
}
@ -1428,6 +1444,22 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo
return &getFilterSpansResponse, nil
}
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
tags := []model.TagQuery{}
for _, tag := range queryParams {
if len(tag.StringValues) > 0 {
tags = append(tags, model.NewTagQueryString(tag.Key, tag.StringValues, tag.Operator))
}
if len(tag.NumberValues) > 0 {
tags = append(tags, model.NewTagQueryNumber(tag.Key, tag.NumberValues, tag.Operator))
}
if len(tag.BoolValues) > 0 {
tags = append(tags, model.NewTagQueryBool(tag.Key, tag.BoolValues, tag.Operator))
}
}
return tags
}
func StringWithCharset(length int, charset string) string {
b := make([]byte, length)
for i := range b {
@ -1440,49 +1472,143 @@ func String(length int) string {
return StringWithCharset(length, charset)
}
func buildQueryWithTagParams(ctx context.Context, tags []model.TagQuery, query *string, args []interface{}) ([]interface{}, *model.ApiError) {
func buildQueryWithTagParams(ctx context.Context, tags []model.TagQuery) (string, []interface{}, *model.ApiError) {
subQuery := ""
var args []interface{}
for _, item := range tags {
if item.Operator == "in" {
for i, value := range item.Values {
tagKey := "inTagKey" + String(5)
tagValue := "inTagValue" + String(5)
if i == 0 && i == len(item.Values)-1 {
*query += fmt.Sprintf(" AND tagMap[@%s] = @%s", tagKey, tagValue)
} else if i == 0 && i != len(item.Values)-1 {
*query += fmt.Sprintf(" AND (tagMap[@%s] = @%s", tagKey, tagValue)
} else if i != 0 && i == len(item.Values)-1 {
*query += fmt.Sprintf(" OR tagMap[@%s] = @%s)", tagKey, tagValue)
} else {
*query += fmt.Sprintf(" OR tagMap[@%s] = @%s", tagKey, tagValue)
}
args = append(args, clickhouse.Named(tagKey, item.Key))
args = append(args, clickhouse.Named(tagValue, value))
}
} else if item.Operator == "not in" {
for i, value := range item.Values {
tagKey := "notinTagKey" + String(5)
tagValue := "notinTagValue" + String(5)
if i == 0 && i == len(item.Values)-1 {
*query += fmt.Sprintf(" AND NOT tagMap[@%s] = @%s", tagKey, tagValue)
} else if i == 0 && i != len(item.Values)-1 {
*query += fmt.Sprintf(" AND NOT (tagMap[@%s] = @%s", tagKey, tagValue)
} else if i != 0 && i == len(item.Values)-1 {
*query += fmt.Sprintf(" OR tagMap[@%s] = @%s)", tagKey, tagValue)
} else {
*query += fmt.Sprintf(" OR tagMap[@%s] = @%s", tagKey, tagValue)
}
args = append(args, clickhouse.Named(tagKey, item.Key))
args = append(args, clickhouse.Named(tagValue, value))
}
} else {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Tag Operator %s not supported", item.Operator)}
tagMapType := ""
switch item.(type) {
case model.TagQueryString:
tagMapType = constants.StringTagMapCol
case model.TagQueryNumber:
tagMapType = constants.NumberTagMapCol
case model.TagQueryBool:
tagMapType = constants.BoolTagMapCol
default:
// type not supported error
return "", nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("type not supported")}
}
switch item.GetOperator() {
case model.EqualOperator:
subQuery, args = addArithmeticOperator(item, tagMapType, "=")
case model.NotEqualOperator:
subQuery, args = addArithmeticOperator(item, tagMapType, "!=")
case model.LessThanOperator:
subQuery, args = addArithmeticOperator(item, tagMapType, "<")
case model.GreaterThanOperator:
subQuery, args = addArithmeticOperator(item, tagMapType, ">")
case model.InOperator:
subQuery, args = addInOperator(item, tagMapType, false)
case model.NotInOperator:
subQuery, args = addInOperator(item, tagMapType, true)
case model.LessThanEqualOperator:
subQuery, args = addArithmeticOperator(item, tagMapType, "<=")
case model.GreaterThanEqualOperator:
subQuery, args = addArithmeticOperator(item, tagMapType, ">=")
case model.ContainsOperator:
subQuery, args = addContainsOperator(item, tagMapType, false)
case model.NotContainsOperator:
subQuery, args = addContainsOperator(item, tagMapType, true)
case model.StartsWithOperator:
subQuery, args = addStartsWithOperator(item, tagMapType, false)
case model.NotStartsWithOperator:
subQuery, args = addStartsWithOperator(item, tagMapType, true)
case model.ExistsOperator:
subQuery, args = addExistsOperator(item, tagMapType, false)
case model.NotExistsOperator:
subQuery, args = addExistsOperator(item, tagMapType, true)
default:
return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Tag Operator %s not supported", item.GetOperator())}
}
}
return args, nil
return subQuery, args, nil
}
func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) {
func addInOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
values := item.GetValues()
args := []interface{}{}
notStr := ""
if not {
notStr = "NOT"
}
tagValuePair := []string{}
for _, value := range values {
tagKey := "inTagKey" + String(5)
tagValue := "inTagValue" + String(5)
tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] = @%s", tagMapType, tagKey, tagValue))
args = append(args, clickhouse.Named(tagKey, item.GetKey()))
args = append(args, clickhouse.Named(tagValue, value))
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args
}
func addContainsOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
values := item.GetValues()
args := []interface{}{}
notStr := ""
if not {
notStr = "NOT"
}
tagValuePair := []string{}
for _, value := range values {
tagKey := "containsTagKey" + String(5)
tagValue := "containsTagValue" + String(5)
tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] ILIKE @%s", tagMapType, tagKey, tagValue))
args = append(args, clickhouse.Named(tagKey, item.GetKey()))
args = append(args, clickhouse.Named(tagValue, "%"+fmt.Sprintf("%v", value)+"%"))
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args
}
func addStartsWithOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
values := item.GetValues()
args := []interface{}{}
notStr := ""
if not {
notStr = "NOT"
}
tagValuePair := []string{}
for _, value := range values {
tagKey := "startsWithTagKey" + String(5)
tagValue := "startsWithTagValue" + String(5)
tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] ILIKE @%s", tagMapType, tagKey, tagValue))
args = append(args, clickhouse.Named(tagKey, item.GetKey()))
args = append(args, clickhouse.Named(tagValue, "%"+fmt.Sprintf("%v", value)+"%"))
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagValuePair, " OR ")), args
}
func addArithmeticOperator(item model.TagQuery, tagMapType string, operator string) (string, []interface{}) {
values := item.GetValues()
args := []interface{}{}
tagValuePair := []string{}
for _, value := range values {
tagKey := "arithmeticTagKey" + String(5)
tagValue := "arithmeticTagValue" + String(5)
tagValuePair = append(tagValuePair, fmt.Sprintf("%s[@%s] %s @%s", tagMapType, tagKey, operator, tagValue))
args = append(args, clickhouse.Named(tagKey, item.GetKey()))
args = append(args, clickhouse.Named(tagValue, value))
}
return fmt.Sprintf(" AND (%s)", strings.Join(tagValuePair, " OR ")), args
}
func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string, []interface{}) {
values := item.GetValues()
notStr := ""
if not {
notStr = "NOT"
}
args := []interface{}{}
tagOperatorPair := []string{}
for range values {
tagKey := "existsTagKey" + String(5)
tagOperatorPair = append(tagOperatorPair, fmt.Sprintf("mapContains(%s, @%s)", tagMapType, tagKey))
args = append(args, clickhouse.Named(tagKey, item.GetKey()))
}
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
}
func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) {
excludeMap := make(map[string]struct{})
for _, e := range queryParams.Exclude {
@ -1541,8 +1667,8 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model
tagFilters := []model.TagFilters{}
finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
// Alternative query: SELECT groupUniqArrayArray(mapKeys(tagMap)) as tagKeys FROM signoz_index_v2
// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
finalQuery += query
err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
@ -1552,12 +1678,20 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model
zap.S().Debug("Error in processing sql query: ", err)
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
}
tagFilters = excludeTags(ctx, tagFilters)
return &tagFilters, nil
tagFiltersResult := model.TagFilters{
StringTagKeys: make([]string, 0),
NumberTagKeys: make([]string, 0),
BoolTagKeys: make([]string, 0),
}
if len(tagFilters) != 0 {
tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys)
tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys)
tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys)
}
return &tagFiltersResult, nil
}
func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilters {
func excludeTags(ctx context.Context, tags []string) []string {
excludedTagsMap := map[string]bool{
"http.code": true,
"http.route": true,
@ -1571,9 +1705,9 @@ func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilter
"error": true,
"service.name": true,
}
var newTags []model.TagFilters
newTags := make([]string, 0)
for _, tag := range tags {
_, ok := excludedTagsMap[tag.TagKeys]
_, ok := excludedTagsMap[tag]
if !ok {
newTags = append(newTags, tag)
}
@ -1581,7 +1715,21 @@ func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilter
return newTags
}
func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagValues, *model.ApiError) {
func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) {
if queryParams.TagKey.Type == model.TagTypeNumber {
return &model.TagValues{
NumberTagValues: make([]float64, 0),
StringTagValues: make([]string, 0),
BoolTagValues: make([]bool, 0),
}, nil
} else if queryParams.TagKey.Type == model.TagTypeBool {
return &model.TagValues{
NumberTagValues: make([]float64, 0),
StringTagValues: make([]string, 0),
BoolTagValues: []bool{true, false},
}, nil
}
excludeMap := make(map[string]struct{})
for _, e := range queryParams.Exclude {
@ -1634,10 +1782,12 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.
tagValues := []model.TagValues{}
finalQuery := fmt.Sprintf(`SELECT tagMap[@key] as tagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
finalQuery += query
finalQuery += " GROUP BY tagMap[@key]"
args = append(args, clickhouse.Named("key", queryParams.TagKey))
finalQuery += " LIMIT @limit"
args = append(args, clickhouse.Named("key", queryParams.TagKey.Key))
args = append(args, clickhouse.Named("limit", queryParams.Limit))
err := r.db.Select(ctx, &tagValues, finalQuery, args...)
zap.S().Info(query)
@ -1647,10 +1797,17 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
}
cleanedTagValues := []model.TagValues{}
for _, e := range tagValues {
if e.TagValues != "" {
cleanedTagValues = append(cleanedTagValues, e)
cleanedTagValues := model.TagValues{
StringTagValues: []string{},
NumberTagValues: []float64{},
BoolTagValues: []bool{},
}
if len(tagValues) == 0 {
return &cleanedTagValues, nil
}
for _, e := range tagValues[0].StringTagValues {
if e != "" {
cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e)
}
}
return &cleanedTagValues, nil
@ -1679,7 +1836,11 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
)
args := []interface{}{}
args = append(args, namedArgs...)
args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args)
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
if errStatus != nil {
return nil, errStatus
}
@ -1879,41 +2040,27 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
var query string
if queryParams.GroupBy != "" {
switch queryParams.GroupBy {
case constants.ServiceName:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, serviceName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.HttpCode:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.HttpMethod:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.HttpUrl:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpUrl as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.HttpRoute:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpRoute as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.HttpHost:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpHost as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.DBName:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbName as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.DBOperation:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.OperationRequest:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, name as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.MsgSystem:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.MsgOperation:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgOperation as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.DBSystem:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbSystem as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.Component:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, component as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.RPCMethod:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, rpcMethod as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
case constants.ResponseStatusCode:
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, responseStatusCode as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
default:
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)}
var customStr []string
_, columnExists := constants.GroupByColMap[queryParams.GroupBy]
// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
if queryParams.GroupBy != "" && columnExists {
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable)
args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
} else if queryParams.GroupBy != "" {
customStr = strings.Split(queryParams.GroupBy, ".(")
if len(customStr) < 2 {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
}
if customStr[1] == string(model.TagTypeString)+")" {
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
} else if customStr[1] == string(model.TagTypeNumber)+")" {
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
} else if customStr[1] == string(model.TagTypeBool)+")" {
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
} else {
// return error for unsupported group by
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
}
} else {
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
@ -1966,47 +2113,25 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
query = query + " AND kind = @kind"
args = append(args, clickhouse.Named("kind", queryParams.Kind))
}
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
query += subQuery
args = append(args, argsSubQuery...)
args, errStatus := buildQueryWithTagParams(ctx, queryParams.Tags, &query, args)
if errStatus != nil {
return nil, errStatus
}
if queryParams.GroupBy != "" {
switch queryParams.GroupBy {
case constants.ServiceName:
query = query + " GROUP BY time, serviceName as groupBy ORDER BY time"
case constants.HttpCode:
query = query + " GROUP BY time, httpCode as groupBy ORDER BY time"
case constants.HttpMethod:
query = query + " GROUP BY time, httpMethod as groupBy ORDER BY time"
case constants.HttpUrl:
query = query + " GROUP BY time, httpUrl as groupBy ORDER BY time"
case constants.HttpRoute:
query = query + " GROUP BY time, httpRoute as groupBy ORDER BY time"
case constants.HttpHost:
query = query + " GROUP BY time, httpHost as groupBy ORDER BY time"
case constants.DBName:
query = query + " GROUP BY time, dbName as groupBy ORDER BY time"
case constants.DBOperation:
query = query + " GROUP BY time, dbOperation as groupBy ORDER BY time"
case constants.OperationRequest:
query = query + " GROUP BY time, name as groupBy ORDER BY time"
case constants.MsgSystem:
query = query + " GROUP BY time, msgSystem as groupBy ORDER BY time"
case constants.MsgOperation:
query = query + " GROUP BY time, msgOperation as groupBy ORDER BY time"
case constants.DBSystem:
query = query + " GROUP BY time, dbSystem as groupBy ORDER BY time"
case constants.Component:
query = query + " GROUP BY time, component as groupBy ORDER BY time"
case constants.RPCMethod:
query = query + " GROUP BY time, rpcMethod as groupBy ORDER BY time"
case constants.ResponseStatusCode:
query = query + " GROUP BY time, responseStatusCode as groupBy ORDER BY time"
default:
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)}
if queryParams.GroupBy != "" && columnExists {
query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy)
} else if queryParams.GroupBy != "" {
if customStr[1] == string(model.TagTypeString)+")" {
query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0])
} else if customStr[1] == string(model.TagTypeNumber)+")" {
query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0])
} else if customStr[1] == string(model.TagTypeBool)+")" {
query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0])
}
} else {
query = query + " GROUP BY time ORDER BY time"

View File

@ -7,6 +7,7 @@ import (
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/mux"
@ -314,7 +315,11 @@ func parseFilteredSpansRequest(r *http.Request, aH *APIHandler) (*model.GetFilte
return nil, model.ErrFeatureUnavailable{Key: constants.TimestampSort}
}
}
tags, err := extractTagKeys(postData.Tags)
if err != nil {
return nil, err
}
postData.Tags = tags
return postData, nil
}
@ -387,17 +392,42 @@ func parseFilteredSpanAggregatesRequest(r *http.Request) (*model.GetFilteredSpan
postData.AggregationOption = aggregationOption
postData.Dimension = dimension
// tags, err := parseTagsV2("tags", r)
// if err != nil {
// return nil, err
// }
// if len(*tags) != 0 {
// params.Tags = *tags
// }
tags, err := extractTagKeys(postData.Tags)
if err != nil {
return nil, err
}
postData.Tags = tags
return postData, nil
}
func extractTagKeys(tags []model.TagQueryParam) ([]model.TagQueryParam, error) {
newTags := make([]model.TagQueryParam, 0)
if len(tags) != 0 {
for _, tag := range tags {
customStr := strings.Split(tag.Key, ".(")
if len(customStr) < 2 {
return nil, fmt.Errorf("TagKey param is not valid in query")
} else {
tag.Key = customStr[0]
}
if tag.Operator == model.ExistsOperator || tag.Operator == model.NotExistsOperator {
if customStr[1] == string(model.TagTypeString) + ")" {
tag.StringValues = []string{" "}
} else if customStr[1] ==string(model.TagTypeBool) + ")" {
tag.BoolValues = []bool{true}
} else if customStr[1] == string(model.TagTypeNumber) + ")" {
tag.NumberValues = []float64{0}
} else {
return nil, fmt.Errorf("TagKey param is not valid in query")
}
}
newTags = append(newTags, tag)
}
}
return newTags, nil
}
func parseTagFilterRequest(r *http.Request) (*model.TagFilterParams, error) {
var postData *model.TagFilterParams
err := json.NewDecoder(r.Body).Decode(&postData)
@ -426,8 +456,16 @@ func parseTagValueRequest(r *http.Request) (*model.TagFilterParams, error) {
if err != nil {
return nil, err
}
if postData.TagKey == "" {
return nil, fmt.Errorf("%s param missing in query", postData.TagKey)
if postData.TagKey == (model.TagKey{}) {
return nil, fmt.Errorf("TagKey param missing in query")
}
if postData.TagKey.Type != model.TagTypeString && postData.TagKey.Type != model.TagTypeBool && postData.TagKey.Type != model.TagTypeNumber {
return nil, fmt.Errorf("tag keys type %s is not supported", postData.TagKey.Type)
}
if postData.Limit == 0 {
postData.Limit = 100
}
postData.Start, err = parseTimeStr(postData.StartStr, "start")

View File

@ -109,6 +109,24 @@ const (
DefaultLogSkipIndexType = "bloom_filter(0.01)"
DefaultLogSkipIndexGranularity = 64
)
var GroupByColMap = map[string]struct{}{
ServiceName: {},
HttpHost: {},
HttpRoute: {},
HttpUrl: {},
HttpMethod: {},
Component: {},
OperationDB: {},
DBName: {},
DBOperation: {},
DBSystem: {},
MsgOperation: {},
MsgSystem: {},
RPCMethod: {},
ResponseStatusCode: {},
}
const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2"
@ -195,3 +213,9 @@ const (
// written clickhouse query. The column alias indcate which value is
// to be considered as final result (or target)
var ReservedColumnTargetAliases = map[string]bool{"result": true, "res": true, "value": true}
const (
StringTagMapCol = "stringTagMap"
NumberTagMapCol = "numberTagMap"
BoolTagMapCol = "boolTagMap"
)

View File

@ -34,8 +34,8 @@ type Reader interface {
// clickhouse only.
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError)
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*[]model.TagValues, *model.ApiError)
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError)
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError)
GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError)
GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError)

View File

@ -152,7 +152,7 @@ type GetTopOperationsParams struct {
ServiceName string `json:"service"`
Start *time.Time
End *time.Time
Tags []TagQuery `json:"tags"`
Tags []TagQueryParam `json:"tags"`
}
type GetUsageParams struct {
@ -171,7 +171,7 @@ type GetServicesParams struct {
Period int
Start *time.Time
End *time.Time
Tags []TagQuery `json:"tags"`
Tags []TagQueryParam `json:"tags"`
}
type GetServiceOverviewParams struct {
@ -180,70 +180,187 @@ type GetServiceOverviewParams struct {
Period string
Start *time.Time
End *time.Time
Tags []TagQuery `json:"tags"`
ServiceName string `json:"service"`
StepSeconds int `json:"step"`
Tags []TagQueryParam `json:"tags"`
ServiceName string `json:"service"`
StepSeconds int `json:"step"`
}
type TagQuery struct {
Key string
Values []string
Operator string
type TagQueryParam struct {
Key string `json:"key"`
StringValues []string `json:"stringValues"`
BoolValues []bool `json:"boolValues"`
NumberValues []float64 `json:"numberValues"`
Operator Operator `json:"operator"`
}
type Operator string
const (
InOperator Operator = "In"
NotInOperator Operator = "NotIn"
EqualOperator Operator = "Equals"
NotEqualOperator Operator = "NotEquals"
ExistsOperator Operator = "Exists"
NotExistsOperator Operator = "NotExists"
ContainsOperator Operator = "Contains"
NotContainsOperator Operator = "NotContains"
LessThanOperator Operator = "LessThan"
GreaterThanOperator Operator = "GreaterThan"
LessThanEqualOperator Operator = "LessThanEquals"
GreaterThanEqualOperator Operator = "GreaterThanEquals"
StartsWithOperator Operator = "StartsWith"
NotStartsWithOperator Operator = "NotStartsWith"
)
type TagQuery interface {
GetKey() string
GetValues() []interface{}
GetOperator() Operator
}
type TagQueryString struct {
key string
values []string
operator Operator
}
func NewTagQueryString(key string, values []string, operator Operator) TagQueryString {
return TagQueryString{
key: key,
values: values,
operator: operator,
}
}
func (tqn TagQueryNumber) GetKey() string {
return tqn.key
}
func (tqs TagQueryString) GetValues() []interface{} {
values := make([]interface{}, len(tqs.values))
for i, v := range tqs.values {
values[i] = v
}
return values
}
func (tqs TagQueryString) GetOperator() Operator {
return tqs.operator
}
type TagQueryBool struct {
key string
values []bool
operator Operator
}
func NewTagQueryBool(key string, values []bool, operator Operator) TagQueryBool {
return TagQueryBool{
key: key,
values: values,
operator: operator,
}
}
func (tqb TagQueryBool) GetKey() string {
return tqb.key
}
func (tqb TagQueryBool) GetValues() []interface{} {
values := make([]interface{}, len(tqb.values))
for i, v := range tqb.values {
values[i] = v
}
return values
}
func (tqb TagQueryBool) GetOperator() Operator {
return tqb.operator
}
type TagQueryNumber struct {
key string
values []float64
operator Operator
}
func NewTagQueryNumber(key string, values []float64, operator Operator) TagQueryNumber {
return TagQueryNumber{
key: key,
values: values,
operator: operator,
}
}
func (tqs TagQueryString) GetKey() string {
return tqs.key
}
func (tqn TagQueryNumber) GetValues() []interface{} {
values := make([]interface{}, len(tqn.values))
for i, v := range tqn.values {
values[i] = v
}
return values
}
func (tqn TagQueryNumber) GetOperator() Operator {
return tqn.operator
}
type GetFilteredSpansParams struct {
TraceID []string `json:"traceID"`
ServiceName []string `json:"serviceName"`
Operation []string `json:"operation"`
Kind string `json:"kind"`
Status []string `json:"status"`
HttpRoute []string `json:"httpRoute"`
HttpCode []string `json:"httpCode"`
HttpUrl []string `json:"httpUrl"`
HttpHost []string `json:"httpHost"`
HttpMethod []string `json:"httpMethod"`
Component []string `json:"component"`
RPCMethod []string `json:"rpcMethod"`
ResponseStatusCode []string `json:"responseStatusCode"`
StartStr string `json:"start"`
EndStr string `json:"end"`
MinDuration string `json:"minDuration"`
MaxDuration string `json:"maxDuration"`
Limit int64 `json:"limit"`
OrderParam string `json:"orderParam"`
Order string `json:"order"`
Offset int64 `json:"offset"`
Tags []TagQuery `json:"tags"`
Exclude []string `json:"exclude"`
TraceID []string `json:"traceID"`
ServiceName []string `json:"serviceName"`
Operation []string `json:"operation"`
Kind string `json:"kind"`
Status []string `json:"status"`
HttpRoute []string `json:"httpRoute"`
HttpCode []string `json:"httpCode"`
HttpUrl []string `json:"httpUrl"`
HttpHost []string `json:"httpHost"`
HttpMethod []string `json:"httpMethod"`
Component []string `json:"component"`
RPCMethod []string `json:"rpcMethod"`
ResponseStatusCode []string `json:"responseStatusCode"`
StartStr string `json:"start"`
EndStr string `json:"end"`
MinDuration string `json:"minDuration"`
MaxDuration string `json:"maxDuration"`
Limit int64 `json:"limit"`
OrderParam string `json:"orderParam"`
Order string `json:"order"`
Offset int64 `json:"offset"`
Tags []TagQueryParam `json:"tags"`
Exclude []string `json:"exclude"`
Start *time.Time
End *time.Time
}
type GetFilteredSpanAggregatesParams struct {
TraceID []string `json:"traceID"`
ServiceName []string `json:"serviceName"`
Operation []string `json:"operation"`
Kind string `json:"kind"`
Status []string `json:"status"`
HttpRoute []string `json:"httpRoute"`
HttpCode []string `json:"httpCode"`
HttpUrl []string `json:"httpUrl"`
HttpHost []string `json:"httpHost"`
HttpMethod []string `json:"httpMethod"`
Component []string `json:"component"`
RPCMethod []string `json:"rpcMethod"`
ResponseStatusCode []string `json:"responseStatusCode"`
MinDuration string `json:"minDuration"`
MaxDuration string `json:"maxDuration"`
Tags []TagQuery `json:"tags"`
StartStr string `json:"start"`
EndStr string `json:"end"`
StepSeconds int `json:"step"`
Dimension string `json:"dimension"`
AggregationOption string `json:"aggregationOption"`
GroupBy string `json:"groupBy"`
Function string `json:"function"`
Exclude []string `json:"exclude"`
TraceID []string `json:"traceID"`
ServiceName []string `json:"serviceName"`
Operation []string `json:"operation"`
Kind string `json:"kind"`
Status []string `json:"status"`
HttpRoute []string `json:"httpRoute"`
HttpCode []string `json:"httpCode"`
HttpUrl []string `json:"httpUrl"`
HttpHost []string `json:"httpHost"`
HttpMethod []string `json:"httpMethod"`
Component []string `json:"component"`
RPCMethod []string `json:"rpcMethod"`
ResponseStatusCode []string `json:"responseStatusCode"`
MinDuration string `json:"minDuration"`
MaxDuration string `json:"maxDuration"`
Tags []TagQueryParam `json:"tags"`
StartStr string `json:"start"`
EndStr string `json:"end"`
StepSeconds int `json:"step"`
Dimension string `json:"dimension"`
AggregationOption string `json:"aggregationOption"`
GroupBy string `json:"groupBy"`
Function string `json:"function"`
Exclude []string `json:"exclude"`
Start *time.Time
End *time.Time
}
@ -289,11 +406,25 @@ type TagFilterParams struct {
MaxDuration string `json:"maxDuration"`
StartStr string `json:"start"`
EndStr string `json:"end"`
TagKey string `json:"tagKey"`
TagKey TagKey `json:"tagKey"`
Limit int `json:"limit"`
Start *time.Time
End *time.Time
}
type TagType string
const (
TagTypeString TagType = "string"
TagTypeNumber TagType = "number"
TagTypeBool TagType = "bool"
)
type TagKey struct {
Key string `json:"key"`
Type TagType `json:"type"`
}
type TTLParams struct {
Type string // It can be one of {traces, metrics}.
ColdStorageVolume string // Name of the cold storage volume.

View File

@ -275,11 +275,15 @@ type TopOperationsItem struct {
}
type TagFilters struct {
TagKeys string `json:"tagKeys" ch:"tagKeys"`
StringTagKeys []string `json:"stringTagKeys" ch:"stringTagKeys"`
NumberTagKeys []string `json:"numberTagKeys" ch:"numberTagKeys"`
BoolTagKeys []string `json:"boolTagKeys" ch:"boolTagKeys"`
}
type TagValues struct {
TagValues string `json:"tagValues" ch:"tagValues"`
StringTagValues []string `json:"stringTagValues" ch:"stringTagValues"`
BoolTagValues []bool `json:"boolTagValues" ch:"boolTagValues"`
NumberTagValues []float64 `json:"numberTagValues" ch:"numberTagValues"`
}
type ServiceMapDependencyResponseItem struct {