From fd603b8fdf71ad124141c6010480286a79c04f8b Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Fri, 28 Jun 2024 09:31:21 +0530 Subject: [PATCH] Fix: pipeline alias collisions shouldnt lead to duplicate log processors (#5372) * chore: add test validating pipeline alias collisions dont lead to bad config recommendations * chore: emit error log on detecting duplicate processors in generated config * chore: ensure collector config processor names for pipelines are unique * chore: minor cleanups --- .../logparsingpipeline/collector_config.go | 5 ++ .../collector_config_test.go | 89 +++++++++++++++++++ .../app/logparsingpipeline/pipelineBuilder.go | 8 +- 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/pkg/query-service/app/logparsingpipeline/collector_config.go b/pkg/query-service/app/logparsingpipeline/collector_config.go index 17b8d96c1e..49f697fbd3 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config.go @@ -142,6 +142,11 @@ func checkDuplicateString(pipeline []string) bool { for _, processor := range pipeline { name := processor if _, ok := exists[name]; ok { + zap.L().Error( + "duplicate processor name detected in generated collector config for log pipelines", + zap.String("processor", processor), + zap.Any("pipeline", pipeline), + ) return true } diff --git a/pkg/query-service/app/logparsingpipeline/collector_config_test.go b/pkg/query-service/app/logparsingpipeline/collector_config_test.go index 8ef79875d5..f5ba7a352b 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config_test.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config_test.go @@ -5,7 +5,10 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "gopkg.in/yaml.v3" ) var buildProcessorTestData = []struct { @@ -204,3 +207,89 @@ func TestBuildLogsPipeline(t *testing.T) { }) } } + +func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *testing.T) { + require := require.New(t) + + baseConf := []byte(` + receivers: + memory: + id: in-memory-receiver + exporters: + memory: + id: in-memory-exporter + service: + pipelines: + logs: + receivers: + - memory + processors: [] + exporters: + - memory + `) + + makeTestPipeline := func(name string, alias string) Pipeline { + return Pipeline{ + OrderId: 1, + Name: name, + Alias: alias, + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []PipelineOperator{ + { + ID: "regex", + Type: "regex_parser", + Enabled: true, + Name: "regex parser", + ParseFrom: "attributes.test_regex_target", + ParseTo: "attributes", + Regex: `^\s*(?P{.*})\s*$`, + }, + }, + } + } + + testPipelines := []Pipeline{ + makeTestPipeline("test pipeline 1", "pipeline-alias"), + makeTestPipeline("test pipeline 2", "pipeline-alias"), + } + + recommendedConfYaml, apiErr := GenerateCollectorConfigWithPipelines( + baseConf, testPipelines, + ) + require.Nil(apiErr, fmt.Sprintf("couldn't generate config recommendation: %v", apiErr)) + + var recommendedConf map[string]interface{} + err := yaml.Unmarshal(recommendedConfYaml, &recommendedConf) + require.Nil(err, "couldn't unmarshal recommended config") + + logsProcessors := recommendedConf["service"].(map[string]any)["pipelines"].(map[string]any)["logs"].(map[string]any)["processors"].([]any) + + require.Equal( + len(logsProcessors), len(testPipelines), + "test pipelines not included in recommended config as expected", + ) + + recommendedConfYaml2, apiErr := GenerateCollectorConfigWithPipelines( + baseConf, testPipelines, + ) + require.Nil(apiErr, fmt.Sprintf("couldn't generate config recommendation again: %v", apiErr)) + require.Equal( + string(recommendedConfYaml), string(recommendedConfYaml2), + "collector config should not change across recommendations for same set of pipelines", + ) + +} diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index e569d88a82..5dd6406a79 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -24,7 +24,7 @@ func CollectorConfProcessorName(p Pipeline) string { func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) { processors := map[string]interface{}{} names := []string{} - for _, v := range pipelines { + for pipelineIdx, v := range pipelines { if !v.Enabled { continue } @@ -70,6 +70,12 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s Operators: v.Config, } name := CollectorConfProcessorName(v) + + // Ensure name is unique + if _, nameExists := processors[name]; nameExists { + name = fmt.Sprintf("%s-%d", name, pipelineIdx) + } + processors[name] = processor names = append(names, name) }