diff --git a/pkg/query-service/app/logparsingpipeline/model.go b/pkg/query-service/app/logparsingpipeline/model.go index 21493115bd..eb0a9c66d1 100644 --- a/pkg/query-service/app/logparsingpipeline/model.go +++ b/pkg/query-service/app/logparsingpipeline/model.go @@ -48,20 +48,20 @@ type PipelineOperator struct { Name string `json:"name,omitempty" yaml:"-"` // optional keys depending on the type - ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` - Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` - Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` - ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` - Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` - TraceParser *TraceParser `json:"trace_parser,omitempty" yaml:"trace_parser,omitempty"` - Field string `json:"field,omitempty" yaml:"field,omitempty"` - Value string `json:"value,omitempty" yaml:"value,omitempty"` - From string `json:"from,omitempty" yaml:"from,omitempty"` - To string `json:"to,omitempty" yaml:"to,omitempty"` - Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` - Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` - Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` - Default string `json:"default,omitempty" yaml:"default,omitempty"` + ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` + Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` + Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` + ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` + *TraceParser `yaml:",inline,omitempty"` + Field string `json:"field,omitempty" yaml:"field,omitempty"` + Value string `json:"value,omitempty" yaml:"value,omitempty"` + From string `json:"from,omitempty" yaml:"from,omitempty"` + To string `json:"to,omitempty" yaml:"to,omitempty"` + Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` + Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` + Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` + Default string `json:"default,omitempty" yaml:"default,omitempty"` } type TimestampParser struct { diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline.go b/pkg/query-service/app/logparsingpipeline/postablePipeline.go index d11fb5b952..472303b527 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline.go +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline.go @@ -137,20 +137,35 @@ func isValidOperator(op PipelineOperator) error { if op.Field == "" { return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID)) } - case "traceParser": + case "trace_parser": if op.TraceParser == nil { return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID)) } - if op.TraceParser.SpanId.ParseFrom == "" && op.TraceParser.TraceId.ParseFrom == "" && op.TraceParser.TraceFlags.ParseFrom == "" { - return fmt.Errorf(fmt.Sprintf("one of trace_id,span_id,parse_from of %s traceParser operator must be present", op.ID)) + hasTraceIdParseFrom := (op.TraceParser.TraceId != nil && op.TraceParser.TraceId.ParseFrom != "") + hasSpanIdParseFrom := (op.TraceParser.SpanId != nil && op.TraceParser.SpanId.ParseFrom != "") + hasTraceFlagsParseFrom := (op.TraceParser.TraceFlags != nil && op.TraceParser.TraceFlags.ParseFrom != "") + + if !(hasTraceIdParseFrom || hasSpanIdParseFrom || hasTraceFlagsParseFrom) { + return fmt.Errorf(fmt.Sprintf("one of trace_id, span_id, trace_flags of %s trace_parser operator must be present", op.ID)) } + + if hasTraceIdParseFrom && !isValidOtelValue(op.TraceParser.TraceId.ParseFrom) { + return fmt.Errorf("trace id can't be parsed from %s", op.TraceParser.TraceId.ParseFrom) + } + if hasSpanIdParseFrom && !isValidOtelValue(op.TraceParser.SpanId.ParseFrom) { + return fmt.Errorf("span id can't be parsed from %s", op.TraceParser.SpanId.ParseFrom) + } + if hasTraceFlagsParseFrom && !isValidOtelValue(op.TraceParser.TraceFlags.ParseFrom) { + return fmt.Errorf("trace flags can't be parsed from %s", op.TraceParser.TraceFlags.ParseFrom) + } + case "retain": if len(op.Fields) == 0 { return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID)) } default: - return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, traceParser, retain)", op.Type, op.ID)) + return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID)) } if !isValidOtelValue(op.ParseFrom) || diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go index dd2c61e748..d2f9ec9b09 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go @@ -250,6 +250,31 @@ var operatorTest = []struct { ParseTo: "attributes", }, IsValid: false, + }, { + Name: "Trace Parser - invalid - no trace_parser spec", + Operator: PipelineOperator{ + ID: "trace", + Type: "trace_parser", + }, + IsValid: false, + }, { + Name: "Trace Parser - invalid - no ParseFrom specified", + Operator: PipelineOperator{ + ID: "trace", + Type: "trace_parser", + TraceParser: &TraceParser{}, + }, + IsValid: false, + }, { + Name: "Trace Parser - invalid - bad parsefrom attribute", + Operator: PipelineOperator{ + ID: "trace", + Type: "trace_parser", + TraceParser: &TraceParser{ + TraceId: &ParseFrom{ParseFrom: "trace_id"}, + }, + }, + IsValid: false, }, } diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go index b6cfb855f3..67f77a8e40 100644 --- a/pkg/query-service/app/logparsingpipeline/preview_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview_test.go @@ -2,6 +2,8 @@ package logparsingpipeline import ( "context" + "encoding/json" + "strconv" "testing" "time" @@ -10,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" ) func TestPipelinePreview(t *testing.T) { @@ -202,6 +205,98 @@ func TestGrokParsingPreview(t *testing.T) { require.Equal("route/server.go:71", processed.Attributes_string["location"]) } +func TestTraceParsingPreview(t *testing.T) { + require := require.New(t) + + testPipelines := []Pipeline{ + { + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []PipelineOperator{}, + }, + } + + // Start with JSON serialized trace parser to validate deserialization too + var traceParserOp PipelineOperator + err := json.Unmarshal([]byte(` + { + "orderId": 1, + "enabled": true, + "type": "trace_parser", + "name": "Test trace parser", + "id": "test-trace-parser", + "trace_id": { + "parse_from": "attributes.test_trace_id" + }, + "span_id": { + "parse_from": "attributes.test_span_id" + }, + "trace_flags": { + "parse_from": "attributes.test_trace_flags" + } + } + `), &traceParserOp) + require.Nil(err) + testPipelines[0].Config = append(testPipelines[0].Config, traceParserOp) + + testTraceId, err := utils.RandomHex(16) + require.Nil(err) + + testSpanId, err := utils.RandomHex(8) + require.Nil(err) + + testTraceFlags, err := utils.RandomHex(1) + require.Nil(err) + + testLog := model.SignozLog{ + Timestamp: uint64(time.Now().UnixNano()), + Body: "test log", + Attributes_string: map[string]string{ + "method": "GET", + "test_trace_id": testTraceId, + "test_span_id": testSpanId, + "test_trace_flags": testTraceFlags, + }, + SpanID: "", + TraceID: "", + TraceFlags: 0, + } + + result, err := SimulatePipelinesProcessing( + context.Background(), + testPipelines, + []model.SignozLog{ + testLog, + }, + ) + require.Nil(err) + require.Equal(1, len(result)) + processed := result[0] + + require.Equal(testTraceId, processed.TraceID) + require.Equal(testSpanId, processed.SpanID) + + expectedTraceFlags, err := strconv.ParseUint(testTraceFlags, 16, 16) + require.Nil(err) + require.Equal(uint32(expectedTraceFlags), processed.TraceFlags) +} + func makeTestLogEntry( body string, attributes map[string]string, diff --git a/pkg/query-service/auth/auth.go b/pkg/query-service/auth/auth.go index 7f78fa3660..6190e87826 100644 --- a/pkg/query-service/auth/auth.go +++ b/pkg/query-service/auth/auth.go @@ -31,7 +31,7 @@ var ( func Invite(ctx context.Context, req *model.InviteRequest) (*model.InviteResponse, error) { zap.S().Debugf("Got an invite request for email: %s\n", req.Email) - token, err := randomHex(opaqueTokenSize) + token, err := utils.RandomHex(opaqueTokenSize) if err != nil { return nil, errors.Wrap(err, "failed to generate invite token") } @@ -140,7 +140,7 @@ func ValidateInvite(ctx context.Context, req *RegisterRequest) (*model.Invitatio } func CreateResetPasswordToken(ctx context.Context, userId string) (*model.ResetPasswordEntry, error) { - token, err := randomHex(opaqueTokenSize) + token, err := utils.RandomHex(opaqueTokenSize) if err != nil { return nil, errors.Wrap(err, "failed to generate reset password token") } diff --git a/pkg/query-service/auth/utils.go b/pkg/query-service/auth/utils.go index d76beca5c4..a6a639c710 100644 --- a/pkg/query-service/auth/utils.go +++ b/pkg/query-service/auth/utils.go @@ -1,9 +1,6 @@ package auth import ( - "crypto/rand" - "encoding/hex" - "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" @@ -18,14 +15,6 @@ var ( ErrorAskAdmin = errors.New("An invitation is needed to create an account. Please ask your admin (the person who has first installed SIgNoz) to send an invite.") ) -func randomHex(sz int) (string, error) { - bytes := make([]byte, sz) - if _, err := rand.Read(bytes); err != nil { - return "", err - } - return hex.EncodeToString(bytes), nil -} - func isValidRole(role string) bool { switch role { case constants.AdminGroup, constants.EditorGroup, constants.ViewerGroup: diff --git a/pkg/query-service/utils/random.go b/pkg/query-service/utils/random.go new file mode 100644 index 0000000000..10a3680e7a --- /dev/null +++ b/pkg/query-service/utils/random.go @@ -0,0 +1,14 @@ +package utils + +import ( + "crypto/rand" + "encoding/hex" +) + +func RandomHex(sz int) (string, error) { + bytes := make([]byte, sz) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +}