mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 06:39:03 +08:00
Fix: pipeline alias collisions shouldnt lead to duplicate log processors (#5372)
* chore: add test validating pipeline alias collisions dont lead to bad config recommendations * chore: emit error log on detecting duplicate processors in generated config * chore: ensure collector config processor names for pipelines are unique * chore: minor cleanups
This commit is contained in:
parent
c5d23336a7
commit
fd603b8fdf
@ -142,6 +142,11 @@ func checkDuplicateString(pipeline []string) bool {
|
||||
for _, processor := range pipeline {
|
||||
name := processor
|
||||
if _, ok := exists[name]; ok {
|
||||
zap.L().Error(
|
||||
"duplicate processor name detected in generated collector config for log pipelines",
|
||||
zap.String("processor", processor),
|
||||
zap.Any("pipeline", pipeline),
|
||||
)
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,10 @@ import (
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
var buildProcessorTestData = []struct {
|
||||
@ -204,3 +207,89 @@ func TestBuildLogsPipeline(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
baseConf := []byte(`
|
||||
receivers:
|
||||
memory:
|
||||
id: in-memory-receiver
|
||||
exporters:
|
||||
memory:
|
||||
id: in-memory-exporter
|
||||
service:
|
||||
pipelines:
|
||||
logs:
|
||||
receivers:
|
||||
- memory
|
||||
processors: []
|
||||
exporters:
|
||||
- memory
|
||||
`)
|
||||
|
||||
makeTestPipeline := func(name string, alias string) Pipeline {
|
||||
return Pipeline{
|
||||
OrderId: 1,
|
||||
Name: name,
|
||||
Alias: alias,
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []PipelineOperator{
|
||||
{
|
||||
ID: "regex",
|
||||
Type: "regex_parser",
|
||||
Enabled: true,
|
||||
Name: "regex parser",
|
||||
ParseFrom: "attributes.test_regex_target",
|
||||
ParseTo: "attributes",
|
||||
Regex: `^\s*(?P<json_data>{.*})\s*$`,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
testPipelines := []Pipeline{
|
||||
makeTestPipeline("test pipeline 1", "pipeline-alias"),
|
||||
makeTestPipeline("test pipeline 2", "pipeline-alias"),
|
||||
}
|
||||
|
||||
recommendedConfYaml, apiErr := GenerateCollectorConfigWithPipelines(
|
||||
baseConf, testPipelines,
|
||||
)
|
||||
require.Nil(apiErr, fmt.Sprintf("couldn't generate config recommendation: %v", apiErr))
|
||||
|
||||
var recommendedConf map[string]interface{}
|
||||
err := yaml.Unmarshal(recommendedConfYaml, &recommendedConf)
|
||||
require.Nil(err, "couldn't unmarshal recommended config")
|
||||
|
||||
logsProcessors := recommendedConf["service"].(map[string]any)["pipelines"].(map[string]any)["logs"].(map[string]any)["processors"].([]any)
|
||||
|
||||
require.Equal(
|
||||
len(logsProcessors), len(testPipelines),
|
||||
"test pipelines not included in recommended config as expected",
|
||||
)
|
||||
|
||||
recommendedConfYaml2, apiErr := GenerateCollectorConfigWithPipelines(
|
||||
baseConf, testPipelines,
|
||||
)
|
||||
require.Nil(apiErr, fmt.Sprintf("couldn't generate config recommendation again: %v", apiErr))
|
||||
require.Equal(
|
||||
string(recommendedConfYaml), string(recommendedConfYaml2),
|
||||
"collector config should not change across recommendations for same set of pipelines",
|
||||
)
|
||||
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ func CollectorConfProcessorName(p Pipeline) string {
|
||||
func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) {
|
||||
processors := map[string]interface{}{}
|
||||
names := []string{}
|
||||
for _, v := range pipelines {
|
||||
for pipelineIdx, v := range pipelines {
|
||||
if !v.Enabled {
|
||||
continue
|
||||
}
|
||||
@ -70,6 +70,12 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
|
||||
Operators: v.Config,
|
||||
}
|
||||
name := CollectorConfProcessorName(v)
|
||||
|
||||
// Ensure name is unique
|
||||
if _, nameExists := processors[name]; nameExists {
|
||||
name = fmt.Sprintf("%s-%d", name, pipelineIdx)
|
||||
}
|
||||
|
||||
processors[name] = processor
|
||||
names = append(names, name)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user