mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 02:19:02 +08:00
feat: processor builder updated with new logic and tests
This commit is contained in:
parent
b5debe6ea2
commit
eb4ac18162
@ -157,7 +157,7 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin
|
|||||||
|
|
||||||
// create a reverse map of existing config processors and their position
|
// create a reverse map of existing config processors and their position
|
||||||
existing := map[string]int{}
|
existing := map[string]int{}
|
||||||
for i, p := range current {
|
for i, p := range pipeline {
|
||||||
name := p
|
name := p
|
||||||
existing[name] = i
|
existing[name] = i
|
||||||
}
|
}
|
||||||
@ -165,49 +165,47 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin
|
|||||||
// create mapping from our logsParserPipeline to position in existing processors (from current config)
|
// create mapping from our logsParserPipeline to position in existing processors (from current config)
|
||||||
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
|
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
|
||||||
specVsExistingMap := map[int]int{}
|
specVsExistingMap := map[int]int{}
|
||||||
|
existingVsSpec := map[int]int{}
|
||||||
|
|
||||||
// go through plan and map its elements to current positions in effective config
|
// go through plan and map its elements to current positions in effective config
|
||||||
for i, m := range logsParserPipeline {
|
for i, m := range logsParserPipeline {
|
||||||
if loc, ok := existing[m]; ok {
|
if loc, ok := existing[m]; ok {
|
||||||
specVsExistingMap[i] = loc
|
specVsExistingMap[i] = loc
|
||||||
|
existingVsSpec[loc] = i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastMatched := 0
|
lastMatched := 0
|
||||||
|
newPipeline := []string{}
|
||||||
|
|
||||||
// go through plan again in the increasing order
|
|
||||||
for i := 0; i < len(logsParserPipeline); i++ {
|
for i := 0; i < len(logsParserPipeline); i++ {
|
||||||
m := logsParserPipeline[i]
|
m := logsParserPipeline[i]
|
||||||
|
|
||||||
if loc, ok := specVsExistingMap[i]; ok {
|
if loc, ok := specVsExistingMap[i]; ok {
|
||||||
|
for j := lastMatched; j < loc; j++ {
|
||||||
|
if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) {
|
||||||
|
delete(specVsExistingMap, existingVsSpec[j])
|
||||||
|
} else {
|
||||||
|
newPipeline = append(newPipeline, pipeline[j])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
newPipeline = append(newPipeline, pipeline[loc])
|
||||||
lastMatched = loc + 1
|
lastMatched = loc + 1
|
||||||
} else {
|
} else {
|
||||||
if lastMatched <= 0 {
|
newPipeline = append(newPipeline, m)
|
||||||
zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m)
|
|
||||||
pipeline = append([]string{m}, pipeline[lastMatched:]...)
|
|
||||||
lastMatched++
|
|
||||||
} else {
|
|
||||||
zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m)
|
|
||||||
|
|
||||||
prior := make([]string, len(pipeline[:lastMatched]))
|
|
||||||
next := make([]string, len(pipeline[lastMatched:]))
|
|
||||||
|
|
||||||
copy(prior, pipeline[:lastMatched])
|
|
||||||
copy(next, pipeline[lastMatched:])
|
|
||||||
|
|
||||||
pipeline = append(prior, m)
|
|
||||||
pipeline = append(pipeline, next...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
if lastMatched < len(pipeline) {
|
||||||
|
newPipeline = append(newPipeline, pipeline[lastMatched:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if checkDuplicateString(pipeline) {
|
if checkDuplicateString(newPipeline) {
|
||||||
// duplicates are most likely because the processor sequence in effective config conflicts
|
// duplicates are most likely because the processor sequence in effective config conflicts
|
||||||
// with the planned sequence as per planned pipeline
|
// with the planned sequence as per planned pipeline
|
||||||
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
|
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
|
||||||
}
|
}
|
||||||
|
|
||||||
return pipeline, nil
|
return newPipeline, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkDuplicateString(pipeline []string) bool {
|
func checkDuplicateString(pipeline []string) bool {
|
||||||
|
@ -117,6 +117,12 @@ var BuildLogsPipelineTestData = []struct {
|
|||||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
|
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "Add new pipeline and respect custom processors",
|
||||||
|
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||||
|
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||||
|
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "Add new pipeline and respect custom processors in the beginning and middle",
|
Name: "Add new pipeline and respect custom processors in the beginning and middle",
|
||||||
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
|
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
|
||||||
@ -147,6 +153,45 @@ var BuildLogsPipelineTestData = []struct {
|
|||||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
|
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
|
||||||
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
|
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// working
|
||||||
|
{
|
||||||
|
Name: "rearrange pipelines",
|
||||||
|
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||||
|
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
|
||||||
|
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "rearrange pipelines with new processor",
|
||||||
|
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||||
|
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
|
||||||
|
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
|
||||||
|
// expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "delete processor",
|
||||||
|
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||||
|
logsPipeline: []string{},
|
||||||
|
expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "last to first",
|
||||||
|
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
|
||||||
|
logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||||
|
expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "multiple rearrange pipelines",
|
||||||
|
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||||
|
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||||
|
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "multiple rearrange with new pipelines",
|
||||||
|
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||||
|
logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||||
|
expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildLogsPipeline(t *testing.T) {
|
func TestBuildLogsPipeline(t *testing.T) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user