logs v4 qb refactor (#5908)
* feat: logsV4 initial refactoring
* feat: filter_query builder with tests added
* feat: all functions of v4 refactored
* fix: tests fixed
* fix: update select for table panel
* fix: tests updated with better examples of limit and group by
* fix: resource filter support in live tail

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
parent
20ac75e3d2
commit
6e7f04b492
pkg/query-service/app/logs/v3/json_filter.go

@@ -20,7 +20,7 @@ const (
 	NGRAM_SIZE = 4
 )
 
-var dataTypeMapping = map[string]string{
+var DataTypeMapping = map[string]string{
 	"string":  STRING,
 	"int64":   INT64,
 	"float64": FLOAT64,
@@ -31,7 +31,7 @@ var dataTypeMapping = map[string]string{
 	"array(bool)": ARRAY_BOOL,
 }
 
-var arrayValueTypeMapping = map[string]string{
+var ArrayValueTypeMapping = map[string]string{
 	"array(string)":  "string",
 	"array(int64)":   "int64",
 	"array(float64)": "float64",
@@ -59,7 +59,7 @@ var jsonLogOperators = map[v3.FilterOperator]string{
 	v3.FilterOperatorNotHas: "NOT has(%s, %s)",
 }
 
-func getPath(keyArr []string) string {
+func GetPath(keyArr []string) string {
 	path := []string{}
 	for i := 0; i < len(keyArr); i++ {
 		if strings.HasSuffix(keyArr[i], "[*]") {
@@ -71,7 +71,7 @@ func getPath(keyArr []string) string {
 	return strings.Join(path, ".")
 }
 
-func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) {
+func GetJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) {
 	keyArr := strings.Split(key.Key, ".")
 	// i.e it should be at least body.name, and not something like body
 	if len(keyArr) < 2 {
@@ -89,11 +89,11 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (
 
 	var dataType string
 	var ok bool
-	if dataType, ok = dataTypeMapping[string(key.DataType)]; !ok {
+	if dataType, ok = DataTypeMapping[string(key.DataType)]; !ok {
 		return "", fmt.Errorf("unsupported dataType for JSON: %s", key.DataType)
 	}
 
-	path := getPath(keyArr[1:])
+	path := GetPath(keyArr[1:])
 
 	if isArray {
 		return fmt.Sprintf("JSONExtract(JSON_QUERY(%s, '$.%s'), '%s')", keyArr[0], path, dataType), nil
@@ -109,7 +109,7 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (
 }
 
 // takes the path and the values and generates where clauses for better usage of index
-func getPathIndexFilter(path string) string {
+func GetPathIndexFilter(path string) string {
 	filters := []string{}
 	keyArr := strings.Split(path, ".")
 	if len(keyArr) < 2 {
@@ -136,7 +136,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
 	dataType := item.Key.DataType
 	isArray := false
 	// check if its an array and handle it
-	if val, ok := arrayValueTypeMapping[string(item.Key.DataType)]; ok {
+	if val, ok := ArrayValueTypeMapping[string(item.Key.DataType)]; ok {
 		if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas {
 			return "", fmt.Errorf("only has operator is supported for array")
 		}
@@ -144,7 +144,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
 		dataType = v3.AttributeKeyDataType(val)
 	}
 
-	key, err := getJSONFilterKey(item.Key, item.Operator, isArray)
+	key, err := GetJSONFilterKey(item.Key, item.Operator, isArray)
 	if err != nil {
 		return "", err
 	}
@@ -164,7 +164,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
 	if logsOp, ok := jsonLogOperators[op]; ok {
 		switch op {
 		case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
-			filter = fmt.Sprintf(logsOp, key, getPath(strings.Split(item.Key.Key, ".")[1:]))
+			filter = fmt.Sprintf(logsOp, key, GetPath(strings.Split(item.Key.Key, ".")[1:]))
 		case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas:
 			fmtVal := utils.ClickHouseFormattedValue(value)
 			filter = fmt.Sprintf(logsOp, key, fmtVal)
@@ -181,7 +181,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
 
 	filters := []string{}
 
-	pathFilter := getPathIndexFilter(item.Key.Key)
+	pathFilter := GetPathIndexFilter(item.Key.Key)
 	if pathFilter != "" {
 		filters = append(filters, pathFilter)
 	}
@@ -196,7 +196,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) {
 
 	// add exists check for non array items as default values of int/float/bool will corrupt the results
 	if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) {
-		existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", getPath(strings.Split(item.Key.Key, ".")[1:]))
+		existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", GetPath(strings.Split(item.Key.Key, ".")[1:]))
 		filter = fmt.Sprintf("%s AND %s", existsFilter, filter)
 	}
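These v3 changes are mechanical renames that export the JSON-path helpers and their lookup maps (getPath -> GetPath, getJSONFilterKey -> GetJSONFilterKey, getPathIndexFilter -> GetPathIndexFilter, dataTypeMapping -> DataTypeMapping, arrayValueTypeMapping -> ArrayValueTypeMapping) so the new v4 package can reuse them instead of duplicating the logic. A minimal sketch of the cross-package call; the key segments are illustrative, and the quoting shown matches the expectations in the v4 tests further down:

    package main

    import (
        "fmt"

        logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
    )

    func main() {
        // Segments after "body", with the [*] suffix marking array traversal.
        path := logsV3.GetPath([]string{"nested_num[*]", "float_nums[*]"})
        fmt.Println(path) // "nested_num"[*]."float_nums"[*]
    }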
pkg/query-service/app/logs/v3/json_filter_test.go

@@ -140,7 +140,7 @@ var testGetJSONFilterKeyData = []struct {
 func TestGetJSONFilterKey(t *testing.T) {
 	for _, tt := range testGetJSONFilterKeyData {
 		Convey("testgetKey", t, func() {
-			columnName, err := getJSONFilterKey(tt.Key, tt.Operator, tt.IsArray)
+			columnName, err := GetJSONFilterKey(tt.Key, tt.Operator, tt.IsArray)
 			if tt.Error {
 				So(err, ShouldNotBeNil)
 			} else {
pkg/query-service/app/logs/v3/query_builder.go

@@ -9,7 +9,7 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/utils"
 )
 
-var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
+var AggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
 	v3.AggregateOperatorP05: 0.05,
 	v3.AggregateOperatorP10: 0.10,
 	v3.AggregateOperatorP20: 0.20,
@@ -21,7 +21,7 @@ var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
 	v3.AggregateOperatorP99: 0.99,
 }
 
-var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
+var AggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
 	v3.AggregateOperatorAvg: "avg",
 	v3.AggregateOperatorMax: "max",
 	v3.AggregateOperatorMin: "min",
@@ -53,7 +53,7 @@ var logOperators = map[v3.FilterOperator]string{
 
 const BODY = "body"
 
-func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
+func GetClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
 	if columnType == v3.AttributeKeyTypeTag {
 		return "attributes"
 	}
@@ -83,7 +83,7 @@ func getClickhouseColumnName(key v3.AttributeKey) string {
 	//if the key is present in the topLevelColumn then it will be only searched in those columns,
 	//regardless if it is indexed/present again in resource or column attribute
 	if !key.IsColumn {
-		columnType := getClickhouseLogsColumnType(key.Type)
+		columnType := GetClickhouseLogsColumnType(key.Type)
 		columnDataType := getClickhouseLogsColumnDataType(key.DataType)
 		clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
 		return clickhouseColumn
@@ -114,7 +114,7 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
 	return selectLabels
 }
 
-func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
+func GetSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
 	var selectLabels []string
 	if aggregatorOperator == v3.AggregateOperatorNoOp {
 		return ""
@@ -154,7 +154,7 @@ func GetExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string {
 		}
 		return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseColumnName(item.Key), "`"), val)
 	}
-	columnType := getClickhouseLogsColumnType(item.Key.Type)
+	columnType := GetClickhouseLogsColumnType(item.Key.Type)
 	columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType)
 	return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key)
 }
@@ -224,7 +224,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
 	// add group by conditions to filter out log lines which doesn't have the key
 	for _, attr := range groupBy {
 		if !attr.IsColumn {
-			columnType := getClickhouseLogsColumnType(attr.Type)
+			columnType := GetClickhouseLogsColumnType(attr.Type)
 			columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
 			conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key))
 		} else if attr.Type != v3.AttributeKeyTypeUnspecified {
@@ -258,7 +258,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
 
 	selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)
 
-	having := having(mq.Having)
+	having := Having(mq.Having)
 	if having != "" {
 		having = " having " + having
 	}
@@ -288,10 +288,10 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
 	// we dont need value for first query
 	// going with this route as for a cleaner approach on implementation
 	if graphLimitQtype == constants.FirstQueryGraphLimit {
-		queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
+		queryTmpl = "SELECT " + GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")"
 	}
 
-	groupBy := groupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...)
+	groupBy := GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...)
 	if panelType != v3.PanelTypeList && groupBy != "" {
 		groupBy = " group by " + groupBy
 	}
@@ -301,7 +301,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
 	}
 
 	if graphLimitQtype == constants.SecondQueryGraphLimit {
-		filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
+		filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
 	}
 
 	aggregationKey := ""
@@ -329,7 +329,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
 			rate = rate / 60.0
 		}
 
-		op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
+		op := fmt.Sprintf("%s(%s)/%f", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate)
 		query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
 		return query, nil
 	case
@@ -342,11 +342,11 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
 		v3.AggregateOperatorP90,
 		v3.AggregateOperatorP95,
 		v3.AggregateOperatorP99:
-		op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
+		op := fmt.Sprintf("quantile(%v)(%s)", AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
 		query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
 		return query, nil
 	case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
-		op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
+		op := fmt.Sprintf("%s(%s)", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
 		query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy)
 		return query, nil
 	case v3.AggregateOperatorCount:
@@ -394,7 +394,7 @@ func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) str
 	return strings.Join(tags, ",")
 }
 
-func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
+func GroupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string {
 	groupTags := []string{}
 	for _, tag := range tags {
 		groupTags = append(groupTags, "`"+tag.Key+"`")
@@ -446,7 +446,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []
 	return str
 }
 
-func having(items []v3.Having) string {
+func Having(items []v3.Having) string {
 	// aggregate something and filter on that aggregate
 	var having []string
 	for _, item := range items {
@@ -455,7 +455,7 @@ func having(items []v3.Having) string {
 	return strings.Join(having, " AND ")
 }
 
-func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
+func ReduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
 	// the timestamp picked is not relevant here since the final value used is show the single
 	// chart with just the query value.
 	switch reduceTo {
@@ -475,14 +475,14 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v
 	return query, nil
 }
 
-func addLimitToQuery(query string, limit uint64) string {
+func AddLimitToQuery(query string, limit uint64) string {
 	if limit == 0 {
 		return query
 	}
 	return fmt.Sprintf("%s LIMIT %d", query, limit)
 }
 
-func addOffsetToQuery(query string, offset uint64) string {
+func AddOffsetToQuery(query string, offset uint64) string {
 	return fmt.Sprintf("%s OFFSET %d", query, offset)
 }
 
@@ -492,7 +492,7 @@ type Options struct {
 	PreferRPM bool
 }
 
-func isOrderByTs(orderBy []v3.OrderBy) bool {
+func IsOrderByTs(orderBy []v3.OrderBy) bool {
 	if len(orderBy) == 1 && (orderBy[0].Key == constants.TIMESTAMP || orderBy[0].ColumnName == constants.TIMESTAMP) {
 		return true
 	}
@@ -523,7 +523,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
 		if err != nil {
 			return "", err
 		}
-		query = addLimitToQuery(query, mq.Limit)
+		query = AddLimitToQuery(query, mq.Limit)
 
 		return query, nil
 	} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
@@ -539,7 +539,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
 		return "", err
 	}
 	if panelType == v3.PanelTypeValue {
-		query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
+		query, err = ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
 	}
 
 	if panelType == v3.PanelTypeList {
@@ -550,21 +550,21 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
 
 		if mq.PageSize > 0 {
 			if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit {
-				query = addLimitToQuery(query, mq.Limit-mq.Offset)
+				query = AddLimitToQuery(query, mq.Limit-mq.Offset)
 			} else {
-				query = addLimitToQuery(query, mq.PageSize)
+				query = AddLimitToQuery(query, mq.PageSize)
 			}
 
 			// add offset to the query only if it is not orderd by timestamp.
-			if !isOrderByTs(mq.OrderBy) {
-				query = addOffsetToQuery(query, mq.Offset)
+			if !IsOrderByTs(mq.OrderBy) {
+				query = AddOffsetToQuery(query, mq.Offset)
 			}
 
 		} else {
-			query = addLimitToQuery(query, mq.Limit)
+			query = AddLimitToQuery(query, mq.Limit)
 		}
 	} else if panelType == v3.PanelTypeTable {
-		query = addLimitToQuery(query, mq.Limit)
+		query = AddLimitToQuery(query, mq.Limit)
 	}
 
 	return query, err
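The same rename-to-export pattern applies here: Having, GroupByAttributeKeyTags, ReduceQuery, AddLimitToQuery, AddOffsetToQuery, IsOrderByTs, GetSelectKeys, GetClickhouseLogsColumnType and the two aggregate-operator maps become public for reuse by v4. The pagination behaviour of the two small helpers is easy to see in isolation; a sketch with an illustrative query string:

    package main

    import (
        "fmt"

        logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
    )

    func main() {
        q := "SELECT ... from signoz_logs.distributed_logs_v2" // placeholder query text
        q = logsV3.AddLimitToQuery(q, 100)                     // appends " LIMIT 100" (no-op when limit is 0)
        q = logsV3.AddOffsetToQuery(q, 50)                     // appends " OFFSET 50"
        fmt.Println(q)
        // PrepareLogsQuery skips the OFFSET when IsOrderByTs(mq.OrderBy) is true,
        // presumably because timestamp-ordered list queries paginate by a
        // timestamp/id cursor instead of OFFSET.
    }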
pkg/query-service/app/logs/v4/json_filter.go (new file, 105 lines)
@@ -0,0 +1,105 @@
package v4

import (
	"fmt"
	"strings"

	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

var jsonLogOperators = map[v3.FilterOperator]string{
	v3.FilterOperatorEqual:           "=",
	v3.FilterOperatorNotEqual:        "!=",
	v3.FilterOperatorLessThan:        "<",
	v3.FilterOperatorLessThanOrEq:    "<=",
	v3.FilterOperatorGreaterThan:     ">",
	v3.FilterOperatorGreaterThanOrEq: ">=",
	v3.FilterOperatorLike:            "LIKE",
	v3.FilterOperatorNotLike:         "NOT LIKE",
	v3.FilterOperatorContains:        "LIKE",
	v3.FilterOperatorNotContains:     "NOT LIKE",
	v3.FilterOperatorRegex:           "match(%s, %s)",
	v3.FilterOperatorNotRegex:        "NOT match(%s, %s)",
	v3.FilterOperatorIn:              "IN",
	v3.FilterOperatorNotIn:           "NOT IN",
	v3.FilterOperatorExists:          "JSON_EXISTS(%s, '$.%s')",
	v3.FilterOperatorNotExists:       "NOT JSON_EXISTS(%s, '$.%s')",
	v3.FilterOperatorHas:             "has(%s, %s)",
	v3.FilterOperatorNotHas:          "NOT has(%s, %s)",
}

func GetJSONFilter(item v3.FilterItem) (string, error) {

	dataType := item.Key.DataType
	isArray := false
	// check if its an array and handle it
	if val, ok := logsV3.ArrayValueTypeMapping[string(item.Key.DataType)]; ok {
		if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas {
			return "", fmt.Errorf("only has operator is supported for array")
		}
		isArray = true
		dataType = v3.AttributeKeyDataType(val)
	}

	key, err := logsV3.GetJSONFilterKey(item.Key, item.Operator, isArray)
	if err != nil {
		return "", err
	}

	// non array
	op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))

	var value interface{}
	if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
		value, err = utils.ValidateAndCastValue(item.Value, dataType)
		if err != nil {
			return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
		}
	}

	var filter string
	if logsOp, ok := jsonLogOperators[op]; ok {
		switch op {
		case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
			filter = fmt.Sprintf(logsOp, key, logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:]))
		case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas:
			fmtVal := utils.ClickHouseFormattedValue(value)
			filter = fmt.Sprintf(logsOp, key, fmtVal)
		case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
			val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value))
			filter = fmt.Sprintf("%s %s '%%%s%%'", key, logsOp, val)
		default:
			fmtVal := utils.ClickHouseFormattedValue(value)
			filter = fmt.Sprintf("%s %s %s", key, logsOp, fmtVal)
		}
	} else {
		return "", fmt.Errorf("unsupported operator: %s", op)
	}

	filters := []string{}

	pathFilter := logsV3.GetPathIndexFilter(item.Key.Key)
	if pathFilter != "" {
		filters = append(filters, pathFilter)
	}
	if op == v3.FilterOperatorContains ||
		op == v3.FilterOperatorEqual ||
		op == v3.FilterOperatorHas {
		val, ok := item.Value.(string)
		if ok && len(val) >= logsV3.NGRAM_SIZE {
			filters = append(filters, fmt.Sprintf("lower(body) like lower('%%%s%%')", utils.QuoteEscapedString(strings.ToLower(val))))
		}
	}

	// add exists check for non array items as default values of int/float/bool will corrupt the results
	if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) {
		existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:]))
		filter = fmt.Sprintf("%s AND %s", existsFilter, filter)
	}

	filters = append(filters, filter)

	return strings.Join(filters, " AND "), nil
}
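The sequencing above means every JSON filter is emitted as: an optional path-index prefilter, an optional lower(body) substring prefilter (only when the literal is a string of at least NGRAM_SIZE = 4 characters), and then the actual condition, itself prefixed with a JSON_EXISTS guard for non-array scalar comparisons. A Go example test mirroring the "eq operator" case from the test table below:

    package v4

    import (
        "fmt"

        v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
    )

    // ExampleGetJSONFilter reproduces the "eq operator" expectation from
    // json_filter_test.go below.
    func ExampleGetJSONFilter() {
        filter, _ := GetJSONFilter(v3.FilterItem{
            Key:      v3.AttributeKey{Key: "body.message", DataType: "string", IsJSON: true},
            Operator: "=",
            Value:    "hello",
        })
        fmt.Println(filter)
        // Output: lower(body) like lower('%message%') AND lower(body) like lower('%hello%') AND JSON_EXISTS(body, '$."message"') AND JSON_VALUE(body, '$."message"') = 'hello'
    }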
pkg/query-service/app/logs/v4/json_filter_test.go (new file, 200 lines)
@@ -0,0 +1,200 @@
package v4

import (
	"testing"

	. "github.com/smartystreets/goconvey/convey"
	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

var testGetJSONFilterData = []struct {
	Name       string
	FilterItem v3.FilterItem
	Filter     string
	Error      bool
}{
	{
		Name: "Array membership string",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.requestor_list[*]",
				DataType: "array(string)",
				IsJSON:   true,
			},
			Operator: "has",
			Value:    "index_service",
		},
		Filter: "lower(body) like lower('%requestor_list%') AND lower(body) like lower('%index_service%') AND has(JSONExtract(JSON_QUERY(body, '$.\"requestor_list\"[*]'), 'Array(String)'), 'index_service')",
	},
	{
		Name: "Array membership int64",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.int_numbers[*]",
				DataType: "array(int64)",
				IsJSON:   true,
			},
			Operator: "has",
			Value:    2,
		},
		Filter: "lower(body) like lower('%int_numbers%') AND has(JSONExtract(JSON_QUERY(body, '$.\"int_numbers\"[*]'), '" + logsV3.ARRAY_INT64 + "'), 2)",
	},
	{
		Name: "Array membership float64",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.nested_num[*].float_nums[*]",
				DataType: "array(float64)",
				IsJSON:   true,
			},
			Operator: "nhas",
			Value:    2.2,
		},
		Filter: "lower(body) like lower('%nested_num%float_nums%') AND NOT has(JSONExtract(JSON_QUERY(body, '$.\"nested_num\"[*].\"float_nums\"[*]'), '" + logsV3.ARRAY_FLOAT64 + "'), 2.200000)",
	},
	{
		Name: "Array membership bool",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.bool[*]",
				DataType: "array(bool)",
				IsJSON:   true,
			},
			Operator: "has",
			Value:    true,
		},
		Filter: "lower(body) like lower('%bool%') AND has(JSONExtract(JSON_QUERY(body, '$.\"bool\"[*]'), '" + logsV3.ARRAY_BOOL + "'), true)",
	},
	{
		Name: "eq operator",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.message",
				DataType: "string",
				IsJSON:   true,
			},
			Operator: "=",
			Value:    "hello",
		},
		Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') = 'hello'",
	},
	{
		Name: "eq operator number",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.status",
				DataType: "int64",
				IsJSON:   true,
			},
			Operator: "=",
			Value:    1,
		},
		Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.INT64 + "') = 1",
	},
	{
		Name: "neq operator number",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.status",
				DataType: "float64",
				IsJSON:   true,
			},
			Operator: "=",
			Value:    1.1,
		},
		Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.FLOAT64 + "') = 1.100000",
	},
	{
		Name: "eq operator bool",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.boolkey",
				DataType: "bool",
				IsJSON:   true,
			},
			Operator: "=",
			Value:    true,
		},
		Filter: "lower(body) like lower('%boolkey%') AND JSON_EXISTS(body, '$.\"boolkey\"') AND JSONExtract(JSON_VALUE(body, '$.\"boolkey\"'), '" + logsV3.BOOL + "') = true",
	},
	{
		Name: "greater than operator",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.status",
				DataType: "int64",
				IsJSON:   true,
			},
			Operator: ">",
			Value:    1,
		},
		Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.INT64 + "') > 1",
	},
	{
		Name: "regex operator",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.message",
				DataType: "string",
				IsJSON:   true,
			},
			Operator: "regex",
			Value:    "a*",
		},
		Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND match(JSON_VALUE(body, '$.\"message\"'), 'a*')",
	},
	{
		Name: "contains operator",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.message",
				DataType: "string",
				IsJSON:   true,
			},
			Operator: "contains",
			Value:    "a",
		},
		Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%a%'",
	},
	{
		Name: "contains operator with quotes",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.message",
				DataType: "string",
				IsJSON:   true,
			},
			Operator: "contains",
			Value:    "hello 'world'",
		},
		Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%hello \\'world\\'%'",
	},
	{
		Name: "exists",
		FilterItem: v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      "body.message",
				DataType: "string",
				IsJSON:   true,
			},
			Operator: "exists",
			Value:    "",
		},
		Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"')",
	},
}

func TestGetJSONFilter(t *testing.T) {
	for _, tt := range testGetJSONFilterData {
		Convey("testGetJSONFilter", t, func() {
			filter, err := GetJSONFilter(tt.FilterItem)
			if tt.Error {
				So(err, ShouldNotBeNil)
			} else {
				So(err, ShouldBeNil)
				So(filter, ShouldEqual, tt.Filter)
			}
		})
	}
}
pkg/query-service/app/logs/v4/query_builder.go

@@ -1,7 +1,13 @@
 package v4
 
 import (
+	"fmt"
+	"strings"
+
+	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	"go.signoz.io/signoz/pkg/query-service/utils"
 )
 
 var logOperators = map[v3.FilterOperator]string{
@@ -29,3 +35,504 @@ const (
 	DISTRIBUTED_LOGS_V2_RESOURCE = "distributed_logs_v2_resource"
 	NANOSECOND                   = 1000000000
 )

(the remainder of this hunk is entirely new code)

func getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string {
	if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 {
		return "number"
	}
	if columnDataType == v3.AttributeKeyDataTypeBool {
		return "bool"
	}
	return "string"
}

func getClickhouseKey(key v3.AttributeKey) string {
	// check if it is a top level static field
	if _, ok := constants.StaticFieldsLogsV3[key.Key]; ok && key.Type == v3.AttributeKeyTypeUnspecified {
		return key.Key
	}

	//if the key is present in the topLevelColumn then it will be only searched in those columns,
	//regardless if it is indexed/present again in resource or column attribute
	if !key.IsColumn {
		columnType := logsV3.GetClickhouseLogsColumnType(key.Type)
		columnDataType := getClickhouseLogsColumnDataType(key.DataType)
		return fmt.Sprintf("%s_%s['%s']", columnType, columnDataType, key.Key)
	}

	// materialized column created from query
	// https://github.com/SigNoz/signoz/pull/4775
	return "`" + utils.GetClickhouseColumnNameV2(string(key.Type), string(key.DataType), key.Key) + "`"
}

func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
	var selectLabels string
	if aggregatorOperator == v3.AggregateOperatorNoOp {
		selectLabels = ""
	} else {
		for _, tag := range groupBy {
			columnName := getClickhouseKey(tag)
			selectLabels += fmt.Sprintf(" %s as `%s`,", columnName, tag.Key)
		}
	}
	return selectLabels
}
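getClickhouseKey is the v4 analogue of v3's getClickhouseColumnName: the logs_v2 schema stores attributes as real Map columns, so ad-hoc attributes become map subscripts rather than the v3 value[indexOf(key, ...)] pair. A hedged sketch of both branches, with an illustrative key name (the materialized-column form assumes constants.Attributes is "attributes"; see GetClickhouseColumnNameV2 near the end of this diff):

    package v4

    import (
        v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
    )

    // keyResolutionSketch shows the two branches of getClickhouseKey.
    func keyResolutionSketch() (string, string) {
        k := v3.AttributeKey{Key: "user.id", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeString}
        mapLookup := getClickhouseKey(k) // attributes_string['user.id']

        k.IsColumn = true
        materialized := getClickhouseKey(k) // `attribute_string_user$$id`
        return mapLookup, materialized
    }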
func getExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string {
	if _, ok := constants.StaticFieldsLogsV3[item.Key.Key]; ok && item.Key.Type == v3.AttributeKeyTypeUnspecified {
		// no exists filter for static fields as they exists everywhere
		// TODO(nitya): Think what we can do here
		return ""
	} else if item.Key.IsColumn {
		// get filter for materialized columns
		val := true
		if op == v3.FilterOperatorNotExists {
			val = false
		}
		return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseKey(item.Key), "`"), val)
	}
	// filter for non materialized attributes
	columnType := logsV3.GetClickhouseLogsColumnType(item.Key.Type)
	columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType)
	return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key)
}

func buildAttributeFilter(item v3.FilterItem) (string, error) {
	// check if the user is searching for value in all attributes
	key := item.Key.Key
	op := v3.FilterOperator(strings.ToLower(string(item.Operator)))

	var value interface{}
	var err error
	if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
		value, err = utils.ValidateAndCastValue(item.Value, item.Key.DataType)
		if err != nil {
			return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
		}
	}

	// TODO(nitya): as of now __attrs only supports attributes_string. Discuss more on this
	// also for eq and contains as now it does a exact match
	if key == "__attrs" {
		if (op != v3.FilterOperatorEqual && op != v3.FilterOperatorContains) || item.Key.DataType != v3.AttributeKeyDataTypeString {
			return "", fmt.Errorf("only = operator and string data type is supported for __attrs")
		}
		val := utils.ClickHouseFormattedValue(item.Value)
		return fmt.Sprintf("has(mapValues(attributes_string), %s)", val), nil
	}

	keyName := getClickhouseKey(item.Key)
	fmtVal := utils.ClickHouseFormattedValue(value)

	if logsOp, ok := logOperators[op]; ok {
		switch op {
		case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
			return getExistsNexistsFilter(op, item), nil
		case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
			return fmt.Sprintf(logsOp, keyName, fmtVal), nil
		case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
			val := utils.QuoteEscapedStringForContains(fmt.Sprintf("%s", item.Value))
			// for body the contains is case insensitive
			if keyName == BODY {
				return fmt.Sprintf("lower(%s) %s lower('%%%s%%')", keyName, logsOp, val), nil
			} else {
				return fmt.Sprintf("%s %s '%%%s%%'", keyName, logsOp, val), nil
			}
		default:
			// use lower for like and ilike
			if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike {
				if keyName == BODY {
					keyName = fmt.Sprintf("lower(%s)", keyName)
					fmtVal = fmt.Sprintf("lower(%s)", fmtVal)
				}
			}
			return fmt.Sprintf("%s %s %s", keyName, logsOp, fmtVal), nil
		}
	} else {
		return "", fmt.Errorf("unsupported operator: %s", op)
	}
}
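buildAttributeFilter treats body specially: contains/like comparisons against the body column are wrapped in lower() on both sides, making full-text style searches case-insensitive, while the same operators on attribute keys stay case-sensitive. A sketch (assuming body is registered in constants.StaticFieldsLogsV3, which the static-field branch of getClickhouseKey relies on, and that logOperators maps contains to LIKE as the JSON operator table above does):

    package v4

    import (
        v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
    )

    func bodyContainsSketch() (string, error) {
        return buildAttributeFilter(v3.FilterItem{
            Key: v3.AttributeKey{
                Key:      "body",
                DataType: v3.AttributeKeyDataTypeString,
                Type:     v3.AttributeKeyTypeUnspecified,
            },
            Operator: v3.FilterOperatorContains,
            Value:    "error",
        })
        // expected: lower(body) LIKE lower('%error%')
    }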
func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) {
	var conditions []string

	if fs == nil || len(fs.Items) == 0 {
		return "", nil
	}

	for _, item := range fs.Items {
		// skip if it's a resource attribute
		if item.Key.Type == v3.AttributeKeyTypeResource {
			continue
		}

		// if the filter is json filter
		if item.Key.IsJSON {
			filter, err := GetJSONFilter(item)
			if err != nil {
				return "", err
			}
			conditions = append(conditions, filter)
			continue
		}

		// generate the filter
		filter, err := buildAttributeFilter(item)
		if err != nil {
			return "", err
		}
		conditions = append(conditions, filter)

		// add extra condition for map contains
		// by default clickhouse is not able to utilize indexes for keys with all operators.
		// mapContains forces the use of index.
		op := v3.FilterOperator(strings.ToLower(string(item.Operator)))
		if item.Key.IsColumn == false && op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
			conditions = append(conditions, getExistsNexistsFilter(v3.FilterOperatorExists, item))
		}
	}

	// add group by conditions to filter out log lines which doesn't have the key
	for _, attr := range groupBy {
		// skip if it's a resource attribute
		if attr.Type == v3.AttributeKeyTypeResource {
			continue
		}

		if !attr.IsColumn {
			columnType := logsV3.GetClickhouseLogsColumnType(attr.Type)
			columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
			conditions = append(conditions, fmt.Sprintf("mapContains(%s_%s, '%s')", columnType, columnDataType, attr.Key))
		} else if attr.Type != v3.AttributeKeyTypeUnspecified {
			// for materialzied columns and not the top level static fields
			name := utils.GetClickhouseColumnNameV2(string(attr.Type), string(attr.DataType), attr.Key)
			conditions = append(conditions, fmt.Sprintf("`%s_exists`=true", name))
		}
	}

	// add conditions for aggregate attribute
	if aggregateAttribute.Key != "" && aggregateAttribute.Type != v3.AttributeKeyTypeResource {
		existsFilter := getExistsNexistsFilter(v3.FilterOperatorExists, v3.FilterItem{Key: aggregateAttribute})
		conditions = append(conditions, existsFilter)
	}

	queryString := strings.Join(conditions, " AND ")
	return queryString, nil
}
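The companion exists-condition is the noteworthy part of this function: for every non-materialized attribute filter it appends getExistsNexistsFilter(exists, item), and for map-backed group-by keys it appends mapContains(...), both of which let ClickHouse use the map-key index instead of scanning values. A sketch of the output shape; the exact exists form comes from the v4 logOperators table, which this hunk does not show, so it is inferred from the mapContains usage above:

    package v4

    import (
        v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
    )

    func whereClauseSketch() (string, error) {
        fs := &v3.FilterSet{Items: []v3.FilterItem{{
            Key:      v3.AttributeKey{Key: "user.id", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeString},
            Operator: v3.FilterOperatorEqual,
            Value:    "123",
        }}}
        return buildLogsTimeSeriesFilterQuery(fs, nil, v3.AttributeKey{})
        // expected shape:
        // attributes_string['user.id'] = '123' AND mapContains(attributes_string, 'user.id')
    }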
// orderBy returns a string of comma separated tags for order by clause
// if there are remaining items which are not present in tags they are also added
// if the order is not specified, it defaults to ASC
func orderBy(panelType v3.PanelType, items []v3.OrderBy, tagLookup map[string]struct{}) []string {
	var orderBy []string

	for _, item := range items {
		if item.ColumnName == constants.SigNozOrderByValue {
			orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order))
		} else if _, ok := tagLookup[item.ColumnName]; ok {
			orderBy = append(orderBy, fmt.Sprintf("`%s` %s", item.ColumnName, item.Order))
		} else if panelType == v3.PanelTypeList {
			attr := v3.AttributeKey{Key: item.ColumnName, DataType: item.DataType, Type: item.Type, IsColumn: item.IsColumn}
			name := getClickhouseKey(attr)
			if item.IsColumn {
				name = "`" + name + "`"
			}
			orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
		}
	}
	return orderBy
}

func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []v3.AttributeKey) string {

	tagLookup := map[string]struct{}{}
	for _, v := range tags {
		tagLookup[v.Key] = struct{}{}
	}

	orderByArray := orderBy(panelType, items, tagLookup)

	if len(orderByArray) == 0 {
		if panelType == v3.PanelTypeList {
			orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC")
		} else {
			orderByArray = append(orderByArray, "value DESC")
		}
	}

	str := strings.Join(orderByArray, ",")
	return str
}
func generateAggregateClause(aggOp v3.AggregateOperator,
	aggKey string,
	step int64,
	preferRPM bool,
	timeFilter string,
	whereClause string,
	groupBy string,
	having string,
	orderBy string,
) (string, error) {
	queryTmpl := " %s as value from signoz_logs." + DISTRIBUTED_LOGS_V2 +
		" where " + timeFilter + "%s" +
		"%s%s" +
		"%s"
	switch aggOp {
	case v3.AggregateOperatorRate:
		rate := float64(step)
		if preferRPM {
			rate = rate / 60.0
		}

		op := fmt.Sprintf("count(%s)/%f", aggKey, rate)
		query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy)
		return query, nil
	case
		v3.AggregateOperatorRateSum,
		v3.AggregateOperatorRateMax,
		v3.AggregateOperatorRateAvg,
		v3.AggregateOperatorRateMin:
		rate := float64(step)
		if preferRPM {
			rate = rate / 60.0
		}

		op := fmt.Sprintf("%s(%s)/%f", logsV3.AggregateOperatorToSQLFunc[aggOp], aggKey, rate)
		query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy)
		return query, nil
	case
		v3.AggregateOperatorP05,
		v3.AggregateOperatorP10,
		v3.AggregateOperatorP20,
		v3.AggregateOperatorP25,
		v3.AggregateOperatorP50,
		v3.AggregateOperatorP75,
		v3.AggregateOperatorP90,
		v3.AggregateOperatorP95,
		v3.AggregateOperatorP99:
		op := fmt.Sprintf("quantile(%v)(%s)", logsV3.AggregateOperatorToPercentile[aggOp], aggKey)
		query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy)
		return query, nil
	case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
		op := fmt.Sprintf("%s(%s)", logsV3.AggregateOperatorToSQLFunc[aggOp], aggKey)
		query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy)
		return query, nil
	case v3.AggregateOperatorCount:
		op := "toFloat64(count(*))"
		query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy)
		return query, nil
	case v3.AggregateOperatorCountDistinct:
		op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggKey)
		query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy)
		return query, nil
	default:
		return "", fmt.Errorf("unsupported aggregate operator")
	}
}
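generateAggregateClause factors the per-operator SQL out of v3's buildLogsQuery switch. The rate family divides by the step (in seconds), or by step/60 when PreferRPM is set, so the same count becomes a per-second or per-minute rate. A small worked example; the time filter string is illustrative, and DISTRIBUTED_LOGS_V2 is assumed to resolve to "distributed_logs_v2":

    package v4

    import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"

    func rateClauseSketch() (string, error) {
        // step = 60s, preferRPM = false -> divide the count by 60.000000
        return generateAggregateClause(
            v3.AggregateOperatorRate,
            "attributes_string['method']",
            60,    // step in seconds
            false, // preferRPM; true would divide by 60/60 = 1, i.e. a per-minute rate
            "(timestamp >= 1 AND timestamp <= 2)", // illustrative time filter
            "", "", "", "",
        )
        // expected: " count(attributes_string['method'])/60.000000 as value
        //            from signoz_logs.distributed_logs_v2 where (timestamp >= 1 AND timestamp <= 2)"
    }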
func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) {
	// timerange will be sent in epoch millisecond
	logsStart := utils.GetEpochNanoSecs(start)
	logsEnd := utils.GetEpochNanoSecs(end)

	// -1800 this is added so that the bucket start considers all the fingerprints.
	bucketStart := logsStart/NANOSECOND - 1800
	bucketEnd := logsEnd / NANOSECOND

	// timestamp filter , bucket_start filter is added for primary key
	timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d) AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", logsStart, logsEnd, bucketStart, bucketEnd)

	// build the where clause for main table
	filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute)
	if err != nil {
		return "", err
	}
	if filterSubQuery != "" {
		filterSubQuery = " AND " + filterSubQuery
	}

	// build the where clause for resource table
	resourceSubQuery, err := buildResourceSubQuery(bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false)
	if err != nil {
		return "", err
	}
	// join both the filter clauses
	if resourceSubQuery != "" {
		filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + ")"
	}

	// get the select labels
	selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy)

	// get the order by clause
	orderBy := orderByAttributeKeyTags(panelType, mq.OrderBy, mq.GroupBy)
	if panelType != v3.PanelTypeList && orderBy != "" {
		orderBy = " order by " + orderBy
	}

	// if noop create the query and return
	if mq.AggregateOperator == v3.AggregateOperatorNoOp {
		// with noop any filter or different order by other than ts will use new table
		sqlSelect := constants.LogsSQLSelectV2
		queryTmpl := sqlSelect + "from signoz_logs.%s where %s%s order by %s"
		query := fmt.Sprintf(queryTmpl, DISTRIBUTED_LOGS_V2, timeFilter, filterSubQuery, orderBy)
		return query, nil
		// ---- NOOP ends here ----
	}

	// ---- FOR aggregation queries ----

	// get the having conditions
	having := logsV3.Having(mq.Having)
	if having != "" {
		having = " having " + having
	}

	// get the group by clause
	groupBy := logsV3.GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...)
	if panelType != v3.PanelTypeList && groupBy != "" {
		groupBy = " group by " + groupBy
	}

	// get the aggregation key
	aggregationKey := ""
	if mq.AggregateAttribute.Key != "" {
		aggregationKey = getClickhouseKey(mq.AggregateAttribute)
	}

	// for limit queries, there are two queries formed
	// in the second query we need to add the placeholder so that first query can be placed
	if graphLimitQtype == constants.SecondQueryGraphLimit {
		filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", logsV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
	}

	aggClause, err := generateAggregateClause(mq.AggregateOperator, aggregationKey, step, preferRPM, timeFilter, filterSubQuery, groupBy, having, orderBy)
	if err != nil {
		return "", err
	}

	var queryTmplPrefix string
	if graphLimitQtype == constants.FirstQueryGraphLimit {
		queryTmplPrefix = "SELECT"
	} else if panelType == v3.PanelTypeTable {
		queryTmplPrefix = "SELECT"
		// step or aggregate interval is whole time period in case of table panel
		step = (utils.GetEpochNanoSecs(end) - utils.GetEpochNanoSecs(start)) / NANOSECOND
	} else if panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue {
		// Select the aggregate value for interval
		queryTmplPrefix = fmt.Sprintf("SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts,", step)
	}

	query := queryTmplPrefix + selectLabels + aggClause

	// for limit query this is the first query,
	// we don't need the aggregation value here as we are just concerned with the names of group by
	// for applying the limit
	if graphLimitQtype == constants.FirstQueryGraphLimit {
		query = "SELECT " + logsV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + query + ")"
	}
	return query, nil
}
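The bucket arithmetic is the part worth internalizing: timestamps are compared in nanoseconds, but the table's primary key starts with ts_bucket_start in epoch seconds, and the start bound is pulled back by 1800 s so fingerprints whose bucket opened up to 30 minutes before the query window are still included. Worked numbers, mirroring the code above:

    package v4

    // bucketBoundsSketch mirrors the arithmetic in buildLogsQuery with
    // illustrative values (already converted to ns, as GetEpochNanoSecs returns).
    func bucketBoundsSketch() (int64, int64) {
        logsStart := int64(1_700_000_000_000_000_000) // ns
        logsEnd := int64(1_700_000_360_000_000_000)   // ns
        bucketStart := logsStart/NANOSECOND - 1800    // 1_700_000_000 - 1800 = 1_699_998_200 (epoch s)
        bucketEnd := logsEnd / NANOSECOND             // 1_700_000_360 (epoch s)
        return bucketStart, bucketEnd
    }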
func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {
	filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, v3.AttributeKey{})
	if err != nil {
		return "", err
	}

	// no values for bucket start and end
	resourceSubQuery, err := buildResourceSubQuery(0, 0, mq.Filters, mq.GroupBy, mq.AggregateAttribute, true)
	if err != nil {
		return "", err
	}
	// join both the filter clauses
	if resourceSubQuery != "" {
		filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery
	}

	// the reader will add the timestamp and id filters
	switch mq.AggregateOperator {
	case v3.AggregateOperatorNoOp:
		query := constants.LogsSQLSelectV2 + "from signoz_logs." + DISTRIBUTED_LOGS_V2 + " where "
		if len(filterSubQuery) > 0 {
			query = query + filterSubQuery + " AND "
		}

		return query, nil
	default:
		return "", fmt.Errorf("unsupported aggregate operator in live tail")
	}
}
// PrepareLogsQuery prepares the query for logs
func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options v3.LogQBOptions) (string, error) {

	// adjust the start and end time to the step interval
	// NOTE: Disabling this as it's creating confusion between charts and actual data
	// if panelType != v3.PanelTypeList {
	// 	start = start - (start % (mq.StepInterval * 1000))
	// 	end = end - (end % (mq.StepInterval * 1000))
	// }

	if options.IsLivetailQuery {
		query, err := buildLogsLiveTailQuery(mq)
		if err != nil {
			return "", err
		}
		return query, nil
	} else if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
		// give me just the group_by names (no values)
		query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM)
		if err != nil {
			return "", err
		}
		query = logsV3.AddLimitToQuery(query, mq.Limit)

		return query, nil
	} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
		query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM)
		if err != nil {
			return "", err
		}
		return query, nil
	}

	query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM)
	if err != nil {
		return "", err
	}
	if panelType == v3.PanelTypeValue {
		query, err = logsV3.ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
	}

	if panelType == v3.PanelTypeList {
		// check if limit exceeded
		if mq.Limit > 0 && mq.Offset >= mq.Limit {
			return "", fmt.Errorf("max limit exceeded")
		}

		if mq.PageSize > 0 {
			if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit {
				query = logsV3.AddLimitToQuery(query, mq.Limit-mq.Offset)
			} else {
				query = logsV3.AddLimitToQuery(query, mq.PageSize)
			}

			// add offset to the query only if it is not orderd by timestamp.
			if !logsV3.IsOrderByTs(mq.OrderBy) {
				query = logsV3.AddOffsetToQuery(query, mq.Offset)
			}

		} else {
			query = logsV3.AddLimitToQuery(query, mq.Limit)
		}
	} else if panelType == v3.PanelTypeTable {
		query = logsV3.AddLimitToQuery(query, mq.Limit)
	}

	return query, err
}
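Putting it together, callers go through PrepareLogsQuery with the new v3.LogQBOptions (defined in the model change below). A hedged usage sketch of the two-pass graph-limit flow; the wrapper function and its arguments are illustrative:

    package v4

    import (
        "go.signoz.io/signoz/pkg/query-service/constants"
        v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
    )

    // graphLimitSketch shows the two-pass flow for limited group-by graphs:
    // the first query returns only the (limited) group-by keys, the second
    // embeds them via the #LIMIT_PLACEHOLDER marker.
    func graphLimitSketch(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery) (string, string, error) {
        first, err := PrepareLogsQuery(start, end, queryType, panelType, mq,
            v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit})
        if err != nil {
            return "", "", err
        }
        second, err := PrepareLogsQuery(start, end, queryType, panelType, mq,
            v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit})
        return first, second, err
    }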
pkg/query-service/app/logs/v4/query_builder_test.go (new file, 1099 lines; diff suppressed because it is too large)
pkg/query-service/app/logs/v4/resource_query_builder.go

@@ -164,7 +164,7 @@ func buildResourceFiltersFromAggregateAttribute(aggregateAttribute v3.AttributeK
 	return ""
 }
 
-func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) {
+func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey, isLiveTail bool) (string, error) {
 
 	// BUILD THE WHERE CLAUSE
 	var conditions []string
@@ -193,9 +193,14 @@ func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, group
 	conditionStr := strings.Join(conditions, " AND ")
 
 	// BUILD THE FINAL QUERY
-	query := fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND ", DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd)
-
-	query = "(" + query + conditionStr + ")"
+	var query string
+	if isLiveTail {
+		query = fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE ", DISTRIBUTED_LOGS_V2_RESOURCE)
+		query = "(" + query + conditionStr
+	} else {
+		query = fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND ", DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd)
+		query = "(" + query + conditionStr + ")"
+	}
 
 	return query, nil
 }
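Note the asymmetry: the live-tail branch has no time bounds and deliberately returns an unterminated string (no closing parenthesis), because buildLogsLiveTailQuery splices it after "(resource_fingerprint GLOBAL IN " and leaves the whole WHERE clause open for the reader to append timestamp and id filters. A sketch:

    package v4

    import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"

    func liveTailSubQuerySketch(fs *v3.FilterSet) (string, error) {
        // bucket bounds are zero and ignored for live tail
        sub, err := buildResourceSubQuery(0, 0, fs, nil, v3.AttributeKey{}, true)
        // sub looks like:
        // "(SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE <conditions>"
        // -- intentionally not closed with ")"
        return sub, err
    }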
pkg/query-service/app/logs/v4/resource_query_builder_test.go

@@ -469,7 +469,7 @@ func Test_buildResourceSubQuery(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := buildResourceSubQuery(tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute)
+			got, err := buildResourceSubQuery(tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute, false)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("buildResourceSubQuery() error = %v, wantErr %v", err, tt.wantErr)
 				return
pkg/query-service/constants/constants.go

@@ -316,6 +316,12 @@ const (
 		"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," +
 		"CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool," +
 		"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "
+	LogsSQLSelectV2 = "SELECT " +
+		"timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, " +
+		"attributes_string, " +
+		"attributes_number, " +
+		"attributes_bool, " +
+		"resources_string "
 	TracesExplorerViewSQLSelectWithSubQuery = "WITH subQuery AS (SELECT distinct on (traceID) traceID, durationNano, " +
 		"serviceName, name FROM %s.%s WHERE parentSpanID = '' AND %s %s ORDER BY durationNano DESC "
 	TracesExplorerViewSQLSelectQuery = "SELECT subQuery.serviceName, subQuery.name, count() AS " +
@@ -380,6 +386,12 @@ var StaticFieldsLogsV3 = map[string]v3.AttributeKey{
 		Type:     v3.AttributeKeyTypeUnspecified,
 		IsColumn: true,
 	},
+	"__attrs": {
+		Key:      "__attrs",
+		DataType: v3.AttributeKeyDataTypeString,
+		Type:     v3.AttributeKeyTypeUnspecified,
+		IsColumn: true,
+	},
 }
 
 const SigNozOrderByValue = "#SIGNOZ_VALUE"
pkg/query-service/model/v3

@@ -1290,3 +1290,9 @@ type URLShareableOptions struct {
 	Format        string         `json:"format"`
 	SelectColumns []AttributeKey `json:"selectColumns"`
 }
+
+type LogQBOptions struct {
+	GraphLimitQtype string
+	IsLivetailQuery bool
+	PreferRPM       bool
+}
pkg/query-service/utils

@@ -272,6 +272,28 @@ func GetClickhouseColumnName(typeName string, dataType, field string) string {
 	return colName
 }
 
+func GetClickhouseColumnNameV2(typeName string, dataType, field string) string {
+	if typeName == string(v3.AttributeKeyTypeTag) {
+		typeName = constants.Attributes
+	}
+
+	if typeName != string(v3.AttributeKeyTypeResource) {
+		typeName = typeName[:len(typeName)-1]
+	}
+
+	dataType = strings.ToLower(dataType)
+
+	if dataType == "int64" || dataType == "float64" {
+		dataType = "number"
+	}
+
+	// if name contains . replace it with `$$`
+	field = strings.ReplaceAll(field, ".", "$$")
+
+	colName := fmt.Sprintf("%s_%s_%s", strings.ToLower(typeName), dataType, field)
+	return colName
+}
+
 // GetEpochNanoSecs takes epoch and returns it in ns
 func GetEpochNanoSecs(epoch int64) int64 {
 	temp := epoch
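GetClickhouseColumnNameV2 produces the materialized-column names used throughout the v4 builder: the type is normalized to its singular form (except resource), int64/float64 collapse to number, and dots in the field become $$. A sketch of its outputs, assuming constants.Attributes is "attributes" and the v3 type strings are "tag"/"resource":

    package main

    import (
        "fmt"

        "go.signoz.io/signoz/pkg/query-service/utils"
    )

    func main() {
        fmt.Println(utils.GetClickhouseColumnNameV2("tag", "String", "http.method"))       // attribute_string_http$$method
        fmt.Println(utils.GetClickhouseColumnNameV2("tag", "Int64", "http.status"))        // attribute_number_http$$status
        fmt.Println(utils.GetClickhouseColumnNameV2("resource", "String", "service.name")) // resource_string_service$$name
    }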