From 1411ae41c357c42a918be81d7f86f7ff8ab62e56 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Mon, 30 Sep 2024 23:46:34 +0530 Subject: [PATCH] =?UTF-8?q?Revert=20"Feat:=20use=20new=20logspipelineproce?= =?UTF-8?q?ssor=20for=20generating=20logs=20pipeline=20coll=E2=80=A6"=20(#?= =?UTF-8?q?6099)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit e4d1452f5ff63483b95c913cae37442b286e9aae. --- .../logparsingpipeline/collector_config.go | 67 ++++++++----------- .../collector_config_test.go | 4 +- .../app/logparsingpipeline/preview.go | 4 +- pkg/query-service/constants/constants.go | 4 +- 4 files changed, 33 insertions(+), 46 deletions(-) diff --git a/pkg/query-service/app/logparsingpipeline/collector_config.go b/pkg/query-service/app/logparsingpipeline/collector_config.go index 4b8da7b18a..49f697fbd3 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config.go @@ -19,28 +19,24 @@ var lockLogsPipelineSpec sync.RWMutex // check if the processors already exist // if yes then update the processor. // if something doesn't exists then remove it. -func updateProcessorConfigsInCollectorConf( - collectorConf map[string]interface{}, - signozPipelineProcessors map[string]interface{}, -) error { +func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { agentProcessors := map[string]interface{}{} - if collectorConf["processors"] != nil { - agentProcessors = (collectorConf["processors"]).(map[string]interface{}) + if agentConf["processors"] != nil { + agentProcessors = (agentConf["processors"]).(map[string]interface{}) } exists := map[string]struct{}{} - for key, params := range signozPipelineProcessors { + for key, params := range parsingProcessors { agentProcessors[key] = params exists[key] = struct{}{} } - // remove the old unwanted pipeline processors + // remove the old unwanted processors for k := range agentProcessors { - _, isInDesiredPipelineProcs := exists[k] - if hasSignozPipelineProcessorPrefix(k) && !isInDesiredPipelineProcs { + if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) { delete(agentProcessors, k) } } - collectorConf["processors"] = agentProcessors + agentConf["processors"] = agentProcessors return nil } @@ -69,24 +65,21 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er return &p, nil } -func buildCollectorPipelineProcessorsList( - currentCollectorProcessors []string, - signozPipelineProcessorNames []string, -) ([]string, error) { +func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) { lockLogsPipelineSpec.Lock() defer lockLogsPipelineSpec.Unlock() exists := map[string]struct{}{} - for _, v := range signozPipelineProcessorNames { + for _, v := range logsParserPipeline { exists[v] = struct{}{} } // removed the old processors which are not used var pipeline []string - for _, procName := range currentCollectorProcessors { - _, isInDesiredPipelineProcs := exists[procName] - if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) { - pipeline = append(pipeline, procName) + for _, v := range current { + k := v + if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { + pipeline = append(pipeline, v) } } @@ -103,7 +96,7 @@ func buildCollectorPipelineProcessorsList( existingVsSpec := map[int]int{} // go through plan and map its elements to current positions in effective config - for i, m := range signozPipelineProcessorNames { + for i, m := range logsParserPipeline { if loc, ok := existing[m]; ok { specVsExistingMap[i] = loc existingVsSpec[loc] = i @@ -113,11 +106,11 @@ func buildCollectorPipelineProcessorsList( lastMatched := 0 newPipeline := []string{} - for i := 0; i < len(signozPipelineProcessorNames); i++ { - m := signozPipelineProcessorNames[i] + for i := 0; i < len(logsParserPipeline); i++ { + m := logsParserPipeline[i] if loc, ok := specVsExistingMap[i]; ok { for j := lastMatched; j < loc; j++ { - if hasSignozPipelineProcessorPrefix(pipeline[j]) { + if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) { delete(specVsExistingMap, existingVsSpec[j]) } else { newPipeline = append(newPipeline, pipeline[j]) @@ -166,13 +159,13 @@ func GenerateCollectorConfigWithPipelines( config []byte, pipelines []Pipeline, ) ([]byte, *coreModel.ApiError) { - var collectorConf map[string]interface{} - err := yaml.Unmarshal([]byte(config), &collectorConf) + var c map[string]interface{} + err := yaml.Unmarshal([]byte(config), &c) if err != nil { return nil, coreModel.BadRequest(err) } - signozPipelineProcessors, signozPipelineProcNames, err := PreparePipelineProcessor(pipelines) + processors, procNames, err := PreparePipelineProcessor(pipelines) if err != nil { return nil, coreModel.BadRequest(errors.Wrap( err, "could not prepare otel collector processors for log pipelines", @@ -181,8 +174,8 @@ func GenerateCollectorConfigWithPipelines( // Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences // like $data do not end up being treated as env vars when loading collector config. - for _, procName := range signozPipelineProcNames { - procConf := signozPipelineProcessors[procName] + for _, procName := range procNames { + procConf := processors[procName] serializedProcConf, err := yaml.Marshal(procConf) if err != nil { return nil, coreModel.InternalError(fmt.Errorf( @@ -201,14 +194,14 @@ func GenerateCollectorConfigWithPipelines( )) } - signozPipelineProcessors[procName] = escapedConf + processors[procName] = escapedConf } // Add processors to unmarshaled collector config `c` - updateProcessorConfigsInCollectorConf(collectorConf, signozPipelineProcessors) + buildLogParsingProcessors(c, processors) // build the new processor list in service.pipelines.logs - p, err := getOtelPipelineFromConfig(collectorConf) + p, err := getOtelPipelineFromConfig(c) if err != nil { return nil, coreModel.BadRequest(err) } @@ -218,20 +211,16 @@ func GenerateCollectorConfigWithPipelines( )) } - updatedProcessorList, _ := buildCollectorPipelineProcessorsList(p.Pipelines.Logs.Processors, signozPipelineProcNames) + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, procNames) p.Pipelines.Logs.Processors = updatedProcessorList // add the new processor to the data ( no checks required as the keys will exists) - collectorConf["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs - updatedConf, err := yaml.Marshal(collectorConf) + updatedConf, err := yaml.Marshal(c) if err != nil { return nil, coreModel.BadRequest(err) } return updatedConf, nil } - -func hasSignozPipelineProcessorPrefix(procName string) bool { - return strings.HasPrefix(procName, constants.LogsPPLPfx) || strings.HasPrefix(procName, constants.OldLogsPPLPfx) -} diff --git a/pkg/query-service/app/logparsingpipeline/collector_config_test.go b/pkg/query-service/app/logparsingpipeline/collector_config_test.go index 2f2d898416..f5ba7a352b 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config_test.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config_test.go @@ -94,7 +94,7 @@ var buildProcessorTestData = []struct { func TestBuildLogParsingProcessors(t *testing.T) { for _, test := range buildProcessorTestData { Convey(test.Name, t, func() { - err := updateProcessorConfigsInCollectorConf(test.agentConf, test.pipelineProcessor) + err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) So(err, ShouldBeNil) So(test.agentConf, ShouldResemble, test.outputConf) }) @@ -200,7 +200,7 @@ var BuildLogsPipelineTestData = []struct { func TestBuildLogsPipeline(t *testing.T) { for _, test := range BuildLogsPipelineTestData { Convey(test.Name, t, func() { - v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline) + v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) So(err, ShouldBeNil) fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline) So(v, ShouldResemble, test.expectedPipeline) diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go index 548c1ee2f5..b37295eb96 100644 --- a/pkg/query-service/app/logparsingpipeline/preview.go +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -7,7 +7,7 @@ import ( "time" _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" - "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor" "github.com/pkg/errors" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -42,7 +42,7 @@ func SimulatePipelinesProcessing( simulatorInputPLogs := SignozLogsToPLogs(logs) processorFactories, err := processor.MakeFactoryMap( - signozlogspipelineprocessor.NewFactory(), + logstransformprocessor.NewFactory(), ) if err != nil { return nil, nil, model.InternalError(errors.Wrap( diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 541d46d5ca..78ee31e1a1 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -341,9 +341,7 @@ var ReservedColumnTargetAliases = map[string]struct{}{ } // logsPPLPfx is a short constant for logsPipelinePrefix -// TODO(Raj): Remove old prefix after new processor based pipelines have been rolled out -const LogsPPLPfx = "signozlogspipeline/pipeline_" -const OldLogsPPLPfx = "logstransform/pipeline_" +const LogsPPLPfx = "logstransform/pipeline_" const IntegrationPipelineIdPrefix = "integration"