chore: move clickhousereader filter suggestion methods to dedicated file (#6061)

Raj Kamal Singh 2024-09-25 09:51:16 +05:30 committed by GitHub
parent 0feab5aa93
commit 708158f50f
GPG Key ID: B5690EEEBB952194
2 changed files with 270 additions and 257 deletions


@@ -0,0 +1,270 @@
// ClickHouse reader methods for powering QB filter suggestions
package clickhouseReader
import (
"context"
"database/sql"
"fmt"
"slices"
"strings"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
func (r *ClickHouseReader) GetQBFilterSuggestionsForLogs(
ctx context.Context,
req *v3.QBFilterSuggestionsRequest,
) (*v3.QBFilterSuggestionsResponse, *model.ApiError) {
suggestions := v3.QBFilterSuggestionsResponse{
AttributeKeys: []v3.AttributeKey{},
ExampleQueries: []v3.FilterSet{},
}
// Use existing autocomplete logic for generating attribute suggestions
attribKeysResp, err := r.GetLogAttributeKeys(
ctx, &v3.FilterAttributeKeyRequest{
SearchText: req.SearchText,
DataSource: v3.DataSourceLogs,
Limit: int(req.AttributesLimit),
})
if err != nil {
return nil, model.InternalError(fmt.Errorf("couldn't get attribute keys: %w", err))
}
suggestions.AttributeKeys = attribKeysResp.AttributeKeys
// Rank suggested attributes
slices.SortFunc(suggestions.AttributeKeys, func(a v3.AttributeKey, b v3.AttributeKey) int {
// Higher score => higher rank
attribKeyScore := func(a v3.AttributeKey) int {
// Scoring criteria are expected to get more sophisticated in follow-up changes
if a.Type == v3.AttributeKeyTypeResource {
return 2
}
if a.Type == v3.AttributeKeyTypeTag {
return 1
}
return 0
}
// To sort in descending order of score, the return value must be negative when a > b
return attribKeyScore(b) - attribKeyScore(a)
})
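// Net effect: resource attributes rank first, then tag attributes, then all other attribute types.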
// Put together suggested example queries.
newExampleQuery := func() v3.FilterSet {
// Include existing filter in example query if specified.
if req.ExistingFilter != nil {
return *req.ExistingFilter
}
return v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
}
}
// Suggest example queries for top suggested log attributes and resource attributes
exampleAttribs := []v3.AttributeKey{}
for _, attrib := range suggestions.AttributeKeys {
isAttributeOrResource := slices.Contains([]v3.AttributeKeyType{
v3.AttributeKeyTypeResource, v3.AttributeKeyTypeTag,
}, attrib.Type)
isNumOrStringType := slices.Contains([]v3.AttributeKeyDataType{
v3.AttributeKeyDataTypeInt64, v3.AttributeKeyDataTypeFloat64, v3.AttributeKeyDataTypeString,
}, attrib.DataType)
if isAttributeOrResource && isNumOrStringType {
exampleAttribs = append(exampleAttribs, attrib)
}
if len(exampleAttribs) >= int(req.ExamplesLimit) {
break
}
}
if len(exampleAttribs) > 0 {
exampleAttribValues, err := r.getValuesForLogAttributes(
ctx, exampleAttribs, req.ExamplesLimit,
)
if err != nil {
// Do not fail the entire request if only example query generation fails
zap.L().Error("could not find attribute values for creating example query", zap.Error(err))
} else {
// Add example queries for as many attributes as possible:
// suggest the 1st value for the 1st attrib, then the 1st value for the 2nd attrib, and so on.
// If there is still room, suggest the 2nd value for the 1st attrib, the 2nd value for the 2nd attrib, and so on.
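// For example, with 2 example attribs and ExamplesLimit 3 (and enough values for each attrib),
// the suggested filters come out in the order: attrib1 = value1, attrib2 = value1, attrib1 = value2.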
for valueIdx := 0; valueIdx < int(req.ExamplesLimit); valueIdx++ {
for attrIdx, attr := range exampleAttribs {
needMoreExamples := len(suggestions.ExampleQueries) < int(req.ExamplesLimit)
if needMoreExamples && valueIdx < len(exampleAttribValues[attrIdx]) {
exampleQuery := newExampleQuery()
exampleQuery.Items = append(exampleQuery.Items, v3.FilterItem{
Key: attr,
Operator: "=",
Value: exampleAttribValues[attrIdx][valueIdx],
})
suggestions.ExampleQueries = append(
suggestions.ExampleQueries, exampleQuery,
)
}
}
}
}
}
// Suggest static example queries for standard log attributes if needed.
if len(suggestions.ExampleQueries) < int(req.ExamplesLimit) {
exampleQuery := newExampleQuery()
exampleQuery.Items = append(exampleQuery.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "body",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
Operator: "contains",
Value: "error",
})
suggestions.ExampleQueries = append(suggestions.ExampleQueries, exampleQuery)
}
return &suggestions, nil
}
// getValuesForLogAttributes returns up to `limit` values seen for each attribute in `attributes`.
// The ith slice in the result holds the values for the ith entry in `attributes`.
func (r *ClickHouseReader) getValuesForLogAttributes(
ctx context.Context, attributes []v3.AttributeKey, limit uint64,
) ([][]any, *model.ApiError) {
/*
The query used here needs to be as cheap as possible, and while uncommon, it is possible for
a tag to have hundreds of millions of values (e.g. message, request_id).
Construct a query that UNIONs the results of querying the first `limit` values for each attribute. For example:
```
select * from (
(
select tagKey, stringTagValue, int64TagValue, float64TagValue
from signoz_logs.distributed_tag_attributes
where tagKey = $1 and (
stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
)
limit 2
) UNION DISTINCT (
select tagKey, stringTagValue, int64TagValue, float64TagValue
from signoz_logs.distributed_tag_attributes
where tagKey = $2 and (
stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
)
limit 2
)
) settings max_threads=2
```
Since the tag_attributes table uses ReplacingMergeTree, the values would be distinct, and no ORDER BY
is used so that the `limit` clause minimizes the amount of data scanned.
This query scanned ~30k rows per attribute on fiscalnote-v2 for attributes like `message` and `time`
that had >~110M values each.
*/
if len(attributes) > 10 {
zap.L().Error(
"log attribute values requested for too many attributes. This can lead to slow and costly queries",
zap.Int("count", len(attributes)),
)
attributes = attributes[:10]
}
tagQueries := []string{}
tagKeyQueryArgs := []any{}
for idx, attrib := range attributes {
tagQueries = append(tagQueries, fmt.Sprintf(`(
select tagKey, stringTagValue, int64TagValue, float64TagValue
from %s.%s
where tagKey = $%d and (
stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
)
limit %d
)`, r.logsDB, r.logsTagAttributeTable, idx+1, limit))
tagKeyQueryArgs = append(tagKeyQueryArgs, attrib.Key)
}
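// Note: max_threads=2 below caps per-query parallelism so this ancillary suggestions query stays cheap.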
query := fmt.Sprintf(`select * from (
%s
) settings max_threads=2`, strings.Join(tagQueries, " UNION DISTINCT "))
rows, err := r.db.Query(ctx, query, tagKeyQueryArgs...)
if err != nil {
zap.L().Error("couldn't query attrib values for suggestions", zap.Error(err))
return nil, model.InternalError(fmt.Errorf(
"couldn't query attrib values for suggestions: %w", err,
))
}
defer rows.Close()
result := make([][]any, len(attributes))
// Helper for locating the result slice to append to for each scanned row
resultIdxForAttrib := func(key string, dataType v3.AttributeKeyDataType) int {
return slices.IndexFunc(attributes, func(attrib v3.AttributeKey) bool {
return attrib.Key == key && attrib.DataType == dataType
})
}
// Scan rows and append to result
for rows.Next() {
var tagKey string
var stringValue string
var float64Value sql.NullFloat64
var int64Value sql.NullInt64
err := rows.Scan(
&tagKey, &stringValue, &int64Value, &float64Value,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't scan attrib value rows: %w", err,
))
}
if len(stringValue) > 0 {
attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeString)
if attrResultIdx >= 0 {
result[attrResultIdx] = append(result[attrResultIdx], stringValue)
}
} else if int64Value.Valid {
attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeInt64)
if attrResultIdx >= 0 {
result[attrResultIdx] = append(result[attrResultIdx], int64Value.Int64)
}
} else if float64Value.Valid {
attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeFloat64)
if attrResultIdx >= 0 {
result[attrResultIdx] = append(result[attrResultIdx], float64Value.Float64)
}
}
}
if err := rows.Err(); err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't scan attrib value rows: %w", err,
))
}
return result, nil
}
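
For context, here is a minimal sketch of how a caller might use the relocated method. The request and response field names are taken from the code above; the wrapper function, the reader's import path, and the limit values are illustrative assumptions and not part of this commit.

```go
package example

import (
	"context"
	"fmt"

	// Assumed import path for the reader package shown above.
	"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

// suggestLogFilters (hypothetical helper) fetches QB filter suggestions and prints them.
func suggestLogFilters(ctx context.Context, reader *clickhouseReader.ClickHouseReader, searchText string) error {
	req := &v3.QBFilterSuggestionsRequest{
		SearchText:      searchText,
		AttributesLimit: 50, // illustrative limits; the API layer normally fills these in
		ExamplesLimit:   5,
	}

	resp, apiErr := reader.GetQBFilterSuggestionsForLogs(ctx, req)
	if apiErr != nil {
		// model.ApiError carries the underlying error in its Err field.
		return apiErr.Err
	}

	for _, key := range resp.AttributeKeys {
		fmt.Printf("suggested attribute: %s (type=%s, datatype=%s)\n", key.Key, key.Type, key.DataType)
	}
	fmt.Printf("suggested %d example queries\n", len(resp.ExampleQueries))
	return nil
}
```

ExistingFilter can also be set on the request so that the generated example queries extend the user's current filter instead of starting from an empty AND filter set.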


@@ -10,7 +10,6 @@ import (
"os"
"reflect"
"regexp"
"slices"
"sort"
"strconv"
"strings"
@@ -4069,262 +4068,6 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
}
func (r *ClickHouseReader) GetQBFilterSuggestionsForLogs(
ctx context.Context,
req *v3.QBFilterSuggestionsRequest,
) (*v3.QBFilterSuggestionsResponse, *model.ApiError) {
suggestions := v3.QBFilterSuggestionsResponse{
AttributeKeys: []v3.AttributeKey{},
ExampleQueries: []v3.FilterSet{},
}
// Use existing autocomplete logic for generating attribute suggestions
attribKeysResp, err := r.GetLogAttributeKeys(
ctx, &v3.FilterAttributeKeyRequest{
SearchText: req.SearchText,
DataSource: v3.DataSourceLogs,
Limit: int(req.AttributesLimit),
})
if err != nil {
return nil, model.InternalError(fmt.Errorf("couldn't get attribute keys: %w", err))
}
suggestions.AttributeKeys = attribKeysResp.AttributeKeys
// Rank suggested attributes
slices.SortFunc(suggestions.AttributeKeys, func(a v3.AttributeKey, b v3.AttributeKey) int {
// Higher score => higher rank
attribKeyScore := func(a v3.AttributeKey) int {
// Scoring criteria are expected to get more sophisticated in follow-up changes
if a.Type == v3.AttributeKeyTypeResource {
return 2
}
if a.Type == v3.AttributeKeyTypeTag {
return 1
}
return 0
}
// To sort in descending order of score, the return value must be negative when a > b
return attribKeyScore(b) - attribKeyScore(a)
})
// Put together suggested example queries.
newExampleQuery := func() v3.FilterSet {
// Include existing filter in example query if specified.
if req.ExistingFilter != nil {
return *req.ExistingFilter
}
return v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
}
}
// Suggest example queries for top suggested log attributes and resource attributes
exampleAttribs := []v3.AttributeKey{}
for _, attrib := range suggestions.AttributeKeys {
isAttributeOrResource := slices.Contains([]v3.AttributeKeyType{
v3.AttributeKeyTypeResource, v3.AttributeKeyTypeTag,
}, attrib.Type)
isNumOrStringType := slices.Contains([]v3.AttributeKeyDataType{
v3.AttributeKeyDataTypeInt64, v3.AttributeKeyDataTypeFloat64, v3.AttributeKeyDataTypeString,
}, attrib.DataType)
if isAttributeOrResource && isNumOrStringType {
exampleAttribs = append(exampleAttribs, attrib)
}
if len(exampleAttribs) >= int(req.ExamplesLimit) {
break
}
}
if len(exampleAttribs) > 0 {
exampleAttribValues, err := r.getValuesForLogAttributes(
ctx, exampleAttribs, req.ExamplesLimit,
)
if err != nil {
// Do not fail the entire request if only example query generation fails
zap.L().Error("could not find attribute values for creating example query", zap.Error(err))
} else {
// Add example queries for as many attributes as possible:
// suggest the 1st value for the 1st attrib, then the 1st value for the 2nd attrib, and so on.
// If there is still room, suggest the 2nd value for the 1st attrib, the 2nd value for the 2nd attrib, and so on.
for valueIdx := 0; valueIdx < int(req.ExamplesLimit); valueIdx++ {
for attrIdx, attr := range exampleAttribs {
needMoreExamples := len(suggestions.ExampleQueries) < int(req.ExamplesLimit)
if needMoreExamples && valueIdx < len(exampleAttribValues[attrIdx]) {
exampleQuery := newExampleQuery()
exampleQuery.Items = append(exampleQuery.Items, v3.FilterItem{
Key: attr,
Operator: "=",
Value: exampleAttribValues[attrIdx][valueIdx],
})
suggestions.ExampleQueries = append(
suggestions.ExampleQueries, exampleQuery,
)
}
}
}
}
}
// Suggest static example queries for standard log attributes if needed.
if len(suggestions.ExampleQueries) < int(req.ExamplesLimit) {
exampleQuery := newExampleQuery()
exampleQuery.Items = append(exampleQuery.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "body",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
},
Operator: "contains",
Value: "error",
})
suggestions.ExampleQueries = append(suggestions.ExampleQueries, exampleQuery)
}
return &suggestions, nil
}
// getValuesForLogAttributes returns up to `limit` values seen for each attribute in `attributes`.
// The ith slice in the result holds the values for the ith entry in `attributes`.
func (r *ClickHouseReader) getValuesForLogAttributes(
ctx context.Context, attributes []v3.AttributeKey, limit uint64,
) ([][]any, *model.ApiError) {
/*
The query used here needs to be as cheap as possible, and while uncommon, it is possible for
a tag to have hundreds of millions of values (e.g. message, request_id).
Construct a query that UNIONs the results of querying the first `limit` values for each attribute. For example:
```
select * from (
(
select tagKey, stringTagValue, int64TagValue, float64TagValue
from signoz_logs.distributed_tag_attributes
where tagKey = $1 and (
stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
)
limit 2
) UNION DISTINCT (
select tagKey, stringTagValue, int64TagValue, float64TagValue
from signoz_logs.distributed_tag_attributes
where tagKey = $2 and (
stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
)
limit 2
)
) settings max_threads=2
```
Since the tag_attributes table uses ReplacingMergeTree, the values would be distinct, and no ORDER BY
is used so that the `limit` clause minimizes the amount of data scanned.
This query scanned ~30k rows per attribute on fiscalnote-v2 for attributes like `message` and `time`
that had >~110M values each.
*/
if len(attributes) > 10 {
zap.L().Error(
"log attribute values requested for too many attributes. This can lead to slow and costly queries",
zap.Int("count", len(attributes)),
)
attributes = attributes[:10]
}
tagQueries := []string{}
tagKeyQueryArgs := []any{}
for idx, attrib := range attributes {
tagQueries = append(tagQueries, fmt.Sprintf(`(
select tagKey, stringTagValue, int64TagValue, float64TagValue
from %s.%s
where tagKey = $%d and (
stringTagValue != '' or int64TagValue is not null or float64TagValue is not null
)
limit %d
)`, r.logsDB, r.logsTagAttributeTable, idx+1, limit))
tagKeyQueryArgs = append(tagKeyQueryArgs, attrib.Key)
}
query := fmt.Sprintf(`select * from (
%s
) settings max_threads=2`, strings.Join(tagQueries, " UNION DISTINCT "))
rows, err := r.db.Query(ctx, query, tagKeyQueryArgs...)
if err != nil {
zap.L().Error("couldn't query attrib values for suggestions", zap.Error(err))
return nil, model.InternalError(fmt.Errorf(
"couldn't query attrib values for suggestions: %w", err,
))
}
defer rows.Close()
result := make([][]any, len(attributes))
// Helper for locating the result slice to append to for each scanned row
resultIdxForAttrib := func(key string, dataType v3.AttributeKeyDataType) int {
return slices.IndexFunc(attributes, func(attrib v3.AttributeKey) bool {
return attrib.Key == key && attrib.DataType == dataType
})
}
// Scan rows and append to result
for rows.Next() {
var tagKey string
var stringValue string
var float64Value sql.NullFloat64
var int64Value sql.NullInt64
err := rows.Scan(
&tagKey, &stringValue, &int64Value, &float64Value,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't scan attrib value rows: %w", err,
))
}
if len(stringValue) > 0 {
attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeString)
if attrResultIdx >= 0 {
result[attrResultIdx] = append(result[attrResultIdx], stringValue)
}
} else if int64Value.Valid {
attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeInt64)
if attrResultIdx >= 0 {
result[attrResultIdx] = append(result[attrResultIdx], int64Value.Int64)
}
} else if float64Value.Valid {
attrResultIdx := resultIdxForAttrib(tagKey, v3.AttributeKeyDataTypeFloat64)
if attrResultIdx >= 0 {
result[attrResultIdx] = append(result[attrResultIdx], float64Value.Float64)
}
}
}
if err := rows.Err(); err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't scan attrib value rows: %w", err,
))
}
return result, nil
}
func readRow(vars []interface{}, columnNames []string, countOfNumberCols int) ([]string, map[string]string, []map[string]string, *v3.Point) {
// Each row will have a value and a timestamp, and an optional list of label values
// example: {Timestamp: ..., Value: ...}