Fix: log pipelines: generated operators should have appropriate if condition to avoid spamming collector logs (#3870)

* chore: experiment with using a tmp file for getting collector logs in simulator

* chore: collector simulator: cleaned up tmp file based collector logs capture

* chore: add test validating regex proc doesn't error for logs that don't match

* chore: return collector error logs from pipeline preview API

* chore: add test validating regex processor doesn't log errors for mismatched logs

* chore: add if condition for generated regex processors

* chore: add test case validating json parser ignores non-json logs

* chore: add if condition for operator generated for json parser

* chore: add test case validating move processor ignores logs with missing field

* chore: add if condition for operator generated for move

* chore: add test case validating copy processor ignores logs with missing field

* chore: add if condition for operator generated for copy

* chore: add test case validating remove processor ignores logs with missing field

* chore: add if condition for operator generated for remove

* chore: log pipelines: ensuring json parser ignores log if json field is missing

* chore: log pipelines: ensure regex parser ignores log if field is missing
Raj Kamal Singh 2023-11-01 22:12:35 +05:30 committed by GitHub
parent bff7142a61
commit 626da7533e
8 changed files with 271 additions and 46 deletions
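
At a glance: every generated stanza operator now carries an `if` guard over its input field, built with the expression language's nil-safe `?.` traversal, so non-matching logs skip the operator silently instead of producing collector error logs. A rough sketch of the guard shapes by operator type; the field name (attributes.user) is hypothetical:

package main

import "fmt"

func main() {
	// Sketch of the guards generated per stanza operator type.
	// "?." is nil-safe traversal, so a missing field yields nil
	// instead of an evaluation error.
	generatedIf := map[string]string{
		"regex_parser": `attributes?.user != nil && attributes?.user matches "<escaped operator regex>"`,
		"json_parser":  `attributes?.user != nil && attributes?.user matches "^\\s*{.*}\\s*$"`,
		"move":         `attributes?.user != nil`, // copy gets the same guard, keyed on its from field
		"remove":       `attributes?.user != nil`, // keyed on its field setting
	}
	for opType, cond := range generatedIf {
		fmt.Printf("%-12s if: %s\n", opType, cond)
	}
}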

View File

@@ -140,7 +140,7 @@ func (ic *LogParsingPipelineController) PreviewLogsPipelines(
 	ctx context.Context,
 	request *PipelinesPreviewRequest,
 ) (*PipelinesPreviewResponse, *model.ApiError) {
-	result, err := SimulatePipelinesProcessing(
+	result, _, err := SimulatePipelinesProcessing(
 		ctx, request.Pipelines, request.Logs,
 	)

View File

@@ -41,6 +41,7 @@ type PipelineOperator struct {
 	ID      string `json:"id,omitempty" yaml:"id,omitempty"`
 	Output  string `json:"output,omitempty" yaml:"output,omitempty"`
 	OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"`
+	If      string `json:"if,omitempty" yaml:"if,omitempty"`

 	// don't need the following in the final config
 	OrderId int `json:"orderId" yaml:"-"`
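
Because PipelineOperator is marshalled into the collector's operator config, the new field comes out as stanza's standard `if` setting. A minimal sketch of that round trip, assuming gopkg.in/yaml.v3 and a trimmed stand-in struct (not the full type):

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Trimmed stand-in for PipelineOperator with just the fields
// relevant here; the tags mirror the ones above.
type pipelineOperator struct {
	ID   string `yaml:"id,omitempty"`
	Type string `yaml:"type,omitempty"`
	If   string `yaml:"if,omitempty"`
}

func main() {
	out, err := yaml.Marshal(pipelineOperator{
		ID:   "move",
		Type: "move",
		If:   `attributes?.test1 != nil`,
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
	// id: move
	// type: move
	// if: attributes?.test1 != nil
}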

View File

@@ -1,6 +1,9 @@
 package logparsingpipeline

 import (
+	"fmt"
+	"strings"
+
 	"github.com/pkg/errors"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
@@ -73,7 +76,35 @@ func getOperators(ops []PipelineOperator) []PipelineOperator {
 				filteredOp[len(filteredOp)-1].Output = operator.ID
 			}

-			if operator.Type == "trace_parser" {
+			if operator.Type == "regex_parser" {
+				parseFromParts := strings.Split(operator.ParseFrom, ".")
+				parseFromPath := strings.Join(parseFromParts, "?.")
+				operator.If = fmt.Sprintf(
+					`%s != nil && %s matches "%s"`,
+					parseFromPath,
+					parseFromPath,
+					strings.ReplaceAll(
+						strings.ReplaceAll(operator.Regex, `\`, `\\`),
+						`"`, `\"`,
+					),
+				)
+
+			} else if operator.Type == "json_parser" {
+				parseFromParts := strings.Split(operator.ParseFrom, ".")
+				parseFromPath := strings.Join(parseFromParts, "?.")
+				operator.If = fmt.Sprintf(`%s != nil && %s matches "^\\s*{.*}\\s*$"`, parseFromPath, parseFromPath)
+
+			} else if operator.Type == "move" || operator.Type == "copy" {
+				fromParts := strings.Split(operator.From, ".")
+				fromPath := strings.Join(fromParts, "?.")
+				operator.If = fmt.Sprintf(`%s != nil`, fromPath)
+
+			} else if operator.Type == "remove" {
+				fieldParts := strings.Split(operator.Field, ".")
+				fieldPath := strings.Join(fieldParts, "?.")
+				operator.If = fmt.Sprintf(`%s != nil`, fieldPath)
+
+			} else if operator.Type == "trace_parser" {
 				cleanTraceParser(&operator)
 			}
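
To make the regex_parser branch concrete, here is a hedged sketch that reproduces the generated condition for a sample operator; the helper name is ours, not the codebase's:

package main

import (
	"fmt"
	"strings"
)

// buildRegexParserIf mirrors the condition built above: the parse_from
// field must exist (nil-safe "?." traversal) and must match the
// operator's regex, with backslashes and quotes escaped so the regex
// survives embedding in the expression's quoted string.
func buildRegexParserIf(parseFrom, regex string) string {
	path := strings.Join(strings.Split(parseFrom, "."), "?.")
	escaped := strings.ReplaceAll(strings.ReplaceAll(regex, `\`, `\\`), `"`, `\"`)
	return fmt.Sprintf(`%s != nil && %s matches "%s"`, path, path, escaped)
}

func main() {
	fmt.Println(buildRegexParserIf(
		"attributes.test_regex_target",
		`^\s*(?P<json_data>{.*})\s*$`,
	))
	// attributes?.test_regex_target != nil && attributes?.test_regex_target matches "^\\s*(?P<json_data>{.*})\\s*$"
}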

View File

@@ -1,9 +1,17 @@
 package logparsingpipeline

 import (
+	"context"
+	"strings"
 	"testing"
+	"time"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 	. "github.com/smartystreets/goconvey/convey"
+	"github.com/stretchr/testify/require"
+	"go.signoz.io/signoz/pkg/query-service/model"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	"go.signoz.io/signoz/pkg/query-service/utils"
 )

 var prepareProcessorTestData = []struct {
@@ -195,3 +203,161 @@ func TestPreparePipelineProcessor(t *testing.T) {
 		})
 	}
 }
+
+func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
+	require := require.New(t)
+
+	testPipelineFilter := &v3.FilterSet{
+		Operator: "AND",
+		Items: []v3.FilterItem{
+			{
+				Key: v3.AttributeKey{
+					Key:      "method",
+					DataType: v3.AttributeKeyDataTypeString,
+					Type:     v3.AttributeKeyTypeTag,
+				},
+				Operator: "=",
+				Value:    "GET",
+			},
+		},
+	}
+	makeTestPipeline := func(config []PipelineOperator) Pipeline {
+		return Pipeline{
+			OrderId: 1,
+			Name:    "pipeline1",
+			Alias:   "pipeline1",
+			Enabled: true,
+			Filter:  testPipelineFilter,
+			Config:  config,
+		}
+	}
+
+	makeTestLog := func(
+		body string,
+		attributes map[string]string,
+	) model.SignozLog {
+		attributes["method"] = "GET"
+
+		testTraceId, err := utils.RandomHex(16)
+		require.Nil(err)
+
+		testSpanId, err := utils.RandomHex(8)
+		require.Nil(err)
+
+		return model.SignozLog{
+			Timestamp:         uint64(time.Now().UnixNano()),
+			Body:              body,
+			Attributes_string: attributes,
+			Resources_string:  attributes,
+			SeverityText:      entry.Info.String(),
+			SeverityNumber:    uint8(entry.Info),
+			SpanID:            testSpanId,
+			TraceID:           testTraceId,
+		}
+	}
+
+	testCases := []struct {
+		Name           string
+		Operator       PipelineOperator
+		NonMatchingLog model.SignozLog
+	}{
+		{
+			"regex processor should ignore log with missing field",
+			PipelineOperator{
+				ID:        "regex",
+				Type:      "regex_parser",
+				Enabled:   true,
+				Name:      "regex parser",
+				ParseFrom: "attributes.test_regex_target",
+				ParseTo:   "attributes",
+				Regex:     `^\s*(?P<json_data>{.*})\s*$`,
+			},
+			makeTestLog("mismatching log", map[string]string{}),
+		}, {
+			"regex processor should ignore non-matching log",
+			PipelineOperator{
+				ID:        "regex",
+				Type:      "regex_parser",
+				Enabled:   true,
+				Name:      "regex parser",
+				ParseFrom: "body",
+				ParseTo:   "attributes",
+				Regex:     `^\s*(?P<body_json>{.*})\s*$`,
+			},
+			makeTestLog("mismatching log", map[string]string{}),
+		}, {
+			"json parser should ignore logs with missing field.",
+			PipelineOperator{
+				ID:        "json",
+				Type:      "json_parser",
+				Enabled:   true,
+				Name:      "json parser",
+				ParseFrom: "attributes.test_json",
+				ParseTo:   "attributes",
+			},
+			makeTestLog("mismatching log", map[string]string{}),
+		},
+		{
+			"json parser should ignore log with non JSON target field value",
+			PipelineOperator{
+				ID:        "json",
+				Type:      "json_parser",
+				Enabled:   true,
+				Name:      "json parser",
+				ParseFrom: "attributes.test_json",
+				ParseTo:   "attributes",
+			},
+			makeTestLog("mismatching log", map[string]string{
+				"test_json": "bad json",
+			}),
+		}, {
+			"move parser should ignore non matching logs",
+			PipelineOperator{
+				ID:      "move",
+				Type:    "move",
+				Enabled: true,
+				Name:    "move",
+				From:    "attributes.test1",
+				To:      "attributes.test2",
+			},
+			makeTestLog("mismatching log", map[string]string{}),
+		}, {
+			"copy parser should ignore non matching logs",
+			PipelineOperator{
+				ID:      "copy",
+				Type:    "copy",
+				Enabled: true,
+				Name:    "copy",
+				From:    "attributes.test1",
+				To:      "attributes.test2",
+			},
+			makeTestLog("mismatching log", map[string]string{}),
+		}, {
+			"remove parser should ignore non matching logs",
+			PipelineOperator{
+				ID:      "remove",
+				Type:    "remove",
+				Enabled: true,
+				Name:    "remove",
+				Field:   "attributes.test",
+			},
+			makeTestLog("mismatching log", map[string]string{}),
+		},
+		// TODO(Raj): see if there is an error scenario for grok parser.
+		// TODO(Raj): see if there is an error scenario for trace parser.
+		// TODO(Raj): see if there is an error scenario for Add operator.
+	}
+
+	for _, testCase := range testCases {
+		testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})}
+
+		result, collectorErrorLogs, err := SimulatePipelinesProcessing(
+			context.Background(),
+			testPipelines,
+			[]model.SignozLog{testCase.NonMatchingLog},
+		)
+		require.Nil(err)
+		require.Equal(0, len(collectorErrorLogs), strings.Join(collectorErrorLogs, "\n"))
+		require.Equal(1, len(result))
+	}
+}
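
These cases hinge on how stanza evaluates operator `if` expressions. As a standalone illustration, a small sketch using github.com/antonmedv/expr, the engine stanza's `if` support builds on (treat the module path and version as an assumption):

package main

import (
	"fmt"

	"github.com/antonmedv/expr"
)

func main() {
	// Entry-shaped environment: "method" exists, "test1" does not.
	env := map[string]interface{}{
		"attributes": map[string]interface{}{"method": "GET"},
	}
	for _, cond := range []string{
		`attributes?.test1 != nil`, // guard generated for move/copy/remove
		`attributes?.method != nil && attributes?.method matches "GET"`,
	} {
		out, err := expr.Eval(cond, env)
		if err != nil {
			panic(err)
		}
		// A missing field evaluates to nil rather than erroring,
		// so the guarded operator is simply skipped for that log.
		fmt.Printf("%v <= %s\n", out, cond)
	}
}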

View File

@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"sort"
-	"strings"
 	"time"

 	_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
@@ -23,11 +22,11 @@ func SimulatePipelinesProcessing(
 	pipelines []Pipeline,
 	logs []model.SignozLog,
 ) (
-	[]model.SignozLog, *model.ApiError,
+	output []model.SignozLog, collectorErrorLogs []string, apiErr *model.ApiError,
 ) {
 	if len(pipelines) < 1 {
-		return logs, nil
+		return logs, nil, nil
 	}

 	// Collector simulation does not guarantee that logs will come
// Collector simulation does not guarantee that logs will come
@@ -46,7 +45,7 @@
 	// Simulate processing of logs through an otel collector
 	processorConfigs, err := collectorProcessorsForPipelines(pipelines)
 	if err != nil {
-		return nil, model.BadRequest(errors.Wrap(
+		return nil, nil, model.BadRequest(errors.Wrap(
 			err, "could not prepare otel processors for pipelines",
 		))
 	}
@@ -55,7 +54,7 @@
 		logstransformprocessor.NewFactory(),
 	)
 	if err != nil {
-		return nil, model.InternalError(errors.Wrap(
+		return nil, nil, model.InternalError(errors.Wrap(
 			err, "could not construct processor factory map",
 		))
 	}
@@ -75,10 +74,9 @@
 		simulatorInputPLogs,
 		timeout,
 	)
-	collectorErrsText := strings.Join(collectorErrs, "\n")
 	if apiErr != nil {
-		return nil, model.WrapApiError(apiErr, fmt.Sprintf(
-			"could not simulate log pipelines processing.\nCollector errors: %s\n", collectorErrsText,
+		return nil, collectorErrs, model.WrapApiError(apiErr, fmt.Sprintf(
+			"could not simulate log pipelines processing.\nCollector errors",
 		))
 	}
@@ -94,7 +92,7 @@
 		delete(sigLog.Attributes_int64, inputOrderAttribute)
 	}

-	return outputSignozLogs, nil
+	return outputSignozLogs, collectorErrs, nil
 }

 func collectorProcessorsForPipelines(pipelines []Pipeline) (

View File

@@ -104,7 +104,7 @@ func TestPipelinePreview(t *testing.T) {
 		},
 	)

-	result, err := SimulatePipelinesProcessing(
+	result, collectorErrorLogs, err := SimulatePipelinesProcessing(
 		context.Background(),
 		testPipelines,
 		[]model.SignozLog{
@@ -114,6 +114,7 @@
 	)
 	require.Nil(err)
+	require.Equal(0, len(collectorErrorLogs))
 	require.Equal(2, len(result))

 	// matching log should have been modified as expected.
@@ -189,7 +190,7 @@ func TestGrokParsingPreview(t *testing.T) {
 			"method": "GET",
 		},
 	)

-	result, err := SimulatePipelinesProcessing(
+	result, collectorErrorLogs, err := SimulatePipelinesProcessing(
 		context.Background(),
 		testPipelines,
 		[]model.SignozLog{
@@ -198,6 +199,7 @@
 	)
 	require.Nil(err)
+	require.Equal(0, len(collectorErrorLogs))
 	require.Equal(1, len(result))

 	processed := result[0]
@@ -278,7 +280,7 @@ func TestTraceParsingPreview(t *testing.T) {
 		TraceFlags: 0,
 	}

-	result, err := SimulatePipelinesProcessing(
+	result, collectorErrorLogs, err := SimulatePipelinesProcessing(
 		context.Background(),
 		testPipelines,
 		[]model.SignozLog{
@@ -287,6 +289,7 @@
 	)
 	require.Nil(err)
 	require.Equal(1, len(result))
+	require.Equal(0, len(collectorErrorLogs))

 	processed := result[0]
 	require.Equal(testTraceId, processed.TraceID)
@@ -298,7 +301,7 @@
 	// trace parser should work even if parse_from value is empty
 	testPipelines[0].Config[0].SpanId.ParseFrom = ""

-	result, err = SimulatePipelinesProcessing(
+	result, collectorErrorLogs, err = SimulatePipelinesProcessing(
 		context.Background(),
 		testPipelines,
 		[]model.SignozLog{
@@ -307,6 +310,7 @@
 	)
 	require.Nil(err)
 	require.Equal(1, len(result))
+	require.Equal(0, len(collectorErrorLogs))
 	require.Equal("", result[0].SpanID)
 }

View File

@@ -1,9 +1,9 @@
 package collectorsimulator

 import (
-	"bytes"
 	"context"
 	"fmt"
+	"os"
 	"strings"

 	"github.com/google/uuid"
@@ -19,8 +19,6 @@
 	"go.opentelemetry.io/collector/processor"
 	"go.opentelemetry.io/collector/receiver"
 	"go.opentelemetry.io/collector/service"
-	"go.uber.org/zap"
-	"go.uber.org/zap/zapcore"

 	"go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryexporter"
 	"go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryreceiver"
@@ -33,8 +31,8 @@ type CollectorSimulator struct {
 	// collector service to be used for the simulation
 	collectorSvc *service.Service

-	// Buffer where collectorSvc will log errors.
-	collectorErrorLogsBuffer *bytes.Buffer
+	// tmp file where collectorSvc will log errors.
+	collectorLogsOutputFilePath string

 	// error channel where collector components will report fatal errors
 	// Gets passed in as AsyncErrorChannel in service.Settings when creating a collector service.
@@ -51,15 +49,15 @@
 	signalType component.DataType,
 	processorFactories map[component.Type]processor.Factory,
 	processorConfigs []ProcessorConfig,
-) (*CollectorSimulator, *model.ApiError) {
+) (simulator *CollectorSimulator, cleanupFn func(), apiErr *model.ApiError) {
 	// Put together collector component factories for use in the simulation
 	receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory())
 	if err != nil {
-		return nil, model.InternalError(errors.Wrap(err, "could not create receiver factories."))
+		return nil, nil, model.InternalError(errors.Wrap(err, "could not create receiver factories."))
 	}
 	exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory())
 	if err != nil {
-		return nil, model.InternalError(errors.Wrap(err, "could not create processor factories."))
+		return nil, nil, model.InternalError(errors.Wrap(err, "could not create processor factories."))
 	}
 	factories := otelcol.Factories{
 		Receivers: receiverFactories,
@@ -71,11 +69,30 @@
 	inMemoryReceiverId := uuid.NewString()
 	inMemoryExporterId := uuid.NewString()

+	logsOutputFile, err := os.CreateTemp("", "collector-simulator-logs-*")
+	if err != nil {
+		return nil, nil, model.InternalError(errors.Wrap(
+			err, "could not create tmp file for capturing collector logs",
+		))
+	}
+	collectorLogsOutputFilePath := logsOutputFile.Name()
+	cleanupFn = func() {
+		os.Remove(collectorLogsOutputFilePath)
+	}
+	err = logsOutputFile.Close()
+	if err != nil {
+		return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp collector log file"))
+	}
+
 	collectorConfYaml, err := generateSimulationConfig(
-		signalType, inMemoryReceiverId, processorConfigs, inMemoryExporterId,
+		signalType,
+		inMemoryReceiverId,
+		processorConfigs,
+		inMemoryExporterId,
+		collectorLogsOutputFilePath,
 	)
 	if err != nil {
-		return nil, model.BadRequest(errors.Wrap(err, "could not generate collector config"))
+		return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not generate collector config"))
 	}

 	// Parse and validate collector config
@@ -87,20 +104,19 @@
 		},
 	})
 	if err != nil {
-		return nil, model.BadRequest(errors.Wrap(err, "could not create config provider."))
+		return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not create config provider."))
 	}

 	collectorCfg, err := confProvider.Get(ctx, factories)
 	if err != nil {
-		return nil, model.BadRequest(errors.Wrap(err, "failed to parse collector config"))
+		return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "failed to parse collector config"))
 	}
 	if err = collectorCfg.Validate(); err != nil {
-		return nil, model.BadRequest(errors.Wrap(err, "invalid collector config"))
+		return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "invalid collector config"))
 	}

 	// Build and start collector service.
 	collectorErrChan := make(chan error)
-	var collectorErrBuf bytes.Buffer
 	svcSettings := service.Settings{
 		Receivers:  receiver.NewBuilder(collectorCfg.Receivers, factories.Receivers),
 		Processors: processor.NewBuilder(collectorCfg.Processors, factories.Processors),
@@ -108,23 +124,20 @@
 		Connectors:        connector.NewBuilder(collectorCfg.Connectors, factories.Connectors),
 		Extensions:        extension.NewBuilder(collectorCfg.Extensions, factories.Extensions),
 		AsyncErrorChannel: collectorErrChan,
-		LoggingOptions: []zap.Option{
-			zap.ErrorOutput(zapcore.AddSync(&collectorErrBuf)),
-		},
 	}

 	collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service)
 	if err != nil {
-		return nil, model.InternalError(errors.Wrap(err, "could not instantiate collector service"))
+		return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not instantiate collector service"))
 	}

 	return &CollectorSimulator{
-		inMemoryReceiverId:       inMemoryReceiverId,
-		inMemoryExporterId:       inMemoryExporterId,
-		collectorSvc:             collectorSvc,
-		collectorErrorLogsBuffer: &collectorErrBuf,
-		collectorErrorChannel:    collectorErrChan,
-	}, nil
+		inMemoryReceiverId:          inMemoryReceiverId,
+		inMemoryExporterId:          inMemoryExporterId,
+		collectorSvc:                collectorSvc,
+		collectorErrorChannel:       collectorErrChan,
+		collectorLogsOutputFilePath: collectorLogsOutputFilePath,
+	}, cleanupFn, nil
 }

 func (l *CollectorSimulator) Start(ctx context.Context) (
@@ -168,10 +181,15 @@ func (l *CollectorSimulator) Shutdown(ctx context.Context) (
 		simulationErrs = append(simulationErrs, reportedErr.Error())
 	}

-	if l.collectorErrorLogsBuffer.Len() > 0 {
-		errBufText := strings.TrimSpace(l.collectorErrorLogsBuffer.String())
-		errBufLines := strings.Split(errBufText, "\n")
-		simulationErrs = append(simulationErrs, errBufLines...)
+	collectorErrorLogs, err := os.ReadFile(l.collectorLogsOutputFilePath)
+	if err != nil {
+		return nil, model.InternalError(fmt.Errorf(
+			"could not read collector logs from tmp file: %w", err,
+		))
+	}
+	if len(collectorErrorLogs) > 0 {
+		errorLines := strings.Split(string(collectorErrorLogs), "\n")
+		simulationErrs = append(simulationErrs, errorLines...)
 	}

 	if shutdownErr != nil {
@@ -187,6 +205,7 @@ func generateSimulationConfig(
 	receiverId string,
 	processorConfigs []ProcessorConfig,
 	exporterId string,
+	collectorLogsOutputPath string,
 ) ([]byte, error) {
 	baseConf := fmt.Sprintf(`
 receivers:
@@ -201,7 +220,8 @@
       level: none
     logs:
       level: error
-`, receiverId, exporterId)
+      output_paths: ["%s"]
+`, receiverId, exporterId, collectorLogsOutputPath)

 	simulationConf, err := yaml.Parser().Unmarshal([]byte(baseConf))
 	if err != nil {
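
The tmp file replaces the earlier in-memory zap buffer: with output_paths, the collector's own logger writes errors to the file, and the simulator reads them back after shutdown. A condensed sketch of that lifecycle (the file contents here are fabricated stand-ins for collector error logs):

package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// Create the capture file up front and hand only its path to the
	// collector config (via output_paths); close it immediately since
	// the collector's logger opens the path itself.
	f, err := os.CreateTemp("", "collector-simulator-logs-*")
	if err != nil {
		panic(err)
	}
	path := f.Name()
	defer os.Remove(path) // what cleanupFn does above
	f.Close()

	// Stand-in for the collector writing error logs during the run.
	os.WriteFile(path, []byte("error one\nerror two\n"), 0o600)

	// After shutdown, read the file back and split into lines,
	// mirroring the Shutdown changes above.
	logs, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	if len(logs) > 0 {
		fmt.Println(strings.Split(string(logs), "\n"))
	}
}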

View File

@@ -29,16 +29,21 @@ func SimulateLogsProcessing(
 	outputLogs []plog.Logs, collectorErrs []string, apiErr *model.ApiError,
 ) {
 	// Construct and start a simulator (wraps a collector service)
-	simulator, apiErr := NewCollectorSimulator(
+	simulator, simulatorInitCleanup, apiErr := NewCollectorSimulator(
 		ctx, component.DataTypeLogs, processorFactories, processorConfigs,
 	)
+	if simulatorInitCleanup != nil {
+		defer simulatorInitCleanup()
+	}
 	if apiErr != nil {
 		return nil, nil, model.WrapApiError(apiErr, "could not create logs processing simulator")
 	}

 	simulatorCleanup, apiErr := simulator.Start(ctx)
 	// We can not rely on collector service to shutdown successfully and cleanup refs to inmemory components.
-	defer simulatorCleanup()
+	if simulatorCleanup != nil {
+		defer simulatorCleanup()
+	}
 	if apiErr != nil {
 		return nil, nil, apiErr
 	}
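
The nil checks around both defers are the point of the new signatures: the constructor and Start can fail after allocating resources, so each returns whatever cleanup it has even alongside an error. A minimal sketch of the pattern with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

type simulator struct{}

func newSimulator() (*simulator, func(), error) {
	cleanup := func() { fmt.Println("removing tmp resources") }
	// Fail *after* allocating something; return cleanup anyway so
	// the caller can still release it.
	return nil, cleanup, errors.New("init failed")
}

func main() {
	sim, cleanup, err := newSimulator()
	if cleanup != nil {
		defer cleanup() // runs even on the error path
	}
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	_ = sim
}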