Fix: Query Service: get trace parser working in log parsing pipelines (#3820)

* chore: add test for ensuring pipeline previews work for trace parser processors

* chore: updates to trace parser validation in postable pipelines

* chore: extract auth.randomHex into utils.RandomHex for reuse

* chore: get trace parser preview test passing

* chore: start with JSON serialized trace parser in test to cover deserialization

* chore: address PR feedback
This commit is contained in:
Raj Kamal Singh 2023-10-29 16:58:31 +05:30 committed by GitHub
parent fc49833c9f
commit 79aef73767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 169 additions and 31 deletions

View File

@ -48,20 +48,20 @@ type PipelineOperator struct {
Name string `json:"name,omitempty" yaml:"-"` Name string `json:"name,omitempty" yaml:"-"`
// optional keys depending on the type // optional keys depending on the type
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
TraceParser *TraceParser `json:"trace_parser,omitempty" yaml:"trace_parser,omitempty"` *TraceParser `yaml:",inline,omitempty"`
Field string `json:"field,omitempty" yaml:"field,omitempty"` Field string `json:"field,omitempty" yaml:"field,omitempty"`
Value string `json:"value,omitempty" yaml:"value,omitempty"` Value string `json:"value,omitempty" yaml:"value,omitempty"`
From string `json:"from,omitempty" yaml:"from,omitempty"` From string `json:"from,omitempty" yaml:"from,omitempty"`
To string `json:"to,omitempty" yaml:"to,omitempty"` To string `json:"to,omitempty" yaml:"to,omitempty"`
Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` Expr string `json:"expr,omitempty" yaml:"expr,omitempty"`
Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"`
Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"`
Default string `json:"default,omitempty" yaml:"default,omitempty"` Default string `json:"default,omitempty" yaml:"default,omitempty"`
} }
type TimestampParser struct { type TimestampParser struct {

View File

@ -137,20 +137,35 @@ func isValidOperator(op PipelineOperator) error {
if op.Field == "" { if op.Field == "" {
return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID)) return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID))
} }
case "traceParser": case "trace_parser":
if op.TraceParser == nil { if op.TraceParser == nil {
return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID)) return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID))
} }
if op.TraceParser.SpanId.ParseFrom == "" && op.TraceParser.TraceId.ParseFrom == "" && op.TraceParser.TraceFlags.ParseFrom == "" { hasTraceIdParseFrom := (op.TraceParser.TraceId != nil && op.TraceParser.TraceId.ParseFrom != "")
return fmt.Errorf(fmt.Sprintf("one of trace_id,span_id,parse_from of %s traceParser operator must be present", op.ID)) hasSpanIdParseFrom := (op.TraceParser.SpanId != nil && op.TraceParser.SpanId.ParseFrom != "")
hasTraceFlagsParseFrom := (op.TraceParser.TraceFlags != nil && op.TraceParser.TraceFlags.ParseFrom != "")
if !(hasTraceIdParseFrom || hasSpanIdParseFrom || hasTraceFlagsParseFrom) {
return fmt.Errorf(fmt.Sprintf("one of trace_id, span_id, trace_flags of %s trace_parser operator must be present", op.ID))
} }
if hasTraceIdParseFrom && !isValidOtelValue(op.TraceParser.TraceId.ParseFrom) {
return fmt.Errorf("trace id can't be parsed from %s", op.TraceParser.TraceId.ParseFrom)
}
if hasSpanIdParseFrom && !isValidOtelValue(op.TraceParser.SpanId.ParseFrom) {
return fmt.Errorf("span id can't be parsed from %s", op.TraceParser.SpanId.ParseFrom)
}
if hasTraceFlagsParseFrom && !isValidOtelValue(op.TraceParser.TraceFlags.ParseFrom) {
return fmt.Errorf("trace flags can't be parsed from %s", op.TraceParser.TraceFlags.ParseFrom)
}
case "retain": case "retain":
if len(op.Fields) == 0 { if len(op.Fields) == 0 {
return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID)) return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID))
} }
default: default:
return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, traceParser, retain)", op.Type, op.ID)) return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID))
} }
if !isValidOtelValue(op.ParseFrom) || if !isValidOtelValue(op.ParseFrom) ||

