From 626da7533ef693893a787ae20c4bfc1cb42d3ba4 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+rkssisodiya@users.noreply.github.com> Date: Wed, 1 Nov 2023 22:12:35 +0530 Subject: [PATCH] Fix: log pipelines: generated operators should have appropriate if condition to avoid spamming collector logs (#3870) * chore: experiment with using a tmp file for getting collector logs in simulator * chore: collector simulator: cleaned up tmp file based collector logs capture * chore: add test validating regex proc doesn't error for logs that don't match * chore: return collector error logs from pipeline preview API * chore: add test validating regex processor doesn't log errors for mismatched logs * chore: add if condition for generated regex processors * chore: add test case validating json parser ignores non json logs * chore: add if condition for operator generated for json parser * chore: add test case validating move processor ignores logs with missing field * chore: add if condition for operator generated for move * chore: add test case validating copy processor ignores logs with missing field * chore: add if condition for operator generated for copy * chore: add test case validating remove processor ignores logs with missing field * chore: add if condition for operator generated for remove * chore: log pipelines: ensuring json parser ignores log if json field is missing * chore: log pipelines: ensure regex parser ignores log if field is missing --- .../app/logparsingpipeline/controller.go | 2 +- .../app/logparsingpipeline/model.go | 1 + .../app/logparsingpipeline/pipelineBuilder.go | 33 +++- .../pipelineBuilder_test.go | 166 ++++++++++++++++++ .../app/logparsingpipeline/preview.go | 16 +- .../app/logparsingpipeline/preview_test.go | 12 +- .../collectorsimulator/collectorsimulator.go | 78 +++++--- pkg/query-service/collectorsimulator/logs.go | 9 +- 8 files changed, 271 insertions(+), 46 deletions(-) diff --git a/pkg/query-service/app/logparsingpipeline/controller.go 
b/pkg/query-service/app/logparsingpipeline/controller.go index 72b6c6b76e..c4b1b0e3ff 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -140,7 +140,7 @@ func (ic *LogParsingPipelineController) PreviewLogsPipelines( ctx context.Context, request *PipelinesPreviewRequest, ) (*PipelinesPreviewResponse, *model.ApiError) { - result, err := SimulatePipelinesProcessing( + result, _, err := SimulatePipelinesProcessing( ctx, request.Pipelines, request.Logs, ) diff --git a/pkg/query-service/app/logparsingpipeline/model.go b/pkg/query-service/app/logparsingpipeline/model.go index eb0a9c66d1..0c4da3df37 100644 --- a/pkg/query-service/app/logparsingpipeline/model.go +++ b/pkg/query-service/app/logparsingpipeline/model.go @@ -41,6 +41,7 @@ type PipelineOperator struct { ID string `json:"id,omitempty" yaml:"id,omitempty"` Output string `json:"output,omitempty" yaml:"output,omitempty"` OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"` + If string `json:"if,omitempty" yaml:"if,omitempty"` // don't need the following in the final config OrderId int `json:"orderId" yaml:"-"` diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index f5beaddd8e..fa9e095de3 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -1,6 +1,9 @@ package logparsingpipeline import ( + "fmt" + "strings" + "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" @@ -73,7 +76,35 @@ func getOperators(ops []PipelineOperator) []PipelineOperator { filteredOp[len(filteredOp)-1].Output = operator.ID } - if operator.Type == "trace_parser" { + if operator.Type == "regex_parser" { + parseFromParts := strings.Split(operator.ParseFrom, ".") + parseFromPath := 
strings.Join(parseFromParts, "?.") + operator.If = fmt.Sprintf( + `%s != nil && %s matches "%s"`, + parseFromPath, + parseFromPath, + strings.ReplaceAll( + strings.ReplaceAll(operator.Regex, `\`, `\\`), + `"`, `\"`, + ), + ) + + } else if operator.Type == "json_parser" { + parseFromParts := strings.Split(operator.ParseFrom, ".") + parseFromPath := strings.Join(parseFromParts, "?.") + operator.If = fmt.Sprintf(`%s != nil && %s matches "^\\s*{.*}\\s*$"`, parseFromPath, parseFromPath) + + } else if operator.Type == "move" || operator.Type == "copy" { + fromParts := strings.Split(operator.From, ".") + fromPath := strings.Join(fromParts, "?.") + operator.If = fmt.Sprintf(`%s != nil`, fromPath) + + } else if operator.Type == "remove" { + fieldParts := strings.Split(operator.Field, ".") + fieldPath := strings.Join(fieldParts, "?.") + operator.If = fmt.Sprintf(`%s != nil`, fieldPath) + + } else if operator.Type == "trace_parser" { cleanTraceParser(&operator) } diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index 3dc0ff7cf1..34afddccce 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -1,9 +1,17 @@ package logparsingpipeline import ( + "context" + "strings" "testing" + "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" . 
"github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" ) var prepareProcessorTestData = []struct { @@ -195,3 +203,161 @@ func TestPreparePipelineProcessor(t *testing.T) { }) } } + +func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { + require := require.New(t) + + testPipelineFilter := &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + } + makeTestPipeline := func(config []PipelineOperator) Pipeline { + return Pipeline{ + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: testPipelineFilter, + Config: config, + } + } + + makeTestLog := func( + body string, + attributes map[string]string, + ) model.SignozLog { + attributes["method"] = "GET" + + testTraceId, err := utils.RandomHex(16) + require.Nil(err) + + testSpanId, err := utils.RandomHex(8) + require.Nil(err) + + return model.SignozLog{ + Timestamp: uint64(time.Now().UnixNano()), + Body: body, + Attributes_string: attributes, + Resources_string: attributes, + SeverityText: entry.Info.String(), + SeverityNumber: uint8(entry.Info), + SpanID: testSpanId, + TraceID: testTraceId, + } + } + + testCases := []struct { + Name string + Operator PipelineOperator + NonMatchingLog model.SignozLog + }{ + { + "regex processor should ignore log with missing field", + PipelineOperator{ + ID: "regex", + Type: "regex_parser", + Enabled: true, + Name: "regex parser", + ParseFrom: "attributes.test_regex_target", + ParseTo: "attributes", + Regex: `^\s*(?P{.*})\s*$`, + }, + makeTestLog("mismatching log", map[string]string{}), + }, { + "regex processor should ignore non-matching log", + PipelineOperator{ + ID: "regex", + Type: 
"regex_parser", + Enabled: true, + Name: "regex parser", + ParseFrom: "body", + ParseTo: "attributes", + Regex: `^\s*(?P{.*})\s*$`, + }, + makeTestLog("mismatching log", map[string]string{}), + }, { + "json parser should ignore logs with missing field.", + PipelineOperator{ + ID: "json", + Type: "json_parser", + Enabled: true, + Name: "json parser", + ParseFrom: "attributes.test_json", + ParseTo: "attributes", + }, + makeTestLog("mismatching log", map[string]string{}), + }, + { + "json parser should ignore log with non JSON target field value", + PipelineOperator{ + ID: "json", + Type: "json_parser", + Enabled: true, + Name: "json parser", + ParseFrom: "attributes.test_json", + ParseTo: "attributes", + }, + makeTestLog("mismatching log", map[string]string{ + "test_json": "bad json", + }), + }, { + "move parser should ignore non matching logs", + PipelineOperator{ + ID: "move", + Type: "move", + Enabled: true, + Name: "move", + From: "attributes.test1", + To: "attributes.test2", + }, + makeTestLog("mismatching log", map[string]string{}), + }, { + "copy parser should ignore non matching logs", + PipelineOperator{ + ID: "copy", + Type: "copy", + Enabled: true, + Name: "copy", + From: "attributes.test1", + To: "attributes.test2", + }, + makeTestLog("mismatching log", map[string]string{}), + }, { + "remove parser should ignore non matching logs", + PipelineOperator{ + ID: "remove", + Type: "remove", + Enabled: true, + Name: "remove", + Field: "attributes.test", + }, + makeTestLog("mismatching log", map[string]string{}), + }, + // TODO(Raj): see if there is an error scenario for grok parser. + // TODO(Raj): see if there is an error scenario for trace parser. + // TODO(Raj): see if there is an error scenario for Add operator. 
+ } + + for _, testCase := range testCases { + testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})} + + result, collectorErrorLogs, err := SimulatePipelinesProcessing( + context.Background(), + testPipelines, + []model.SignozLog{testCase.NonMatchingLog}, + ) + require.Nil(err) + require.Equal(0, len(collectorErrorLogs), strings.Join(collectorErrorLogs, "\n")) + require.Equal(1, len(result)) + } +} diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go index f25a1d6922..c1863649e6 100644 --- a/pkg/query-service/app/logparsingpipeline/preview.go +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sort" - "strings" "time" _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" @@ -23,11 +22,11 @@ func SimulatePipelinesProcessing( pipelines []Pipeline, logs []model.SignozLog, ) ( - []model.SignozLog, *model.ApiError, + output []model.SignozLog, collectorErrorLogs []string, apiErr *model.ApiError, ) { if len(pipelines) < 1 { - return logs, nil + return logs, nil, nil } // Collector simulation does not guarantee that logs will come @@ -46,7 +45,7 @@ func SimulatePipelinesProcessing( // Simulate processing of logs through an otel collector processorConfigs, err := collectorProcessorsForPipelines(pipelines) if err != nil { - return nil, model.BadRequest(errors.Wrap( + return nil, nil, model.BadRequest(errors.Wrap( err, "could not prepare otel processors for pipelines", )) } @@ -55,7 +54,7 @@ func SimulatePipelinesProcessing( logstransformprocessor.NewFactory(), ) if err != nil { - return nil, model.InternalError(errors.Wrap( + return nil, nil, model.InternalError(errors.Wrap( err, "could not construct processor factory map", )) } @@ -75,10 +74,9 @@ func SimulatePipelinesProcessing( simulatorInputPLogs, timeout, ) - collectorErrsText := strings.Join(collectorErrs, "\n") if apiErr != nil { - return nil, model.WrapApiError(apiErr, 
fmt.Sprintf( - "could not simulate log pipelines processing.\nCollector errors: %s\n", collectorErrsText, + return nil, collectorErrs, model.WrapApiError(apiErr, fmt.Sprintf( + "could not simulate log pipelines processing.\nCollector errors", )) } @@ -94,7 +92,7 @@ func SimulatePipelinesProcessing( delete(sigLog.Attributes_int64, inputOrderAttribute) } - return outputSignozLogs, nil + return outputSignozLogs, collectorErrs, nil } func collectorProcessorsForPipelines(pipelines []Pipeline) ( diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go index be3816e08d..9911fd26c2 100644 --- a/pkg/query-service/app/logparsingpipeline/preview_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview_test.go @@ -104,7 +104,7 @@ func TestPipelinePreview(t *testing.T) { }, ) - result, err := SimulatePipelinesProcessing( + result, collectorErrorLogs, err := SimulatePipelinesProcessing( context.Background(), testPipelines, []model.SignozLog{ @@ -114,6 +114,7 @@ func TestPipelinePreview(t *testing.T) { ) require.Nil(err) + require.Equal(0, len(collectorErrorLogs)) require.Equal(2, len(result)) // matching log should have been modified as expected. 
@@ -189,7 +190,7 @@ func TestGrokParsingPreview(t *testing.T) { "method": "GET", }, ) - result, err := SimulatePipelinesProcessing( + result, collectorErrorLogs, err := SimulatePipelinesProcessing( context.Background(), testPipelines, []model.SignozLog{ @@ -198,6 +199,7 @@ func TestGrokParsingPreview(t *testing.T) { ) require.Nil(err) + require.Equal(0, len(collectorErrorLogs)) require.Equal(1, len(result)) processed := result[0] @@ -278,7 +280,7 @@ func TestTraceParsingPreview(t *testing.T) { TraceFlags: 0, } - result, err := SimulatePipelinesProcessing( + result, collectorErrorLogs, err := SimulatePipelinesProcessing( context.Background(), testPipelines, []model.SignozLog{ @@ -287,6 +289,7 @@ func TestTraceParsingPreview(t *testing.T) { ) require.Nil(err) require.Equal(1, len(result)) + require.Equal(0, len(collectorErrorLogs)) processed := result[0] require.Equal(testTraceId, processed.TraceID) @@ -298,7 +301,7 @@ func TestTraceParsingPreview(t *testing.T) { // trace parser should work even if parse_from value is empty testPipelines[0].Config[0].SpanId.ParseFrom = "" - result, err = SimulatePipelinesProcessing( + result, collectorErrorLogs, err = SimulatePipelinesProcessing( context.Background(), testPipelines, []model.SignozLog{ @@ -307,6 +310,7 @@ func TestTraceParsingPreview(t *testing.T) { ) require.Nil(err) require.Equal(1, len(result)) + require.Equal(0, len(collectorErrorLogs)) require.Equal("", result[0].SpanID) } diff --git a/pkg/query-service/collectorsimulator/collectorsimulator.go b/pkg/query-service/collectorsimulator/collectorsimulator.go index c4537cf3ee..96a2d9fdf7 100644 --- a/pkg/query-service/collectorsimulator/collectorsimulator.go +++ b/pkg/query-service/collectorsimulator/collectorsimulator.go @@ -1,9 +1,9 @@ package collectorsimulator import ( - "bytes" "context" "fmt" + "os" "strings" "github.com/google/uuid" @@ -19,8 +19,6 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" 
"go.opentelemetry.io/collector/service" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" "go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryexporter" "go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryreceiver" @@ -33,8 +31,8 @@ type CollectorSimulator struct { // collector service to be used for the simulation collectorSvc *service.Service - // Buffer where collectorSvc will log errors. - collectorErrorLogsBuffer *bytes.Buffer + // tmp file where collectorSvc will log errors. + collectorLogsOutputFilePath string // error channel where collector components will report fatal errors // Gets passed in as AsyncErrorChannel in service.Settings when creating a collector service. @@ -51,15 +49,15 @@ func NewCollectorSimulator( signalType component.DataType, processorFactories map[component.Type]processor.Factory, processorConfigs []ProcessorConfig, -) (*CollectorSimulator, *model.ApiError) { +) (simulator *CollectorSimulator, cleanupFn func(), apiErr *model.ApiError) { // Put together collector component factories for use in the simulation receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory()) if err != nil { - return nil, model.InternalError(errors.Wrap(err, "could not create receiver factories.")) + return nil, nil, model.InternalError(errors.Wrap(err, "could not create receiver factories.")) } exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory()) if err != nil { - return nil, model.InternalError(errors.Wrap(err, "could not create processor factories.")) + return nil, nil, model.InternalError(errors.Wrap(err, "could not create processor factories.")) } factories := otelcol.Factories{ Receivers: receiverFactories, @@ -71,11 +69,30 @@ func NewCollectorSimulator( inMemoryReceiverId := uuid.NewString() inMemoryExporterId := uuid.NewString() + logsOutputFile, err := os.CreateTemp("", "collector-simulator-logs-*") + if err != nil { + return nil, nil, model.InternalError(errors.Wrap( + err, 
"could not create tmp file for capturing collector logs", + )) + } + collectorLogsOutputFilePath := logsOutputFile.Name() + cleanupFn = func() { + os.Remove(collectorLogsOutputFilePath) + } + err = logsOutputFile.Close() + if err != nil { + return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp collector log file")) + } + collectorConfYaml, err := generateSimulationConfig( - signalType, inMemoryReceiverId, processorConfigs, inMemoryExporterId, + signalType, + inMemoryReceiverId, + processorConfigs, + inMemoryExporterId, + collectorLogsOutputFilePath, ) if err != nil { - return nil, model.BadRequest(errors.Wrap(err, "could not generate collector config")) + return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not generate collector config")) } // Parse and validate collector config @@ -87,20 +104,19 @@ func NewCollectorSimulator( }, }) if err != nil { - return nil, model.BadRequest(errors.Wrap(err, "could not create config provider.")) + return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not create config provider.")) } collectorCfg, err := confProvider.Get(ctx, factories) if err != nil { - return nil, model.BadRequest(errors.Wrap(err, "failed to parse collector config")) + return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "failed to parse collector config")) } if err = collectorCfg.Validate(); err != nil { - return nil, model.BadRequest(errors.Wrap(err, "invalid collector config")) + return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "invalid collector config")) } // Build and start collector service. 
collectorErrChan := make(chan error) - var collectorErrBuf bytes.Buffer svcSettings := service.Settings{ Receivers: receiver.NewBuilder(collectorCfg.Receivers, factories.Receivers), Processors: processor.NewBuilder(collectorCfg.Processors, factories.Processors), @@ -108,23 +124,20 @@ func NewCollectorSimulator( Connectors: connector.NewBuilder(collectorCfg.Connectors, factories.Connectors), Extensions: extension.NewBuilder(collectorCfg.Extensions, factories.Extensions), AsyncErrorChannel: collectorErrChan, - LoggingOptions: []zap.Option{ - zap.ErrorOutput(zapcore.AddSync(&collectorErrBuf)), - }, } collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service) if err != nil { - return nil, model.InternalError(errors.Wrap(err, "could not instantiate collector service")) + return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not instantiate collector service")) } return &CollectorSimulator{ - inMemoryReceiverId: inMemoryReceiverId, - inMemoryExporterId: inMemoryExporterId, - collectorSvc: collectorSvc, - collectorErrorLogsBuffer: &collectorErrBuf, - collectorErrorChannel: collectorErrChan, - }, nil + inMemoryReceiverId: inMemoryReceiverId, + inMemoryExporterId: inMemoryExporterId, + collectorSvc: collectorSvc, + collectorErrorChannel: collectorErrChan, + collectorLogsOutputFilePath: collectorLogsOutputFilePath, + }, cleanupFn, nil } func (l *CollectorSimulator) Start(ctx context.Context) ( @@ -168,10 +181,15 @@ func (l *CollectorSimulator) Shutdown(ctx context.Context) ( simulationErrs = append(simulationErrs, reportedErr.Error()) } - if l.collectorErrorLogsBuffer.Len() > 0 { - errBufText := strings.TrimSpace(l.collectorErrorLogsBuffer.String()) - errBufLines := strings.Split(errBufText, "\n") - simulationErrs = append(simulationErrs, errBufLines...) 
+ collectorErrorLogs, err := os.ReadFile(l.collectorLogsOutputFilePath) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "could not read collector logs from tmp file: %w", err, + )) + } + if len(collectorErrorLogs) > 0 { + errorLines := strings.Split(strings.TrimSpace(string(collectorErrorLogs)), "\n") + simulationErrs = append(simulationErrs, errorLines...) } if shutdownErr != nil { @@ -187,6 +205,7 @@ func generateSimulationConfig( receiverId string, processorConfigs []ProcessorConfig, exporterId string, + collectorLogsOutputPath string, ) ([]byte, error) { baseConf := fmt.Sprintf(` receivers: @@ -201,7 +220,8 @@ func generateSimulationConfig( level: none logs: level: error - `, receiverId, exporterId) + output_paths: ["%s"] + `, receiverId, exporterId, collectorLogsOutputPath) simulationConf, err := yaml.Parser().Unmarshal([]byte(baseConf)) if err != nil { diff --git a/pkg/query-service/collectorsimulator/logs.go b/pkg/query-service/collectorsimulator/logs.go index ab445f79eb..7d67eb5d55 100644 --- a/pkg/query-service/collectorsimulator/logs.go +++ b/pkg/query-service/collectorsimulator/logs.go @@ -29,16 +29,21 @@ func SimulateLogsProcessing( outputLogs []plog.Logs, collectorErrs []string, apiErr *model.ApiError, ) { // Construct and start a simulator (wraps a collector service) - simulator, apiErr := NewCollectorSimulator( + simulator, simulatorInitCleanup, apiErr := NewCollectorSimulator( ctx, component.DataTypeLogs, processorFactories, processorConfigs, ) + if simulatorInitCleanup != nil { + defer simulatorInitCleanup() + } if apiErr != nil { return nil, nil, model.WrapApiError(apiErr, "could not create logs processing simulator") } simulatorCleanup, apiErr := simulator.Start(ctx) // We can not rely on collector service to shutdown successfully and cleanup refs to inmemory components. - defer simulatorCleanup() + if simulatorCleanup != nil { + defer simulatorCleanup() + } if apiErr != nil { return nil, nil, apiErr }