diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index 8abcbacaa7..b26d382070 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -212,7 +212,7 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi } // UpsertLogParsingProcessors updates the agent with log parsing processors -func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []interface{}) error { +func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error { if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { return fmt.Errorf("agent updater is busy") } diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index c1dffa0980..f2467e05e0 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -3,6 +3,7 @@ package opamp import ( "context" "crypto/sha256" + "encoding/json" "fmt" "strings" "sync" @@ -16,7 +17,7 @@ import ( var lockLogsPipelineSpec sync.RWMutex -func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []interface{}, callback func(string, string, error)) (string, error) { +func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) { confHash := "" if opAmpServer == nil { return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") @@ -36,15 +37,20 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin buildLogParsingProcessors(c, parsingProcessors) - // get the processor list - logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] - processors := 
logs.(map[string]interface{})["processors"].([]interface{}) + p, err := getOtelPipelineFromConfig(c) + if err != nil { + return confHash, err + } + if p.Pipelines.Logs == nil { + return confHash, fmt.Errorf("logs pipeline doesn't exist") + } // build the new processor list - updatedProcessorList, _ := buildLogsProcessors(processors, parsingProcessorsNames) + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + p.Pipelines.Logs.Processors = updatedProcessorList - // add the new processor to the data - c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = updatedProcessorList + // add the new processor to the data (no checks required as the keys will exist) + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs updatedConf, err := yaml.Parser().Marshal(c) if err != nil { @@ -106,19 +112,44 @@ func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface return nil } -func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}) ([]interface{}, error) { +type otelPipeline struct { + Pipelines struct { + Logs *struct { + Exporters []string `json:"exporters" yaml:"exporters"` + Processors []string `json:"processors" yaml:"processors"` + Receivers []string `json:"receivers" yaml:"receivers"` + } `json:"logs" yaml:"logs"` + } `json:"pipelines" yaml:"pipelines"` +} + +func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, error) { + if _, ok := config["service"]; !ok { + return nil, fmt.Errorf("service not found in OTEL config") + } + b, err := json.Marshal(config["service"]) + if err != nil { + return nil, err + } + p := otelPipeline{} + if err := json.Unmarshal(b, &p); err != nil { + return nil, err + } + return &p, nil +} + +func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) { lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock() exists := map[string]struct{}{} for _, v := range logsParserPipeline { - exists[v.(string)] = struct{}{} + exists[v] = struct{}{} } // removed the old processors which are not used - var pipeline []interface{} + var pipeline []string for _, v := range current { - k := v.(string) + k := v if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { pipeline = append(pipeline, v) } @@ -127,7 +158,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} // create a reverse map of existing config processors and their position existing := map[string]int{} for i, p := range current { - name := p.(string) + name := p existing[name] = i } @@ -137,7 +168,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} // go through plan and map its elements to current positions in effective config for i, m := range logsParserPipeline { - if loc, ok := existing[m.(string)]; ok { + if loc, ok := existing[m]; ok { specVsExistingMap[i] = loc } } @@ -153,13 +184,13 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} } else { if lastMatched <= 0 { zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) - pipeline = append([]interface{}{m}, pipeline[lastMatched:]...) + pipeline = append([]string{m}, pipeline[lastMatched:]...) 
lastMatched++ } else { zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) - prior := make([]interface{}, len(pipeline[:lastMatched])) - next := make([]interface{}, len(pipeline[lastMatched:])) + prior := make([]string, len(pipeline[:lastMatched])) + next := make([]string, len(pipeline[lastMatched:])) copy(prior, pipeline[:lastMatched]) copy(next, pipeline[lastMatched:]) @@ -170,7 +201,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} } } - if checkDuplicates(pipeline) { + if checkDuplicateString(pipeline) { // duplicates are most likely because the processor sequence in effective config conflicts // with the planned sequence as per planned pipeline return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) @@ -178,3 +209,17 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} return pipeline, nil } + +func checkDuplicateString(pipeline []string) bool { + exists := make(map[string]bool, len(pipeline)) + zap.S().Debugf("checking duplicate processors in the pipeline:", pipeline) + for _, processor := range pipeline { + name := processor + if _, ok := exists[name]; ok { + return true + } + + exists[name] = true + } + return false +} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index 703d5ecb6e..011e33473a 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -22,11 +22,11 @@ var buildProcessorTestData = []struct { }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -35,7 +35,7 @@ var 
buildProcessorTestData = []struct { Name: "Remove", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -50,17 +50,17 @@ var buildProcessorTestData = []struct { Name: "remove and upsert 1", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_a": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -69,19 +69,19 @@ var buildProcessorTestData = []struct { Name: "remove and upsert 2", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - "memory_limiter": struct{}{}, - constants.LogsPPLPfx + "_a": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - "memory_limiter": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -89,7 +89,7 @@ var buildProcessorTestData = []struct { } func TestBuildLogParsingProcessors(t *testing.T) { - for _, test := range buildProcessorTestData { + for _, test := range buildProcessorTestData { Convey(test.Name, t, func() { err := buildLogParsingProcessors(test.agentConf,
test.pipelineProcessor) So(err, ShouldBeNil) @@ -101,56 +101,56 @@ func TestBuildLogParsingProcessors(t *testing.T) { var BuildLogsPipelineTestData = []struct { Name string - currentPipeline []interface{} - logsPipeline []interface{} - expectedPipeline []interface{} + currentPipeline []string + logsPipeline []string + expectedPipeline []string }{ { Name: "Add new pipelines", - currentPipeline: []interface{}{"processor1", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, + currentPipeline: []string{"processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, }, { Name: "Add new pipeline and respect custom processors", - currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "processor2"}, + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"}, }, { Name: "Add new pipeline and respect custom processors in the beginning and middle", - currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", 
constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"}, + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"}, }, { Name: "Remove old pipeline add add new", - currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", "processor2"}, + currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"}, }, { Name: "Remove old pipeline from middle", - currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, - expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "batch"}, + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"}, }, { Name: "Remove old pipeline from middle and add new pipeline", - currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", 
"processor3", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "processor3", "batch"}, + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"}, }, { Name: "Remove multiple old pipelines from middle and add multiple new ones", - currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_c", "processor4", constants.LogsPPLPfx + "_d", "processor5", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1"}, - expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", "processor2", "processor3", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1", "processor4", "processor5", "batch"}, + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"}, }, } func TestBuildLogsPipeline(t *testing.T) { - for _, test := range 
BuildLogsPipelineTestData { + for _, test := range BuildLogsPipelineTestData { Convey(test.Name, t, func() { v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) So(err, ShouldBeNil) diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 208089ca8a..191e7c6e5f 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -222,4 +222,4 @@ const ( ) // logsPPLPfx is a short constant for logsPipelinePrefix -const LogsPPLPfx = "logstransform/pipeline" +const LogsPPLPfx = "logstransform/pipeline_"