View File

@ -250,6 +250,31 @@ var operatorTest = []struct {
ParseTo: "attributes", ParseTo: "attributes",
}, },
IsValid: false, IsValid: false,
}, {
Name: "Trace Parser - invalid - no trace_parser spec",
Operator: PipelineOperator{
ID: "trace",
Type: "trace_parser",
},
IsValid: false,
}, {
Name: "Trace Parser - invalid - no ParseFrom specified",
Operator: PipelineOperator{
ID: "trace",
Type: "trace_parser",
TraceParser: &TraceParser{},
},
IsValid: false,
}, {
Name: "Trace Parser - invalid - bad parsefrom attribute",
Operator: PipelineOperator{
ID: "trace",
Type: "trace_parser",
TraceParser: &TraceParser{
TraceId: &ParseFrom{ParseFrom: "trace_id"},
},
},
IsValid: false,
}, },
} }

View File

@ -2,6 +2,8 @@ package logparsingpipeline
import ( import (
"context" "context"
"encoding/json"
"strconv"
"testing" "testing"
"time" "time"
@ -10,6 +12,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
) )
func TestPipelinePreview(t *testing.T) { func TestPipelinePreview(t *testing.T) {
@ -202,6 +205,98 @@ func TestGrokParsingPreview(t *testing.T) {
require.Equal("route/server.go:71", processed.Attributes_string["location"]) require.Equal("route/server.go:71", processed.Attributes_string["location"])
} }
// TestTraceParsingPreview runs a single pipeline containing a trace_parser
// operator through SimulatePipelinesProcessing and checks that the trace id,
// span id and trace flags held in the log's string attributes end up in the
// TraceID, SpanID and TraceFlags fields of the processed log.
func TestTraceParsingPreview(t *testing.T) {
	assert := require.New(t)

	// The operator starts out as serialized JSON so that deserialization
	// into PipelineOperator is exercised as well.
	const serializedOp = `
{
"orderId": 1,
"enabled": true,
"type": "trace_parser",
"name": "Test trace parser",
"id": "test-trace-parser",
"trace_id": {
"parse_from": "attributes.test_trace_id"
},
"span_id": {
"parse_from": "attributes.test_span_id"
},
"trace_flags": {
"parse_from": "attributes.test_trace_flags"
}
}
`
	var parserOp PipelineOperator
	assert.Nil(json.Unmarshal([]byte(serializedOp), &parserOp))

	// The pipeline's filter is declared on the string attribute method = "GET".
	methodFilter := v3.FilterItem{
		Key: v3.AttributeKey{
			Key:      "method",
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeTag,
		},
		Operator: "=",
		Value:    "GET",
	}
	pipelines := []Pipeline{
		{
			OrderId: 1,
			Name:    "pipeline1",
			Alias:   "pipeline1",
			Enabled: true,
			Filter: &v3.FilterSet{
				Operator: "AND",
				Items:    []v3.FilterItem{methodFilter},
			},
			Config: []PipelineOperator{parserOp},
		},
	}

	// Random hex values placed in the attributes the operator parses from.
	wantTraceID, err := utils.RandomHex(16)
	assert.Nil(err)
	wantSpanID, err := utils.RandomHex(8)
	assert.Nil(err)
	flagsHex, err := utils.RandomHex(1)
	assert.Nil(err)

	attrs := map[string]string{
		"method":           "GET",
		"test_trace_id":    wantTraceID,
		"test_span_id":     wantSpanID,
		"test_trace_flags": flagsHex,
	}
	input := model.SignozLog{
		Timestamp:         uint64(time.Now().UnixNano()),
		Body:              "test log",
		Attributes_string: attrs,
		SpanID:            "",
		TraceID:           "",
		TraceFlags:        0,
	}

	output, err := SimulatePipelinesProcessing(
		context.Background(), pipelines, []model.SignozLog{input},
	)
	assert.Nil(err)
	assert.Equal(1, len(output))

	got := output[0]
	assert.Equal(wantTraceID, got.TraceID)
	assert.Equal(wantSpanID, got.SpanID)

	// The flags attribute is a hex string; the processed log carries it as a number.
	wantFlags, err := strconv.ParseUint(flagsHex, 16, 16)
	assert.Nil(err)
	assert.Equal(uint32(wantFlags), got.TraceFlags)
}
func makeTestLogEntry( func makeTestLogEntry(
body string, body string,
attributes map[string]string, attributes map[string]string,

View File

@ -31,7 +31,7 @@ var (
func Invite(ctx context.Context, req *model.InviteRequest) (*model.InviteResponse, error) { func Invite(ctx context.Context, req *model.InviteRequest) (*model.InviteResponse, error) {
zap.S().Debugf("Got an invite request for email: %s\n", req.Email) zap.S().Debugf("Got an invite request for email: %s\n", req.Email)
token, err := randomHex(opaqueTokenSize) token, err := utils.RandomHex(opaqueTokenSize)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to generate invite token") return nil, errors.Wrap(err, "failed to generate invite token")
} }
@ -140,7 +140,7 @@ func ValidateInvite(ctx context.Context, req *RegisterRequest) (*model.Invitatio
} }
func CreateResetPasswordToken(ctx context.Context, userId string) (*model.ResetPasswordEntry, error) { func CreateResetPasswordToken(ctx context.Context, userId string) (*model.ResetPasswordEntry, error) {
token, err := randomHex(opaqueTokenSize) token, err := utils.RandomHex(opaqueTokenSize)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to generate reset password token") return nil, errors.Wrap(err, "failed to generate reset password token")
} }

View File

@ -1,9 +1,6 @@
package auth package auth
import ( import (
"crypto/rand"
"encoding/hex"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
@ -18,14 +15,6 @@ var (
ErrorAskAdmin = errors.New("An invitation is needed to create an account. Please ask your admin (the person who has first installed SIgNoz) to send an invite.") ErrorAskAdmin = errors.New("An invitation is needed to create an account. Please ask your admin (the person who has first installed SIgNoz) to send an invite.")
) )
// randomHex reads sz bytes from crypto/rand and returns them hex-encoded,
// so the resulting string is 2*sz characters long. Any error from the
// random source is returned unchanged together with an empty string.
func randomHex(sz int) (string, error) {
	raw := make([]byte, sz)
	_, err := rand.Read(raw)
	if err != nil {
		return "", err
	}

	// Encode into a buffer sized for the hex form (two characters per byte).
	encoded := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(encoded, raw)
	return string(encoded), nil
}
func isValidRole(role string) bool { func isValidRole(role string) bool {
switch role { switch role {
case constants.AdminGroup, constants.EditorGroup, constants.ViewerGroup: case constants.AdminGroup, constants.EditorGroup, constants.ViewerGroup:

View File

@ -0,0 +1,14 @@
package utils
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)
// RandomHex returns the hex encoding of sz bytes read from crypto/rand.
// The returned string is 2*sz characters long.
//
// An error is returned, with an empty string, when sz is negative or when
// reading from the random source fails.
func RandomHex(sz int) (string, error) {
	// make panics on a negative length; report it through the error return
	// instead, since callers already have to handle an error.
	if sz < 0 {
		return "", fmt.Errorf("random hex: invalid byte count %d", sz)
	}

	bytes := make([]byte, sz)
	if _, err := rand.Read(bytes); err != nil {
		return "", err
	}
	return hex.EncodeToString(bytes), nil
}