mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-06-21 19:35:17 +08:00
Fix: QS: Log Pipelines: generate correct nil checks for operators referencing fields like attributes["http.status.code"] (#4284)
* chore: add test validating that using paths like attributes["http.method"] works * chore: refactor nil checks on processor fields generated for pipelines * chore: get nil checks working on paths like attributes["http.method"] * chore: use parsed AST for generating nil checks for add operator value expressions * chore: some cleanup * chore: some more cleanup * chore: some more cleanup * chore: some more cleanup --------- Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
This commit is contained in:
parent
263ac9fa5a
commit
ec27916fa5
@ -2,8 +2,12 @@ package logparsingpipeline
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/antonmedv/expr"
|
||||
"github.com/antonmedv/expr/ast"
|
||||
"github.com/antonmedv/expr/parser"
|
||||
"github.com/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
@ -81,12 +85,16 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
|
||||
}
|
||||
|
||||
if operator.Type == "regex_parser" {
|
||||
parseFromParts := strings.Split(operator.ParseFrom, ".")
|
||||
parseFromPath := strings.Join(parseFromParts, "?.")
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of regex op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
operator.If = fmt.Sprintf(
|
||||
`%s != nil && %s matches "%s"`,
|
||||
parseFromPath,
|
||||
parseFromPath,
|
||||
`%s && %s matches "%s"`,
|
||||
parseFromNotNilCheck,
|
||||
operator.ParseFrom,
|
||||
strings.ReplaceAll(
|
||||
strings.ReplaceAll(operator.Regex, `\`, `\\`),
|
||||
`"`, `\"`,
|
||||
@ -94,37 +102,71 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
|
||||
)
|
||||
|
||||
} else if operator.Type == "json_parser" {
|
||||
parseFromParts := strings.Split(operator.ParseFrom, ".")
|
||||
parseFromPath := strings.Join(parseFromParts, "?.")
|
||||
operator.If = fmt.Sprintf(`%s != nil && %s matches "^\\s*{.*}\\s*$"`, parseFromPath, parseFromPath)
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
operator.If = fmt.Sprintf(
|
||||
`%s && %s matches "^\\s*{.*}\\s*$"`, parseFromNotNilCheck, operator.ParseFrom,
|
||||
)
|
||||
|
||||
} else if operator.Type == "add" {
|
||||
if strings.HasPrefix(operator.Value, "EXPR(") && strings.HasSuffix(operator.Value, ")") {
|
||||
expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")")
|
||||
fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"could'nt generate nil check for fields referenced in value expr of add operator %s: %w",
|
||||
operator.Name, err,
|
||||
)
|
||||
}
|
||||
if fieldsNotNilCheck != "" {
|
||||
operator.If = fieldsNotNilCheck
|
||||
}
|
||||
}
|
||||
|
||||
} else if operator.Type == "move" || operator.Type == "copy" {
|
||||
fromParts := strings.Split(operator.From, ".")
|
||||
fromPath := strings.Join(fromParts, "?.")
|
||||
operator.If = fmt.Sprintf(`%s != nil`, fromPath)
|
||||
fromNotNilCheck, err := fieldNotNilCheck(operator.From)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for From field of %s op %s: %w", operator.Type, operator.Name, err,
|
||||
)
|
||||
}
|
||||
operator.If = fromNotNilCheck
|
||||
|
||||
} else if operator.Type == "remove" {
|
||||
fieldParts := strings.Split(operator.Field, ".")
|
||||
fieldPath := strings.Join(fieldParts, "?.")
|
||||
operator.If = fmt.Sprintf(`%s != nil`, fieldPath)
|
||||
fieldNotNilCheck, err := fieldNotNilCheck(operator.Field)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for field to be removed by op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
operator.If = fieldNotNilCheck
|
||||
|
||||
} else if operator.Type == "trace_parser" {
|
||||
cleanTraceParser(&operator)
|
||||
|
||||
} else if operator.Type == "time_parser" {
|
||||
parseFromParts := strings.Split(operator.ParseFrom, ".")
|
||||
parseFromPath := strings.Join(parseFromParts, "?.")
|
||||
|
||||
operator.If = fmt.Sprintf(`%s != nil`, parseFromPath)
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of time parser op %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
operator.If = parseFromNotNilCheck
|
||||
|
||||
if operator.LayoutType == "strptime" {
|
||||
regex, err := RegexForStrptimeLayout(operator.Layout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate time_parser processor: %w", err)
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate layout regex for time_parser %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
|
||||
operator.If = fmt.Sprintf(
|
||||
`%s && %s matches "%s"`, operator.If, parseFromPath, regex,
|
||||
`%s && %s matches "%s"`, operator.If, operator.ParseFrom, regex,
|
||||
)
|
||||
} else if operator.LayoutType == "epoch" {
|
||||
valueRegex := `^\\s*[0-9]+\\s*$`
|
||||
@ -133,19 +175,22 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
|
||||
}
|
||||
|
||||
operator.If = fmt.Sprintf(
|
||||
`%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex,
|
||||
`%s && string(%s) matches "%s"`, operator.If, operator.ParseFrom, valueRegex,
|
||||
)
|
||||
|
||||
}
|
||||
// TODO(Raj): Maybe add support for gotime too eventually
|
||||
|
||||
} else if operator.Type == "severity_parser" {
|
||||
parseFromParts := strings.Split(operator.ParseFrom, ".")
|
||||
parseFromPath := strings.Join(parseFromParts, "?.")
|
||||
|
||||
parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err,
|
||||
)
|
||||
}
|
||||
operator.If = fmt.Sprintf(
|
||||
`%s != nil && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
|
||||
parseFromPath, parseFromPath, parseFromPath, parseFromPath, parseFromPath,
|
||||
`%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
|
||||
parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom,
|
||||
)
|
||||
|
||||
}
|
||||
@ -169,3 +214,151 @@ func cleanTraceParser(operator *PipelineOperator) {
|
||||
operator.TraceFlags = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Generates an expression checking that `fieldPath` has a non-nil value in a log record.
|
||||
func fieldNotNilCheck(fieldPath string) (string, error) {
|
||||
_, err := expr.Compile(fieldPath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid fieldPath %s: %w", fieldPath, err)
|
||||
}
|
||||
|
||||
// helper for turning `.` into `?.` in field paths.
|
||||
// Eg: a.b?.c.d -> a?.b?.c?.d
|
||||
optionalChainedPath := func(path string) string {
|
||||
return strings.ReplaceAll(
|
||||
strings.ReplaceAll(path, "?.", "."), ".", "?.",
|
||||
)
|
||||
}
|
||||
|
||||
// Optional chaining before membership ops is not supported by expr.
|
||||
// Eg: The field `attributes.test["a.b"].value["c.d"].e` can't be checked using
|
||||
// the nil check `attributes.test?.["a.b"]?.value?.["c.d"]?.e != nil`
|
||||
// This needs to be worked around by checking that the target of membership op is not nil first.
|
||||
// Eg: attributes.test != nil && attributes.test["a.b"]?.value != nil && attributes.test["a.b"].value["c.d"]?.e != nil
|
||||
|
||||
// Split once from the right to include the rightmost membership op and everything after it.
|
||||
// Eg: `attributes.test["a.b"].value["c.d"].e` would result in `attributes.test["a.b"].value` and `["c.d"].e`
|
||||
parts := rSplitAfterN(fieldPath, "[", 2)
|
||||
if len(parts) < 2 {
|
||||
// there is no [] access in fieldPath
|
||||
return fmt.Sprintf("%s != nil", optionalChainedPath(fieldPath)), nil
|
||||
}
|
||||
|
||||
// recursively generate nil check for target of the rightmost membership op (attributes.test["a.b"].value)
|
||||
// should come out to be (attributes.test != nil && attributes.test["a.b"]?.value != nil)
|
||||
collectionNotNilCheck, err := fieldNotNilCheck(parts[0])
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("couldn't generate nil check for %s: %w", parts[0], err)
|
||||
}
|
||||
|
||||
// generate nil check for entire path.
|
||||
suffixParts := strings.SplitAfter(parts[1], "]") // ["c.d"], ".e"
|
||||
fullPath := parts[0] + suffixParts[0]
|
||||
if len(suffixParts) > 1 {
|
||||
// attributes.test["a.b"].value["c.d"]?.e
|
||||
fullPath += optionalChainedPath(suffixParts[1])
|
||||
}
|
||||
fullPathCheck := fmt.Sprintf("%s != nil", fullPath)
|
||||
|
||||
// If the membership op is for array/slice indexing, add check ensuring array is long enough
|
||||
// attributes.test[3] -> len(attributes.test) > 3 && attributes.test[3] != nil
|
||||
if !(strings.Contains(suffixParts[0], "'") || strings.Contains(suffixParts[0], `"`)) {
|
||||
fullPathCheck = fmt.Sprintf(
|
||||
"len(%s) > %s && %s",
|
||||
parts[0], suffixParts[0][1:len(suffixParts[0])-1], fullPathCheck,
|
||||
)
|
||||
}
|
||||
|
||||
// If prefix is `attributes` or `resource` there is no need to add a nil check for
|
||||
// the prefix since all log records have non nil `attributes` and `resource` fields.
|
||||
if slices.Contains([]string{"attributes", "resource"}, parts[0]) {
|
||||
return fullPathCheck, nil
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s && %s", collectionNotNilCheck, fullPathCheck), nil
|
||||
}
|
||||
|
||||
// rSplitAfterN splits `str` at occurrences of `sep`, scanning from the right,
// into at most `n` parts. Every part except the first begins with `sep`.
//
//	rSplitAfterN("a.b.c.d", ".", 3) -> ["a.b", ".c", ".d"]
//
// The count `n` follows the conventions of strings.SplitAfterN:
//
//	n > 0: at most n parts; the leftmost part holds the unsplit remainder
//	n == 0: the result is empty
//	n < 0: all parts
//
// An empty `sep` splits off one UTF-8 sequence at a time from the right.
// The returned slice is never nil.
func rSplitAfterN(str string, sep string, n int) []string {
	result := []string{}
	if n == 0 {
		return result
	}

	// Returns the index at which the rightmost part of `s` starts, or -1 if
	// `s` can't be split any further.
	lastPartStart := func(s string) int {
		if sep == "" {
			// Start index of the last code point in s (-1 when s is empty).
			return strings.LastIndexFunc(s, func(rune) bool { return true })
		}
		return strings.LastIndex(s, sep)
	}

	// Parts are collected right-to-left and flipped once at the end. Searching
	// from the right directly (instead of reversing the string, splitting and
	// reversing back) keeps multi-character separators and multi-byte
	// characters intact.
	rest := str
	for n < 0 || len(result) < n-1 {
		idx := lastPartStart(rest)
		if idx < 0 {
			break
		}
		result = append(result, rest[idx:])
		rest = rest[:idx]
	}

	// With an empty separator a fully consumed string leaves no remainder part,
	// mirroring how strings.SplitAfterN behaves for an empty separator.
	if sep != "" || rest != "" {
		result = append(result, rest)
	}

	slices.Reverse(result)
	return result
}
|
||||
|
||||
// reverseString returns `s` with its Unicode code points in reverse order.
//
// The reversal works on runes rather than bytes, so multi-byte characters are
// kept intact. Bytes that are not valid UTF-8 are converted to U+FFFD by the
// []rune conversion.
func reverseString(s string) string {
	r := []rune(s)
	// Reverse the rune slice itself: positions must be derived from the number
	// of runes, not from len(s), which counts bytes and exceeds len(r) whenever
	// s contains multi-byte characters.
	slices.Reverse(r)
	return string(r)
}
|
||||
|
||||
// Generate expression for checking that all fields referenced in `expr` have a non nil value in log record.
|
||||
// Eg: `attributes.x + len(resource.y)` will return the expression `attributes.x != nil && resource.y != nil`
|
||||
func fieldsReferencedInExprNotNilCheck(expr string) (string, error) {
|
||||
referencedFields, err := logFieldsReferencedInExpr(expr)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("couldn't extract log fields referenced in expr %s: %w", expr, err)
|
||||
}
|
||||
|
||||
// Generating nil check for deepest fields takes care of their prefixes too.
|
||||
// Eg: `attributes.test.value + len(attributes.test)` needs a nil check only for `attributes.test.value`
|
||||
deepestFieldRefs := []string{}
|
||||
for _, field := range referencedFields {
|
||||
isPrefixOfAnotherReferencedField := slices.ContainsFunc(
|
||||
referencedFields, func(e string) bool {
|
||||
return len(e) > len(field) && strings.HasPrefix(e, field)
|
||||
},
|
||||
)
|
||||
if !isPrefixOfAnotherReferencedField {
|
||||
deepestFieldRefs = append(deepestFieldRefs, field)
|
||||
}
|
||||
}
|
||||
|
||||
fieldExprChecks := []string{}
|
||||
for _, field := range deepestFieldRefs {
|
||||
checkExpr, err := fieldNotNilCheck(field)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not create nil check for %s: %w", field, err)
|
||||
}
|
||||
fieldExprChecks = append(fieldExprChecks, fmt.Sprintf("(%s)", checkExpr))
|
||||
}
|
||||
|
||||
return strings.Join(fieldExprChecks, " && "), nil
|
||||
}
|
||||
|
||||
// Expr AST visitor for extracting referenced log fields
|
||||
// See more at https://github.com/expr-lang/expr/blob/master/ast/visitor.go
|
||||
type logFieldsInExprExtractor struct {
|
||||
referencedFields []string
|
||||
}
|
||||
|
||||
func (v *logFieldsInExprExtractor) Visit(node *ast.Node) {
|
||||
if n, ok := (*node).(*ast.MemberNode); ok {
|
||||
memberRef := n.String()
|
||||
if strings.HasPrefix(memberRef, "attributes") || strings.HasPrefix(memberRef, "resource") {
|
||||
v.referencedFields = append(v.referencedFields, memberRef)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logFieldsReferencedInExpr(expr string) ([]string, error) {
|
||||
// parse abstract syntax tree for expr
|
||||
exprAst, err := parser.Parse(expr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse expr: %w", err)
|
||||
}
|
||||
|
||||
// walk ast for expr to collect all member references.
|
||||
v := &logFieldsInExprExtractor{}
|
||||
ast.Walk(&exprAst.Node, v)
|
||||
|
||||
return v.referencedFields, nil
|
||||
}
|
||||
|
@ -608,6 +608,104 @@ func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) {
|
||||
require.Equal("test", result[0].Attributes_string["$test1"])
|
||||
}
|
||||
|
||||
func TestMembershipOpInProcessorFieldExpressions(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
testLogs := []model.SignozLog{
|
||||
makeTestSignozLog("test log", map[string]interface{}{
|
||||
"http.method": "GET",
|
||||
"order.products": `{"ids": ["pid0", "pid1"]}`,
|
||||
}),
|
||||
}
|
||||
|
||||
testPipeline := Pipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "http.method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []PipelineOperator{
|
||||
{
|
||||
ID: "move",
|
||||
Type: "move",
|
||||
Enabled: true,
|
||||
Name: "move",
|
||||
From: `attributes["http.method"]`,
|
||||
To: `attributes["test.http.method"]`,
|
||||
}, {
|
||||
ID: "json",
|
||||
Type: "json_parser",
|
||||
Enabled: true,
|
||||
Name: "json",
|
||||
ParseFrom: `attributes["order.products"]`,
|
||||
ParseTo: `attributes["order.products"]`,
|
||||
}, {
|
||||
ID: "move1",
|
||||
Type: "move",
|
||||
Enabled: true,
|
||||
Name: "move1",
|
||||
From: `attributes["order.products"].ids`,
|
||||
To: `attributes["order.product_ids"]`,
|
||||
}, {
|
||||
ID: "move2",
|
||||
Type: "move",
|
||||
Enabled: true,
|
||||
Name: "move2",
|
||||
From: `attributes.test?.doesnt_exist`,
|
||||
To: `attributes["test.doesnt_exist"]`,
|
||||
}, {
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Enabled: true,
|
||||
Name: "add",
|
||||
Field: `attributes["order.pids"].missing_field`,
|
||||
Value: `EXPR(attributes.a["b.c"].d[4].e + resource.f)`,
|
||||
}, {
|
||||
ID: "add2",
|
||||
Type: "add",
|
||||
Enabled: true,
|
||||
Name: "add2",
|
||||
Field: `attributes["order.pids.pid0"]`,
|
||||
Value: `EXPR(attributes["order.product_ids"][0])`,
|
||||
}, {
|
||||
ID: "add3",
|
||||
Type: "add",
|
||||
Enabled: true,
|
||||
Name: "add3",
|
||||
Field: `attributes["attrs.test.value"]`,
|
||||
Value: `EXPR(attributes.test?.value)`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
|
||||
context.Background(),
|
||||
[]Pipeline{testPipeline},
|
||||
testLogs,
|
||||
)
|
||||
require.Nil(err)
|
||||
require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
|
||||
require.Equal(1, len(result))
|
||||
|
||||
_, methodAttrExists := result[0].Attributes_string["http.method"]
|
||||
require.False(methodAttrExists)
|
||||
require.Equal("GET", result[0].Attributes_string["test.http.method"])
|
||||
require.Equal("pid0", result[0].Attributes_string["order.pids.pid0"])
|
||||
}
|
||||
|
||||
func TestTemporaryWorkaroundForSupportingAttribsContainingDots(t *testing.T) {
|
||||
// TODO(Raj): Remove this after dots are supported
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user