From eb4ac18162c5f7f6589ce4639be746636af75fbf Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Fri, 17 Mar 2023 17:39:28 +0530 Subject: [PATCH] feat: processor builder updated with new logic and tests --- pkg/query-service/app/opamp/logspipeline.go | 40 ++++++++--------- .../app/opamp/logspipeline_test.go | 45 +++++++++++++++++++ 2 files changed, 64 insertions(+), 21 deletions(-) diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index f2467e05e0..36f4a1473b 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -157,7 +157,7 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin // create a reverse map of existing config processors and their position existing := map[string]int{} - for i, p := range current { + for i, p := range pipeline { name := p existing[name] = i } @@ -165,49 +165,47 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin // create mapping from our logsParserPipeline to position in existing processors (from current config) // this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3 specVsExistingMap := map[int]int{} + existingVsSpec := map[int]int{} // go through plan and map its elements to current positions in effective config for i, m := range logsParserPipeline { if loc, ok := existing[m]; ok { specVsExistingMap[i] = loc + existingVsSpec[loc] = i } } lastMatched := 0 + newPipeline := []string{} - // go through plan again in the increasing order for i := 0; i < len(logsParserPipeline); i++ { m := logsParserPipeline[i] - if loc, ok := specVsExistingMap[i]; ok { + for j := lastMatched; j < loc; j++ { + if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) { + delete(specVsExistingMap, existingVsSpec[j]) + } else { + newPipeline = append(newPipeline, pipeline[j]) + } + } + newPipeline = append(newPipeline, pipeline[loc]) lastMatched = loc + 1 } else { - if lastMatched <= 0 { - zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) - pipeline = append([]string{m}, pipeline[lastMatched:]...) - lastMatched++ - } else { - zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) - - prior := make([]string, len(pipeline[:lastMatched])) - next := make([]string, len(pipeline[lastMatched:])) - - copy(prior, pipeline[:lastMatched]) - copy(next, pipeline[lastMatched:]) - - pipeline = append(prior, m) - pipeline = append(pipeline, next...) - } + newPipeline = append(newPipeline, m) } + + } + if lastMatched < len(pipeline) { + newPipeline = append(newPipeline, pipeline[lastMatched:]...) } - if checkDuplicateString(pipeline) { + if checkDuplicateString(newPipeline) { // duplicates are most likely because the processor sequence in effective config conflicts // with the planned sequence as per planned pipeline return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) } - return pipeline, nil + return newPipeline, nil } func checkDuplicateString(pipeline []string) bool { diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index 26dec4e34d..eef08870dd 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -117,6 +117,12 @@ var BuildLogsPipelineTestData = []struct { logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"}, }, + { + Name: "Add new pipeline and respect custom processors", + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"}, + }, { Name: "Add new pipeline and respect custom processors in the beginning and middle", currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"}, @@ -147,6 +153,45 @@ var BuildLogsPipelineTestData = []struct { logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"}, expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"}, }, + + // working + { + Name: "rearrange pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"}, + }, + { + Name: "rearrange pipelines with new processor", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"}, + // expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"}, + }, + { + Name: "delete processor", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{}, + expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"}, + }, + { + Name: "last to first", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"}, + logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + }, + { + Name: "multiple rearrange pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"}, + }, + { + Name: "multiple rearrange with new pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"}, + logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"}, + expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"}, + }, } func TestBuildLogsPipeline(t *testing.T) {