From 500ab02c4728231e1850560266d809d652b319cb Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Wed, 15 Mar 2023 17:42:24 +0530 Subject: [PATCH 1/6] chore: logs parsing pipeline support in opamp --- pkg/query-service/agentConf/manager.go | 18 ++ pkg/query-service/app/opamp/logspipeline.go | 180 ++++++++++++++++++ .../app/opamp/logspipeline_test.go | 161 ++++++++++++++++ pkg/query-service/app/opamp/opamp_server.go | 95 --------- pkg/query-service/constants/constants.go | 3 + pkg/query-service/model/logparsingpipeline.go | 95 +++++++++ 6 files changed, 457 insertions(+), 95 deletions(-) create mode 100644 pkg/query-service/app/opamp/logspipeline.go create mode 100644 pkg/query-service/app/opamp/logspipeline_test.go create mode 100644 pkg/query-service/model/logparsingpipeline.go diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index 27478b332a..7544fb7ac1 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -210,3 +210,21 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) return nil } + +// UpsertLogParsingProcessors updates the agent with log parsing processors +func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []interface{}) error { + if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { + return fmt.Errorf("agent updater is busy") + } + defer atomic.StoreUint32(&m.lock, 0) + + // send the changes to opamp. + configHash, err := opamp.UpsertLogsParsingProcessor(context.Background(), config, names, m.OnConfigUpdate) + if err != nil { + zap.S().Errorf("failed to call agent config update for log parsing processor:", err) + return err + } + + m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment started", configHash, string(rawPipelineData)) + return nil +} diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go new file mode 100644 index 0000000000..e3f4dbeed0 --- /dev/null +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -0,0 +1,180 @@ +package opamp + +import ( + "context" + "crypto/sha256" + "fmt" + "strings" + "sync" + + "github.com/knadh/koanf/parsers/yaml" + "github.com/open-telemetry/opamp-go/protobufs" + model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" + "go.signoz.io/signoz/pkg/query-service/constants" + "go.uber.org/zap" +) + +var lockLogsPipelineSpec sync.RWMutex + +func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []interface{}, callback func(string, string, error)) (string, error) { + confHash := "" + if opAmpServer == nil { + return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") + } + + agents := opAmpServer.agents.GetAllAgents() + if len(agents) == 0 { + return confHash, fmt.Errorf("no agents available at the moment") + } + + for _, agent := range agents { + config := agent.EffectiveConfig + c, err := yaml.Parser().Unmarshal([]byte(config)) + if err != nil { + return confHash, err + } + + BuildLogParsingProcessors(c, parsingProcessors) + + // get the processor list + logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] + processors := 
logs.(map[string]interface{})["processors"].([]interface{}) + + // build the new processor list + updatedProcessorList, _ := buildLogsProcessors(processors, parsingProcessorsNames) + + // add the new processor to the data + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = updatedProcessorList + + updatedConf, err := yaml.Parser().Marshal(c) + if err != nil { + return confHash, err + } + + // zap.S().Infof("sending new config", string(updatedConf)) + hash := sha256.New() + _, err = hash.Write(updatedConf) + if err != nil { + return confHash, err + } + agent.EffectiveConfig = string(updatedConf) + err = agent.Upsert() + if err != nil { + return confHash, err + } + + agent.SendToAgent(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "collector.yaml": { + Body: updatedConf, + ContentType: "application/x-yaml", + }, + }, + }, + ConfigHash: hash.Sum(nil), + }, + }) + + if confHash == "" { + confHash = string(hash.Sum(nil)) + model.ListenToConfigUpdate(agent.ID, confHash, callback) + } + } + + return confHash, nil +} + +// check if the processors already exist +// if yes then update the processor. +// if something doesn't exists then remove it. +func BuildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { + agentProcessors := agentConf["processors"].(map[string]interface{}) + exists := map[string]struct{}{} + for key, params := range parsingProcessors { + agentProcessors[key] = params + exists[key] = struct{}{} + } + // remove the old unwanted processors + for k := range agentProcessors { + if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) { + delete(agentProcessors, k) + } + } + agentConf["processors"] = agentProcessors + return nil +} + +func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}) ([]interface{}, error) { + lockLogsPipelineSpec.Lock() + defer lockLogsPipelineSpec.Unlock() + + exists := map[string]struct{}{} + for _, v := range logsParserPipeline { + exists[v.(string)] = struct{}{} + } + + // removed the old processors which are not used + var pipeline []interface{} + for _, v := range current { + k := v.(string) + if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { + pipeline = append(pipeline, v) + } + } + + // create a reverse map of existing config processors and their position + existing := map[string]int{} + for i, p := range current { + name := p.(string) + existing[name] = i + } + + // create mapping from our tracesPipelinePlan (processors managed by us) to position in existing processors (from current config) + // this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3 + specVsExistingMap := map[int]int{} + + // go through plan and map its elements to current positions in effective config + for i, m := range logsParserPipeline { + if loc, ok := existing[m.(string)]; ok { + specVsExistingMap[i] = loc + } + } + + lastMatched := 0 + + // go through plan again in the increasing order + for i := 0; i < len(logsParserPipeline); i++ { + m := logsParserPipeline[i] + + if loc, ok := specVsExistingMap[i]; ok { + lastMatched = loc + 1 + } else { + if lastMatched <= 0 { + zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) + pipeline = append([]interface{}{m}, pipeline[lastMatched:]...) 
+ lastMatched++ + } else { + zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) + + prior := make([]interface{}, len(pipeline[:lastMatched])) + next := make([]interface{}, len(pipeline[lastMatched:])) + + copy(prior, pipeline[:lastMatched]) + copy(next, pipeline[lastMatched:]) + + pipeline = append(prior, m) + pipeline = append(pipeline, next...) + } + } + } + + if checkDuplicates(pipeline) { + // duplicates are most likely because the processor sequence in effective config conflicts + // with the planned sequence as per planned pipeline + return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) + } + + return pipeline, nil +} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go new file mode 100644 index 0000000000..d612959d19 --- /dev/null +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -0,0 +1,161 @@ +package opamp + +import ( + "fmt" + "testing" + + . "github.com/smartystreets/goconvey/convey" + "go.signoz.io/signoz/pkg/query-service/constants" +) + +var BuildProcessorTestData = []struct { + Name string + agentConf map[string]interface{} + pipelineProcessor map[string]interface{} + outputConf map[string]interface{} +}{ + { + Name: "Add", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{ + constants.LogsPPLPfx + "_b": struct{}{}, + }, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "_b": struct{}{}, + "batch": struct{}{}, + }, + }, + }, + { + Name: "Remove", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "_b": struct{}{}, + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{}, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "batch": struct{}{}, + }, + }, + }, + { + Name: "remove and upsert 1", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "_a": struct{}{}, + constants.LogsPPLPfx + "_b": struct{}{}, + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{ + constants.LogsPPLPfx + "_b": struct{}{}, + }, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + constants.LogsPPLPfx + "_b": struct{}{}, + "batch": struct{}{}, + }, + }, + }, + { + Name: "remove and upsert 2", + agentConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "memory_limiter": struct{}{}, + constants.LogsPPLPfx + "_a": struct{}{}, + constants.LogsPPLPfx + "_b": struct{}{}, + "batch": struct{}{}, + }, + }, + pipelineProcessor: map[string]interface{}{ + constants.LogsPPLPfx + "_b": struct{}{}, + }, + outputConf: map[string]interface{}{ + "processors": map[string]interface{}{ + "memory_limiter": struct{}{}, + constants.LogsPPLPfx + "_b": struct{}{}, + "batch": struct{}{}, + }, + }, + }, +} + +func TestBuildLogParsingProcessors(t *testing.T) { + for _, test := range BuildProcessorTestData { + Convey(test.Name, t, func() { + err := BuildLogParsingProcessors(test.agentConf, test.pipelineProcessor) + So(err, ShouldBeNil) + So(test.agentConf, ShouldResemble, test.outputConf) + }) + } + +} + +var BuildLogsPipelineTestData = []struct { + Name string + currentPipeline []interface{} + logsPipeline []interface{} + expectedPipeline []interface{} +}{ + { + Name: "Add new 
pipelines", + currentPipeline: []interface{}{"processor1", "processor2"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, + }, + { + Name: "Add new pipeline and respect custom processors", + currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", "processor2"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, + expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "processor2"}, + }, + { + Name: "Add new pipeline and respect custom processors in the beginning and middle", + currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, + expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"}, + }, + { + Name: "Remove old pipeline add add new", + currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, + expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", "processor2"}, + }, + { + Name: "Remove old pipeline from middle", + currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, + expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "batch"}, + }, + { + Name: "Remove old pipeline from middle and add new pipeline", + currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, + expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "processor3", "batch"}, + }, + { + Name: "Remove multiple old pipelines from middle and add multiple new ones", + currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_c", "processor4", constants.LogsPPLPfx + "_d", "processor5", "batch"}, + logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1"}, + expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", "processor2", "processor3", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1", "processor4", "processor5", "batch"}, + }, +} + +func TestBuildLogsPipeline(t *testing.T) { + for _, test := range BuildLogsPipelineTestData { + Convey(test.Name, t, func() { + v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) + So(err, ShouldBeNil) + fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline) + So(v, ShouldResemble, test.expectedPipeline) + }) + } +} diff --git a/pkg/query-service/app/opamp/opamp_server.go 
b/pkg/query-service/app/opamp/opamp_server.go index 237b07f121..cee50ba90c 100644 --- a/pkg/query-service/app/opamp/opamp_server.go +++ b/pkg/query-service/app/opamp/opamp_server.go @@ -2,15 +2,10 @@ package opamp import ( "context" - "crypto/sha256" - "strings" - - "github.com/knadh/koanf/parsers/yaml" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" "github.com/open-telemetry/opamp-go/server/types" - "go.opentelemetry.io/collector/confmap" model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.uber.org/zap" @@ -112,93 +107,3 @@ func Ready() bool { func Subscribe(agentId string, hash string, f model.OnChangeCallback) { model.ListenToConfigUpdate(agentId, hash, f) } - -func UpsertProcessor(ctx context.Context, processors map[string]interface{}, names []interface{}) error { - x := map[string]interface{}{ - "processors": processors, - } - - newConf := confmap.NewFromStringMap(x) - - agents := opAmpServer.agents.GetAllAgents() - for _, agent := range agents { - config := agent.EffectiveConfig - c, err := yaml.Parser().Unmarshal([]byte(config)) - if err != nil { - return err - } - agentConf := confmap.NewFromStringMap(c) - - err = agentConf.Merge(newConf) - if err != nil { - return err - } - - service := agentConf.Get("service") - - logs := service.(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] - processors := logs.(map[string]interface{})["processors"].([]interface{}) - userProcessors := []interface{}{} - // remove old ones - for _, v := range processors { - if !strings.HasPrefix(v.(string), "logstransform/pipeline_") { - userProcessors = append(userProcessors, v) - } - } - // all user processors are pushed after pipelines - processors = append(names, userProcessors...) - - service.(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = processors - - s := map[string]interface{}{ - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "logs": map[string]interface{}{ - "processors": processors, - }, - }, - }, - } - - serviceC := confmap.NewFromStringMap(s) - - err = agentConf.Merge(serviceC) - if err != nil { - return err - } - - // ------ complete adding processor - configR, err := yaml.Parser().Marshal(agentConf.ToStringMap()) - if err != nil { - return err - } - - zap.S().Infof("sending new config", string(configR)) - hash := sha256.New() - _, err = hash.Write(configR) - if err != nil { - return err - } - agent.EffectiveConfig = string(configR) - err = agent.Upsert() - if err != nil { - return err - } - - agent.SendToAgent(&protobufs.ServerToAgent{ - RemoteConfig: &protobufs.AgentRemoteConfig{ - Config: &protobufs.AgentConfigMap{ - ConfigMap: map[string]*protobufs.AgentConfigFile{ - "collector.yaml": { - Body: configR, - ContentType: "application/x-yaml", - }, - }, - }, - ConfigHash: hash.Sum(nil), - }, - }) - } - - return nil -} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index a18a69ac9a..208089ca8a 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -220,3 +220,6 @@ const ( NumberTagMapCol = "numberTagMap" BoolTagMapCol = "boolTagMap" ) + +// logsPPLPfx is a short constant for logsPipelinePrefix +const LogsPPLPfx = "logstransform/pipeline" diff --git a/pkg/query-service/model/logparsingpipeline.go b/pkg/query-service/model/logparsingpipeline.go new file mode 100644 index 0000000000..3eec51bdc3 --- /dev/null +++ 
b/pkg/query-service/model/logparsingpipeline.go @@ -0,0 +1,95 @@ +package model + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" +) + +// Pipeline is stored and also deployed finally to collector config +type Pipeline struct { + Id string `json:"id,omitempty" db:"id"` + OrderId int `json:"orderId" db:"order_id"` + Name string `json:"name,omitempty" db:"name"` + Alias string `json:"alias" db:"alias"` + Description *string `json:"description" db:"description"` + Enabled bool `json:"enabled" db:"enabled"` + Filter string `json:"filter" db:"filter"` + + // configuration for pipeline + RawConfig string `db:"config_json" json:"-"` + + Config []PipelineOperator `json:"config"` + + // Updater not required as any change will result in new version + Creator +} + +type Creator struct { + CreatedBy string `json:"createdBy" db:"created_by"` + CreatedAt time.Time `json:"createdAt" db:"created_at"` +} + +type Processor struct { + Operators []PipelineOperator `json:"operators" yaml:"operators"` +} + +type PipelineOperator struct { + Type string `json:"type" yaml:"type"` + ID string `json:"id,omitempty" yaml:"id,omitempty"` + Output string `json:"output,omitempty" yaml:"output,omitempty"` + OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"` + + // don't need the following in the final config + OrderId int `json:"orderId" yaml:"-"` + Enabled bool `json:"enabled" yaml:"-"` + Name string `json:"name,omitempty" yaml:"-"` + + // optional keys depending on the type + ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` + Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` + Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` + ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` + TraceParser *TraceParser `json:"trace_parser,omitempty" yaml:"trace_parser,omitempty"` + Field string `json:"field,omitempty" yaml:"field,omitempty"` + Value string `json:"value,omitempty" yaml:"value,omitempty"` + From string `json:"from,omitempty" yaml:"from,omitempty"` + To string `json:"to,omitempty" yaml:"to,omitempty"` + Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` + Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` + Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` + Default string `json:"default,omitempty" yaml:"default,omitempty"` +} + +type TimestampParser struct { + Layout string `json:"layout" yaml:"layout"` + LayoutType string `json:"layout_type" yaml:"layout_type"` + ParseFrom string `json:"parse_from" yaml:"parse_from"` +} + +type TraceParser struct { + TraceId *ParseFrom `json:"trace_id,omitempty" yaml:"trace_id,omitempty"` + SpanId *ParseFrom `json:"span_id,omitempty" yaml:"span_id,omitempty"` + TraceFlags *ParseFrom `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` +} + +type ParseFrom struct { + ParseFrom string `json:"parse_from" yaml:"parse_from"` +} + +type Route struct { + Output string `json:"output" yaml:"output"` + Expr string `json:"expr" yaml:"expr"` +} + +func (i *Pipeline) ParseRawConfig() error { + c := []PipelineOperator{} + err := json.Unmarshal([]byte(i.RawConfig), &c) + if err != nil { + return errors.Wrap(err, "failed to parse ingestion rule config") + } + i.Config = c + return nil +} From 755d64061e95019c53d4c327eff34d3b53536db6 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Wed, 15 Mar 2023 17:55:02 +0530 Subject: [PATCH 2/6] fix: minor spelling fixes --- 
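Note for reviewers (kept below the "---" so git am ignores it): PATCH 1/6 walks the agent's effective config as plain maps. It parses the YAML with koanf and reaches service -> pipelines -> logs -> processors through type assertions before splicing in the logstransform/pipeline_* processors. The sketch below is a minimal, self-contained illustration of that traversal; the sample config, the otlp/clickhouselogsexporter names and the standalone main() are assumptions made for the example, not part of the patch.

package main

import (
	"fmt"

	"github.com/knadh/koanf/parsers/yaml"
)

func main() {
	// hypothetical effective config received from an agent; only the shape matters here
	effectiveConfig := []byte(`
processors:
  batch: {}
service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [clickhouselogsexporter]
`)
	c, err := yaml.Parser().Unmarshal(effectiveConfig)
	if err != nil {
		panic(err)
	}

	// same traversal as UpsertLogsParsingProcessor in PATCH 1/6
	logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"]
	processors := logs.(map[string]interface{})["processors"].([]interface{})
	fmt.Println(processors) // prints: [batch]
}

PATCH 3/6 later swaps this raw traversal for the typed otelPipeline struct and getOtelPipelinFromConfig, which can return an explicit error when the service section or the logs pipeline is missing rather than failing on a type assertion.
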
pkg/query-service/app/opamp/logspipeline.go | 6 +++--- pkg/query-service/app/opamp/logspipeline_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index e3f4dbeed0..c1dffa0980 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -34,7 +34,7 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin return confHash, err } - BuildLogParsingProcessors(c, parsingProcessors) + buildLogParsingProcessors(c, parsingProcessors) // get the processor list logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] @@ -89,7 +89,7 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin // check if the processors already exist // if yes then update the processor. // if something doesn't exists then remove it. -func BuildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { +func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { agentProcessors := agentConf["processors"].(map[string]interface{}) exists := map[string]struct{}{} for key, params := range parsingProcessors { @@ -131,7 +131,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} existing[name] = i } - // create mapping from our tracesPipelinePlan (processors managed by us) to position in existing processors (from current config) + // create mapping from our logsParserPipeline to position in existing processors (from current config) // this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3 specVsExistingMap := map[int]int{} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index d612959d19..703d5ecb6e 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -8,7 +8,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" ) -var BuildProcessorTestData = []struct { +var buildProcessorTestData = []struct { Name string agentConf map[string]interface{} pipelineProcessor map[string]interface{} @@ -89,9 +89,9 @@ var BuildProcessorTestData = []struct { } func TestBuildLogParsingProcessors(t *testing.T) { - for _, test := range BuildProcessorTestData { + for _, test := range buildProcessorTestData { Convey(test.Name, t, func() { - err := BuildLogParsingProcessors(test.agentConf, test.pipelineProcessor) + err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) So(err, ShouldBeNil) So(test.agentConf, ShouldResemble, test.outputConf) }) From e1219ea94219c63cafe8482b97f1f7a5cd94704a Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 16 Mar 2023 10:20:57 +0530 Subject: [PATCH 3/6] fix: use structs instead of interface --- pkg/query-service/agentConf/manager.go | 2 +- pkg/query-service/app/opamp/logspipeline.go | 79 +++++++++++++++---- .../app/opamp/logspipeline_test.go | 78 +++++++++--------- pkg/query-service/constants/constants.go | 2 +- 4 files changed, 103 insertions(+), 58 deletions(-) diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index 8abcbacaa7..b26d382070 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -212,7 +212,7 @@ func UpsertSamplingProcessor(ctx context.Context, version int, 
config *tsp.Confi } // UpsertLogParsingProcessors updates the agent with log parsing processors -func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []interface{}) error { +func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error { if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { return fmt.Errorf("agent updater is busy") } diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index c1dffa0980..f2467e05e0 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -3,6 +3,7 @@ package opamp import ( "context" "crypto/sha256" + "encoding/json" "fmt" "strings" "sync" @@ -16,7 +17,7 @@ import ( var lockLogsPipelineSpec sync.RWMutex -func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []interface{}, callback func(string, string, error)) (string, error) { +func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) { confHash := "" if opAmpServer == nil { return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") @@ -36,15 +37,20 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin buildLogParsingProcessors(c, parsingProcessors) - // get the processor list - logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] - processors := logs.(map[string]interface{})["processors"].([]interface{}) + p, err := getOtelPipelinFromConfig(c) + if err != nil { + return confHash, err + } + if p.Pipelines.Logs == nil { + return confHash, fmt.Errorf("logs pipeline doesn't exist") + } // build the new processor list - updatedProcessorList, _ := buildLogsProcessors(processors, parsingProcessorsNames) + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + p.Pipelines.Logs.Processors = updatedProcessorList - // add the new processor to the data - c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = updatedProcessorList + // add the new processor to the data ( no checks required as the keys will exists) + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs updatedConf, err := yaml.Parser().Marshal(c) if err != nil { @@ -106,19 +112,44 @@ func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface return nil } -func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}) ([]interface{}, error) { +type otelPipeline struct { + Pipelines struct { + Logs *struct { + Exporters []string `json:"exporters" yaml:"exporters"` + Processors []string `json:"processors" yaml:"processors"` + Receivers []string `json:"receivers" yaml:"receivers"` + } `json:"logs" yaml:"logs"` + } `json:"pipelines" yaml:"pipelines"` +} + +func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) { + if _, ok := config["service"]; !ok { + return nil, fmt.Errorf("service not found in OTEL config") + } + b, err := json.Marshal(config["service"]) + if err != nil { + return nil, err + } + p := otelPipeline{} + if err := json.Unmarshal(b, &p); err != nil { + return nil, err + } + 
return &p, nil +} + +func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) { lockLogsPipelineSpec.Lock() defer lockLogsPipelineSpec.Unlock() exists := map[string]struct{}{} for _, v := range logsParserPipeline { - exists[v.(string)] = struct{}{} + exists[v] = struct{}{} } // removed the old processors which are not used - var pipeline []interface{} + var pipeline []string for _, v := range current { - k := v.(string) + k := v if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { pipeline = append(pipeline, v) } @@ -127,7 +158,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} // create a reverse map of existing config processors and their position existing := map[string]int{} for i, p := range current { - name := p.(string) + name := p existing[name] = i } @@ -137,7 +168,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} // go through plan and map its elements to current positions in effective config for i, m := range logsParserPipeline { - if loc, ok := existing[m.(string)]; ok { + if loc, ok := existing[m]; ok { specVsExistingMap[i] = loc } } @@ -153,13 +184,13 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} } else { if lastMatched <= 0 { zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) - pipeline = append([]interface{}{m}, pipeline[lastMatched:]...) + pipeline = append([]string{m}, pipeline[lastMatched:]...) lastMatched++ } else { zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) - prior := make([]interface{}, len(pipeline[:lastMatched])) - next := make([]interface{}, len(pipeline[lastMatched:])) + prior := make([]string, len(pipeline[:lastMatched])) + next := make([]string, len(pipeline[lastMatched:])) copy(prior, pipeline[:lastMatched]) copy(next, pipeline[lastMatched:]) @@ -170,7 +201,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} } } - if checkDuplicates(pipeline) { + if checkDuplicateString(pipeline) { // duplicates are most likely because the processor sequence in effective config conflicts // with the planned sequence as per planned pipeline return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) @@ -178,3 +209,17 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} return pipeline, nil } + +func checkDuplicateString(pipeline []string) bool { + exists := make(map[string]bool, len(pipeline)) + zap.S().Debugf("checking duplicate processors in the pipeline:", pipeline) + for _, processor := range pipeline { + name := processor + if _, ok := exists[name]; ok { + return true + } + + exists[name] = true + } + return false +} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index 703d5ecb6e..011e33473a 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -22,11 +22,11 @@ var buildProcessorTestData = []struct { }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -35,7 +35,7 @@ var buildProcessorTestData = 
[]struct { Name: "Remove", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -50,17 +50,17 @@ var buildProcessorTestData = []struct { Name: "remove and upsert 1", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_a": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -69,19 +69,19 @@ var buildProcessorTestData = []struct { Name: "remove and upsert 2", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - "memory_limiter": struct{}{}, - constants.LogsPPLPfx + "_a": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - "memory_limiter": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -89,7 +89,7 @@ var buildProcessorTestData = []struct { } func TestBuildLogParsingProcessors(t *testing.T) { - for _, test := range buildProcessorTestData { + for , test := range buildProcessorTestData { Convey(test.Name, t, func() { err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) So(err, ShouldBeNil) @@ -101,56 +101,56 @@ func TestBuildLogParsingProcessors(t *testing.T) { var BuildLogsPipelineTestData = []struct { Name string - currentPipeline []interface{} - logsPipeline []interface{} - expectedPipeline []interface{} + currentPipeline []string + logsPipeline []string + expectedPipeline []string }{ { Name: "Add new pipelines", - currentPipeline: []interface{}{"processor1", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, + currentPipeline: []string{"processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, }, { Name: "Add new pipeline and respect custom processors", - currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "processor2"}, + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + 
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"}, }, { Name: "Add new pipeline and respect custom processors in the beginning and middle", - currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"}, + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"}, }, { Name: "Remove old pipeline add add new", - currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", "processor2"}, + currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"}, }, { Name: "Remove old pipeline from middle", - currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, - expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "batch"}, + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"}, }, { Name: "Remove old pipeline from middle and add new pipeline", - currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "processor3", "batch"}, + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"}, }, { Name: "Remove multiple old pipelines from middle and add multiple new ones", - currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_c", "processor4", constants.LogsPPLPfx + "_d", "processor5", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1"}, - expectedPipeline: 
[]interface{}{"processor1", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", "processor2", "processor3", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1", "processor4", "processor5", "batch"}, + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"}, }, } func TestBuildLogsPipeline(t *testing.T) { - for _, test := range BuildLogsPipelineTestData { + for , test := range BuildLogsPipelineTestData { Convey(test.Name, t, func() { v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) So(err, ShouldBeNil) diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 208089ca8a..191e7c6e5f 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -222,4 +222,4 @@ const ( ) // logsPPLPfx is a short constant for logsPipelinePrefix -const LogsPPLPfx = "logstransform/pipeline" +const LogsPPLPfx = "logstransform/pipeline_" From bac717e9e6891715d980252135d922e0ce76b7e9 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 16 Mar 2023 10:20:57 +0530 Subject: [PATCH 4/6] fix: use structs instead of interface --- pkg/query-service/agentConf/manager.go | 2 +- pkg/query-service/app/opamp/logspipeline.go | 79 +++++++++++++++---- .../app/opamp/logspipeline_test.go | 78 +++++++++--------- pkg/query-service/constants/constants.go | 2 +- 4 files changed, 103 insertions(+), 58 deletions(-) diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index 8abcbacaa7..b26d382070 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -212,7 +212,7 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi } // UpsertLogParsingProcessors updates the agent with log parsing processors -func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []interface{}) error { +func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error { if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { return fmt.Errorf("agent updater is busy") } diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index c1dffa0980..f2467e05e0 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -3,6 +3,7 @@ package opamp import ( "context" "crypto/sha256" + "encoding/json" "fmt" "strings" "sync" @@ -16,7 +17,7 @@ import ( var lockLogsPipelineSpec sync.RWMutex -func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []interface{}, callback func(string, string, error)) (string, error) { +func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) { confHash := "" if opAmpServer == nil { 
return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") @@ -36,15 +37,20 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin buildLogParsingProcessors(c, parsingProcessors) - // get the processor list - logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] - processors := logs.(map[string]interface{})["processors"].([]interface{}) + p, err := getOtelPipelinFromConfig(c) + if err != nil { + return confHash, err + } + if p.Pipelines.Logs == nil { + return confHash, fmt.Errorf("logs pipeline doesn't exist") + } // build the new processor list - updatedProcessorList, _ := buildLogsProcessors(processors, parsingProcessorsNames) + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + p.Pipelines.Logs.Processors = updatedProcessorList - // add the new processor to the data - c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = updatedProcessorList + // add the new processor to the data ( no checks required as the keys will exists) + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs updatedConf, err := yaml.Parser().Marshal(c) if err != nil { @@ -106,19 +112,44 @@ func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface return nil } -func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}) ([]interface{}, error) { +type otelPipeline struct { + Pipelines struct { + Logs *struct { + Exporters []string `json:"exporters" yaml:"exporters"` + Processors []string `json:"processors" yaml:"processors"` + Receivers []string `json:"receivers" yaml:"receivers"` + } `json:"logs" yaml:"logs"` + } `json:"pipelines" yaml:"pipelines"` +} + +func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) { + if _, ok := config["service"]; !ok { + return nil, fmt.Errorf("service not found in OTEL config") + } + b, err := json.Marshal(config["service"]) + if err != nil { + return nil, err + } + p := otelPipeline{} + if err := json.Unmarshal(b, &p); err != nil { + return nil, err + } + return &p, nil +} + +func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) { lockLogsPipelineSpec.Lock() defer lockLogsPipelineSpec.Unlock() exists := map[string]struct{}{} for _, v := range logsParserPipeline { - exists[v.(string)] = struct{}{} + exists[v] = struct{}{} } // removed the old processors which are not used - var pipeline []interface{} + var pipeline []string for _, v := range current { - k := v.(string) + k := v if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { pipeline = append(pipeline, v) } @@ -127,7 +158,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} // create a reverse map of existing config processors and their position existing := map[string]int{} for i, p := range current { - name := p.(string) + name := p existing[name] = i } @@ -137,7 +168,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} // go through plan and map its elements to current positions in effective config for i, m := range logsParserPipeline { - if loc, ok := existing[m.(string)]; ok { + if loc, ok := existing[m]; ok { specVsExistingMap[i] = loc } } @@ -153,13 +184,13 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} } else { if 
lastMatched <= 0 { zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) - pipeline = append([]interface{}{m}, pipeline[lastMatched:]...) + pipeline = append([]string{m}, pipeline[lastMatched:]...) lastMatched++ } else { zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) - prior := make([]interface{}, len(pipeline[:lastMatched])) - next := make([]interface{}, len(pipeline[lastMatched:])) + prior := make([]string, len(pipeline[:lastMatched])) + next := make([]string, len(pipeline[lastMatched:])) copy(prior, pipeline[:lastMatched]) copy(next, pipeline[lastMatched:]) @@ -170,7 +201,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} } } - if checkDuplicates(pipeline) { + if checkDuplicateString(pipeline) { // duplicates are most likely because the processor sequence in effective config conflicts // with the planned sequence as per planned pipeline return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) @@ -178,3 +209,17 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{} return pipeline, nil } + +func checkDuplicateString(pipeline []string) bool { + exists := make(map[string]bool, len(pipeline)) + zap.S().Debugf("checking duplicate processors in the pipeline:", pipeline) + for _, processor := range pipeline { + name := processor + if _, ok := exists[name]; ok { + return true + } + + exists[name] = true + } + return false +} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index 703d5ecb6e..011e33473a 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -22,11 +22,11 @@ var buildProcessorTestData = []struct { }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -35,7 +35,7 @@ var buildProcessorTestData = []struct { Name: "Remove", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -50,17 +50,17 @@ var buildProcessorTestData = []struct { Name: "remove and upsert 1", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_a": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -69,19 +69,19 @@ var buildProcessorTestData = []struct { Name: "remove and upsert 2", agentConf: map[string]interface{}{ "processors": map[string]interface{}{ - "memory_limiter": struct{}{}, - constants.LogsPPLPfx + "_a": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "a": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, 
}, pipelineProcessor: map[string]interface{}{ - constants.LogsPPLPfx + "_b": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, }, outputConf: map[string]interface{}{ "processors": map[string]interface{}{ - "memory_limiter": struct{}{}, - constants.LogsPPLPfx + "_b": struct{}{}, + "memorylimiter": struct{}{}, + constants.LogsPPLPfx + "b": struct{}{}, "batch": struct{}{}, }, }, @@ -89,7 +89,7 @@ var buildProcessorTestData = []struct { } func TestBuildLogParsingProcessors(t *testing.T) { - for _, test := range buildProcessorTestData { + for , test := range buildProcessorTestData { Convey(test.Name, t, func() { err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) So(err, ShouldBeNil) @@ -101,56 +101,56 @@ func TestBuildLogParsingProcessors(t *testing.T) { var BuildLogsPipelineTestData = []struct { Name string - currentPipeline []interface{} - logsPipeline []interface{} - expectedPipeline []interface{} + currentPipeline []string + logsPipeline []string + expectedPipeline []string }{ { Name: "Add new pipelines", - currentPipeline: []interface{}{"processor1", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, + currentPipeline: []string{"processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, }, { Name: "Add new pipeline and respect custom processors", - currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "processor2"}, + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"}, }, { Name: "Add new pipeline and respect custom processors in the beginning and middle", - currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"}, + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"}, }, { Name: "Remove old pipeline add add new", - currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, - expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", 
"processor1", "processor2"}, + currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"}, }, { Name: "Remove old pipeline from middle", - currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, - expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "batch"}, + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"}, }, { Name: "Remove old pipeline from middle and add new pipeline", - currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, - expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "processor3", "batch"}, + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"}, + expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"}, }, { Name: "Remove multiple old pipelines from middle and add multiple new ones", - currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_c", "processor4", constants.LogsPPLPfx + "_d", "processor5", "batch"}, - logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1"}, - expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", "processor2", "processor3", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1", "processor4", "processor5", "batch"}, + currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"}, + expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"}, }, } func TestBuildLogsPipeline(t *testing.T) { - for _, test := range BuildLogsPipelineTestData { + for , test := range BuildLogsPipelineTestData { Convey(test.Name, t, func() { v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) So(err, ShouldBeNil) diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 208089ca8a..191e7c6e5f 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -222,4 +222,4 @@ const ( ) // 
logsPPLPfx is a short constant for logsPipelinePrefix -const LogsPPLPfx = "logstransform/pipeline" +const LogsPPLPfx = "logstransform/pipeline_" From 7367f8dd4b58b5550fcb540fa535c9cbee12c687 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 16 Mar 2023 10:24:20 +0530 Subject: [PATCH 5/6] fix: tests fixed --- pkg/query-service/app/opamp/logspipeline_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index 011e33473a..26dec4e34d 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -27,7 +27,7 @@ var buildProcessorTestData = []struct { outputConf: map[string]interface{}{ "processors": map[string]interface{}{ constants.LogsPPLPfx + "b": struct{}{}, - "batch": struct{}{}, + "batch": struct{}{}, }, }, }, @@ -36,7 +36,7 @@ var buildProcessorTestData = []struct { agentConf: map[string]interface{}{ "processors": map[string]interface{}{ constants.LogsPPLPfx + "b": struct{}{}, - "batch": struct{}{}, + "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{}, @@ -52,7 +52,7 @@ var buildProcessorTestData = []struct { "processors": map[string]interface{}{ constants.LogsPPLPfx + "a": struct{}{}, constants.LogsPPLPfx + "b": struct{}{}, - "batch": struct{}{}, + "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ @@ -61,7 +61,7 @@ var buildProcessorTestData = []struct { outputConf: map[string]interface{}{ "processors": map[string]interface{}{ constants.LogsPPLPfx + "b": struct{}{}, - "batch": struct{}{}, + "batch": struct{}{}, }, }, }, @@ -72,7 +72,7 @@ var buildProcessorTestData = []struct { "memorylimiter": struct{}{}, constants.LogsPPLPfx + "a": struct{}{}, constants.LogsPPLPfx + "b": struct{}{}, - "batch": struct{}{}, + "batch": struct{}{}, }, }, pipelineProcessor: map[string]interface{}{ @@ -82,14 +82,14 @@ var buildProcessorTestData = []struct { "processors": map[string]interface{}{ "memorylimiter": struct{}{}, constants.LogsPPLPfx + "b": struct{}{}, - "batch": struct{}{}, + "batch": struct{}{}, }, }, }, } func TestBuildLogParsingProcessors(t *testing.T) { - for , test := range buildProcessorTestData { + for _, test := range buildProcessorTestData { Convey(test.Name, t, func() { err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) So(err, ShouldBeNil) @@ -150,7 +150,7 @@ var BuildLogsPipelineTestData = []struct { } func TestBuildLogsPipeline(t *testing.T) { - for , test := range BuildLogsPipelineTestData { + for _, test := range BuildLogsPipelineTestData { Convey(test.Name, t, func() { v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) So(err, ShouldBeNil) From eb4ac18162c5f7f6589ce4639be746636af75fbf Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Fri, 17 Mar 2023 17:39:28 +0530 Subject: [PATCH 6/6] feat: processor builder updated with new logic and tests --- pkg/query-service/app/opamp/logspipeline.go | 40 ++++++++--------- .../app/opamp/logspipeline_test.go | 45 +++++++++++++++++++ 2 files changed, 64 insertions(+), 21 deletions(-) diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index f2467e05e0..36f4a1473b 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -157,7 +157,7 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin // create a reverse map of existing config 
processors and their position existing := map[string]int{} - for i, p := range current { + for i, p := range pipeline { name := p existing[name] = i } @@ -165,49 +165,47 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin // create mapping from our logsParserPipeline to position in existing processors (from current config) // this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3 specVsExistingMap := map[int]int{} + existingVsSpec := map[int]int{} // go through plan and map its elements to current positions in effective config for i, m := range logsParserPipeline { if loc, ok := existing[m]; ok { specVsExistingMap[i] = loc + existingVsSpec[loc] = i } } lastMatched := 0 + newPipeline := []string{} - // go through plan again in the increasing order for i := 0; i < len(logsParserPipeline); i++ { m := logsParserPipeline[i] - if loc, ok := specVsExistingMap[i]; ok { + for j := lastMatched; j < loc; j++ { + if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) { + delete(specVsExistingMap, existingVsSpec[j]) + } else { + newPipeline = append(newPipeline, pipeline[j]) + } + } + newPipeline = append(newPipeline, pipeline[loc]) lastMatched = loc + 1 } else { - if lastMatched <= 0 { - zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) - pipeline = append([]string{m}, pipeline[lastMatched:]...) - lastMatched++ - } else { - zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) - - prior := make([]string, len(pipeline[:lastMatched])) - next := make([]string, len(pipeline[lastMatched:])) - - copy(prior, pipeline[:lastMatched]) - copy(next, pipeline[lastMatched:]) - - pipeline = append(prior, m) - pipeline = append(pipeline, next...) - } + newPipeline = append(newPipeline, m) } + + } + if lastMatched < len(pipeline) { + newPipeline = append(newPipeline, pipeline[lastMatched:]...) 
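// A worked example of the reordering above (illustrative only, drawn from the
// "rearrange pipelines" case added to logspipeline_test.go further down):
// given the effective config
//     pipeline           = [processor1, processor2, LogsPPLPfx+"_a", processor3, LogsPPLPfx+"_b", batch]
// and the planned order
//     logsParserPipeline = [LogsPPLPfx+"_b", LogsPPLPfx+"_a"]
// matching LogsPPLPfx+"_b" at position 4 first copies the unmanaged processors
// before it (processor1, processor2, processor3) into newPipeline, deletes the
// plan entry for the managed LogsPPLPfx+"_a" it walks past from
// specVsExistingMap, and then emits LogsPPLPfx+"_b". On the next iteration
// LogsPPLPfx+"_a" no longer has a mapping, so it is appended immediately after,
// like a new entry. The trailing pipeline[lastMatched:] copy restores "batch",
// producing
//     newPipeline = [processor1, processor2, processor3, LogsPPLPfx+"_b", LogsPPLPfx+"_a", batch]
// Planned processors that never existed in the config (e.g. LogsPPLPfx+"_c")
// are simply appended at whatever point the plan walk has reached.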
} - if checkDuplicateString(pipeline) { + if checkDuplicateString(newPipeline) { // duplicates are most likely because the processor sequence in effective config conflicts // with the planned sequence as per planned pipeline return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) } - return pipeline, nil + return newPipeline, nil } func checkDuplicateString(pipeline []string) bool { diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/opamp/logspipeline_test.go index 26dec4e34d..eef08870dd 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/opamp/logspipeline_test.go @@ -117,6 +117,12 @@ var BuildLogsPipelineTestData = []struct { logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"}, expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"}, }, + { + Name: "Add new pipeline and respect custom processors", + currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"}, + logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"}, + expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"}, + }, { Name: "Add new pipeline and respect custom processors in the beginning and middle", currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"}, @@ -147,6 +153,45 @@ var BuildLogsPipelineTestData = []struct { logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"}, expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"}, }, + + // working + { + Name: "rearrange pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"}, + }, + { + Name: "rearrange pipelines with new processor", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"}, + // expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"}, + }, + { + Name: "delete processor", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, + logsPipeline: []string{}, + expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"}, + }, + { + Name: "last to first", + currentPipeline: []string{"processor1", 
"processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"}, + logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, + }, + { + Name: "multiple rearrange pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"}, + logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"}, + expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"}, + }, + { + Name: "multiple rearrange with new pipelines", + currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"}, + logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"}, + expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"}, + }, } func TestBuildLogsPipeline(t *testing.T) {