mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 06:15:57 +08:00
feat: don't throw error if unable to enrich metadata (#2608)
* feat: don't throw error is unable to enrich metadata * feat: remove isDefaultEnriched key * feat: validate and cast added * feat: function name corrected --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
parent
b6a455d264
commit
d1a256a6d5
@ -53,20 +53,22 @@ var logOperators = map[v3.FilterOperator]string{
|
|||||||
// (todo) check contains/not contains/
|
// (todo) check contains/not contains/
|
||||||
}
|
}
|
||||||
|
|
||||||
func encrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) (v3.AttributeKey, error) {
|
func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) v3.AttributeKey {
|
||||||
if field.Type == "" || field.DataType == "" {
|
if field.Type == "" || field.DataType == "" {
|
||||||
// check if the field is present in the fields map
|
// check if the field is present in the fields map
|
||||||
if existingField, ok := fields[field.Key]; ok {
|
if existingField, ok := fields[field.Key]; ok {
|
||||||
if existingField.IsColumn {
|
if existingField.IsColumn {
|
||||||
return field, nil
|
return field
|
||||||
}
|
}
|
||||||
field.Type = existingField.Type
|
field.Type = existingField.Type
|
||||||
field.DataType = existingField.DataType
|
field.DataType = existingField.DataType
|
||||||
} else {
|
} else {
|
||||||
return field, fmt.Errorf("field not found to enrich metadata")
|
// enrich with default values if metadata is not found
|
||||||
|
field.Type = v3.AttributeKeyTypeTag
|
||||||
|
field.DataType = v3.AttributeKeyDataTypeString
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return field, nil
|
return field
|
||||||
}
|
}
|
||||||
|
|
||||||
func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
|
func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
|
||||||
@ -92,13 +94,8 @@ func getClickhouseColumnName(key v3.AttributeKey, fields map[string]v3.Attribute
|
|||||||
clickhouseColumn := key.Key
|
clickhouseColumn := key.Key
|
||||||
//if the key is present in the topLevelColumn then it will be only searched in those columns,
|
//if the key is present in the topLevelColumn then it will be only searched in those columns,
|
||||||
//regardless if it is indexed/present again in resource or column attribute
|
//regardless if it is indexed/present again in resource or column attribute
|
||||||
var err error
|
|
||||||
_, isTopLevelCol := constants.LogsTopLevelColumnsV3[key.Key]
|
_, isTopLevelCol := constants.LogsTopLevelColumnsV3[key.Key]
|
||||||
if !isTopLevelCol && !key.IsColumn {
|
if !isTopLevelCol && !key.IsColumn {
|
||||||
key, err = encrichFieldWithMetadata(key, fields)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
columnType := getClickhouseLogsColumnType(key.Type)
|
columnType := getClickhouseLogsColumnType(key.Type)
|
||||||
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
|
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
|
||||||
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
|
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
|
||||||
@ -113,7 +110,8 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri
|
|||||||
selectLabels = ""
|
selectLabels = ""
|
||||||
} else {
|
} else {
|
||||||
for _, tag := range groupBy {
|
for _, tag := range groupBy {
|
||||||
columnName, err := getClickhouseColumnName(tag, fields)
|
enrichedTag := enrichFieldWithMetadata(tag, fields)
|
||||||
|
columnName, err := getClickhouseColumnName(enrichedTag, fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -128,33 +126,31 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
|
|||||||
|
|
||||||
if fs != nil && len(fs.Items) != 0 {
|
if fs != nil && len(fs.Items) != 0 {
|
||||||
for _, item := range fs.Items {
|
for _, item := range fs.Items {
|
||||||
toFormat := item.Value
|
|
||||||
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
||||||
|
key := enrichFieldWithMetadata(item.Key, fields)
|
||||||
|
value, err := utils.ValidateAndCastValue(item.Value, key.DataType)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
|
||||||
|
}
|
||||||
if logsOp, ok := logOperators[op]; ok {
|
if logsOp, ok := logOperators[op]; ok {
|
||||||
switch op {
|
switch op {
|
||||||
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
|
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
|
||||||
//(todo): refractor this later
|
|
||||||
key, err := encrichFieldWithMetadata(item.Key, fields)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
columnType := getClickhouseLogsColumnType(key.Type)
|
columnType := getClickhouseLogsColumnType(key.Type)
|
||||||
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
|
columnDataType := getClickhouseLogsColumnDataType(key.DataType)
|
||||||
conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, item.Key.Key))
|
conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, key.Key))
|
||||||
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
|
case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
|
||||||
// generate the key
|
columnName, err := getClickhouseColumnName(key, fields)
|
||||||
columnName, err := getClickhouseColumnName(item.Key, fields)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, item.Value))
|
conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, item.Value))
|
||||||
default:
|
default:
|
||||||
// generate the key
|
columnName, err := getClickhouseColumnName(key, fields)
|
||||||
columnName, err := getClickhouseColumnName(item.Key, fields)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
fmtVal := utils.ClickHouseFormattedValue(toFormat)
|
|
||||||
|
fmtVal := utils.ClickHouseFormattedValue(value)
|
||||||
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, logsOp, fmtVal))
|
conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, logsOp, fmtVal))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -165,10 +161,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey,
|
|||||||
|
|
||||||
// add group by conditions to filter out log lines which doesn't have the key
|
// add group by conditions to filter out log lines which doesn't have the key
|
||||||
for _, attr := range groupBy {
|
for _, attr := range groupBy {
|
||||||
enrichedAttr, err := encrichFieldWithMetadata(attr, fields)
|
enrichedAttr := enrichFieldWithMetadata(attr, fields)
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
if !enrichedAttr.IsColumn {
|
if !enrichedAttr.IsColumn {
|
||||||
columnType := getClickhouseLogsColumnType(enrichedAttr.Type)
|
columnType := getClickhouseLogsColumnType(enrichedAttr.Type)
|
||||||
columnDataType := getClickhouseLogsColumnDataType(enrichedAttr.DataType)
|
columnDataType := getClickhouseLogsColumnDataType(enrichedAttr.DataType)
|
||||||
@ -231,7 +224,8 @@ func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[stri
|
|||||||
|
|
||||||
aggregationKey := ""
|
aggregationKey := ""
|
||||||
if mq.AggregateAttribute.Key != "" {
|
if mq.AggregateAttribute.Key != "" {
|
||||||
aggregationKey, err = getClickhouseColumnName(mq.AggregateAttribute, fields)
|
enrichedAttribute := enrichFieldWithMetadata(mq.AggregateAttribute, fields)
|
||||||
|
aggregationKey, err = getClickhouseColumnName(enrichedAttribute, fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -269,14 +263,10 @@ func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[stri
|
|||||||
return query, nil
|
return query, nil
|
||||||
case v3.AggregateOperatorCount:
|
case v3.AggregateOperatorCount:
|
||||||
if mq.AggregateAttribute.Key != "" {
|
if mq.AggregateAttribute.Key != "" {
|
||||||
field, err := encrichFieldWithMetadata(mq.AggregateAttribute, fields)
|
field := enrichFieldWithMetadata(mq.AggregateAttribute, fields)
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
columnType := getClickhouseLogsColumnType(field.Type)
|
columnType := getClickhouseLogsColumnType(field.Type)
|
||||||
columnDataType := getClickhouseLogsColumnDataType(field.DataType)
|
columnDataType := getClickhouseLogsColumnDataType(field.DataType)
|
||||||
filterSubQuery = fmt.Sprintf("%s AND has(%s_%s_key, '%s')", filterSubQuery, columnType, columnDataType, mq.AggregateAttribute.Key)
|
filterSubQuery = fmt.Sprintf("%s AND has(%s_%s_key, '%s')", filterSubQuery, columnType, columnDataType, mq.AggregateAttribute.Key)
|
||||||
// check having
|
|
||||||
}
|
}
|
||||||
|
|
||||||
op := "toFloat64(count(*))"
|
op := "toFloat64(count(*))"
|
||||||
|
@ -100,6 +100,8 @@ var timeSeriesFilterQueryData = []struct {
|
|||||||
FilterSet *v3.FilterSet
|
FilterSet *v3.FilterSet
|
||||||
GroupBy []v3.AttributeKey
|
GroupBy []v3.AttributeKey
|
||||||
ExpectedFilter string
|
ExpectedFilter string
|
||||||
|
Fields map[string]v3.AttributeKey
|
||||||
|
Error string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
Name: "Test attribute and resource attribute",
|
Name: "Test attribute and resource attribute",
|
||||||
@ -173,6 +175,20 @@ var timeSeriesFilterQueryData = []struct {
|
|||||||
}},
|
}},
|
||||||
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
|
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "Test no metadata",
|
||||||
|
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "host"}, Value: "102.", Operator: "ncontains"},
|
||||||
|
}},
|
||||||
|
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Test no metadata number",
|
||||||
|
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "bytes"}, Value: 102, Operator: "="},
|
||||||
|
}},
|
||||||
|
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'bytes')] = '102'",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "Test groupBy",
|
Name: "Test groupBy",
|
||||||
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
|
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
|
||||||
@ -189,14 +205,35 @@ var timeSeriesFilterQueryData = []struct {
|
|||||||
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
|
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
|
||||||
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
|
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "Wrong data",
|
||||||
|
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "bytes"}, Value: true, Operator: "="},
|
||||||
|
}},
|
||||||
|
Fields: map[string]v3.AttributeKey{"bytes": {Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag}},
|
||||||
|
Error: "failed to validate and cast value for bytes: invalid data type, expected float, got bool",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Cast data",
|
||||||
|
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
|
||||||
|
{Key: v3.AttributeKey{Key: "bytes"}, Value: 102, Operator: "="},
|
||||||
|
}},
|
||||||
|
Fields: map[string]v3.AttributeKey{"bytes": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}},
|
||||||
|
ExpectedFilter: " AND attributes_int64_value[indexOf(attributes_int64_key, 'bytes')] = 102",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildLogsTimeSeriesFilterQuery(t *testing.T) {
|
func TestBuildLogsTimeSeriesFilterQuery(t *testing.T) {
|
||||||
for _, tt := range timeSeriesFilterQueryData {
|
for _, tt := range timeSeriesFilterQueryData {
|
||||||
Convey("TestBuildLogsTimeSeriesFilterQuery", t, func() {
|
Convey("TestBuildLogsTimeSeriesFilterQuery", t, func() {
|
||||||
query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, tt.GroupBy, map[string]v3.AttributeKey{})
|
query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, tt.GroupBy, tt.Fields)
|
||||||
So(err, ShouldBeNil)
|
if tt.Error != "" {
|
||||||
So(query, ShouldEqual, tt.ExpectedFilter)
|
So(err.Error(), ShouldEqual, tt.Error)
|
||||||
|
} else {
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(query, ShouldEqual, tt.ExpectedFilter)
|
||||||
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -254,7 +291,7 @@ var testBuildLogsQueryData = []struct {
|
|||||||
Expression: "A",
|
Expression: "A",
|
||||||
},
|
},
|
||||||
TableName: "logs",
|
TableName: "logs",
|
||||||
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100 AND has(attributes_string_key, 'user_name') group by ts order by ts",
|
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100.000000 AND has(attributes_string_key, 'user_name') group by ts order by ts",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "Test aggregate count distinct and order by value",
|
Name: "Test aggregate count distinct and order by value",
|
||||||
|
Loading…
x
Reference in New Issue
Block a user