diff --git a/pkg/query-service/app/logparsingpipeline/collector_config.go b/pkg/query-service/app/logparsingpipeline/collector_config.go
index 49f697fbd3..4b8da7b18a 100644
--- a/pkg/query-service/app/logparsingpipeline/collector_config.go
+++ b/pkg/query-service/app/logparsingpipeline/collector_config.go
@@ -19,24 +19,28 @@ var lockLogsPipelineSpec sync.RWMutex
 // check if the processors already exist
 // if yes then update the processor.
 // if something doesn't exists then remove it.
-func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
+func updateProcessorConfigsInCollectorConf(
+	collectorConf map[string]interface{},
+	signozPipelineProcessors map[string]interface{},
+) error {
 	agentProcessors := map[string]interface{}{}
-	if agentConf["processors"] != nil {
-		agentProcessors = (agentConf["processors"]).(map[string]interface{})
+	if collectorConf["processors"] != nil {
+		agentProcessors = (collectorConf["processors"]).(map[string]interface{})
 	}
 	exists := map[string]struct{}{}
-	for key, params := range parsingProcessors {
+	for key, params := range signozPipelineProcessors {
 		agentProcessors[key] = params
 		exists[key] = struct{}{}
 	}
-	// remove the old unwanted processors
+	// remove the old unwanted pipeline processors
 	for k := range agentProcessors {
-		if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) {
+		_, isInDesiredPipelineProcs := exists[k]
+		if hasSignozPipelineProcessorPrefix(k) && !isInDesiredPipelineProcs {
 			delete(agentProcessors, k)
 		}
 	}
-	agentConf["processors"] = agentProcessors
+	collectorConf["processors"] = agentProcessors
 	return nil
 }
 
@@ -65,21 +69,24 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
 	return &p, nil
 }
 
-func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) {
+func buildCollectorPipelineProcessorsList(
+	currentCollectorProcessors []string,
+	signozPipelineProcessorNames []string,
+) ([]string, error) {
 	lockLogsPipelineSpec.Lock()
 	defer lockLogsPipelineSpec.Unlock()
 
 	exists := map[string]struct{}{}
-	for _, v := range logsParserPipeline {
+	for _, v := range signozPipelineProcessorNames {
 		exists[v] = struct{}{}
 	}
 
 	// removed the old processors which are not used
 	var pipeline []string
-	for _, v := range current {
-		k := v
-		if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) {
-			pipeline = append(pipeline, v)
+	for _, procName := range currentCollectorProcessors {
+		_, isInDesiredPipelineProcs := exists[procName]
+		if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
+			pipeline = append(pipeline, procName)
 		}
 	}
 
@@ -96,7 +103,7 @@
 	existingVsSpec := map[int]int{}
 
 	// go through plan and map its elements to current positions in effective config
-	for i, m := range logsParserPipeline {
+	for i, m := range signozPipelineProcessorNames {
 		if loc, ok := existing[m]; ok {
 			specVsExistingMap[i] = loc
 			existingVsSpec[loc] = i
@@ -106,11 +113,11 @@
 	lastMatched := 0
 	newPipeline := []string{}
 
-	for i := 0; i < len(logsParserPipeline); i++ {
-		m := logsParserPipeline[i]
+	for i := 0; i < len(signozPipelineProcessorNames); i++ {
+		m := signozPipelineProcessorNames[i]
 		if loc, ok := specVsExistingMap[i]; ok {
 			for j := lastMatched; j < loc; j++ {
-				if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) {
+				if hasSignozPipelineProcessorPrefix(pipeline[j]) {
					delete(specVsExistingMap, existingVsSpec[j])
				} else {
					newPipeline = append(newPipeline, pipeline[j])
@@ -159,13 +166,13 @@ func GenerateCollectorConfigWithPipelines(
 	config []byte,
 	pipelines []Pipeline,
 ) ([]byte, *coreModel.ApiError) {
-	var c map[string]interface{}
-	err := yaml.Unmarshal([]byte(config), &c)
+	var collectorConf map[string]interface{}
+	err := yaml.Unmarshal([]byte(config), &collectorConf)
 	if err != nil {
 		return nil, coreModel.BadRequest(err)
 	}
 
-	processors, procNames, err := PreparePipelineProcessor(pipelines)
+	signozPipelineProcessors, signozPipelineProcNames, err := PreparePipelineProcessor(pipelines)
 	if err != nil {
 		return nil, coreModel.BadRequest(errors.Wrap(
 			err, "could not prepare otel collector processors for log pipelines",
@@ -174,8 +181,8 @@
 
 	// Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences
 	// like $data do not end up being treated as env vars when loading collector config.
-	for _, procName := range procNames {
-		procConf := processors[procName]
+	for _, procName := range signozPipelineProcNames {
+		procConf := signozPipelineProcessors[procName]
 		serializedProcConf, err := yaml.Marshal(procConf)
 		if err != nil {
 			return nil, coreModel.InternalError(fmt.Errorf(
@@ -194,14 +201,14 @@
 			))
 		}
 
-		processors[procName] = escapedConf
+		signozPipelineProcessors[procName] = escapedConf
 	}
 
 	// Add processors to unmarshaled collector config `c`
-	buildLogParsingProcessors(c, processors)
+	updateProcessorConfigsInCollectorConf(collectorConf, signozPipelineProcessors)
 
 	// build the new processor list in service.pipelines.logs
-	p, err := getOtelPipelineFromConfig(c)
+	p, err := getOtelPipelineFromConfig(collectorConf)
 	if err != nil {
 		return nil, coreModel.BadRequest(err)
 	}
@@ -211,16 +218,20 @@
 		))
 	}
 
-	updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, procNames)
+	updatedProcessorList, _ := buildCollectorPipelineProcessorsList(p.Pipelines.Logs.Processors, signozPipelineProcNames)
 	p.Pipelines.Logs.Processors = updatedProcessorList
 
 	// add the new processor to the data ( no checks required as the keys will exists)
-	c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
+	collectorConf["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
 
-	updatedConf, err := yaml.Marshal(c)
+	updatedConf, err := yaml.Marshal(collectorConf)
 	if err != nil {
 		return nil, coreModel.BadRequest(err)
 	}
 
 	return updatedConf, nil
 }
+
+func hasSignozPipelineProcessorPrefix(procName string) bool {
+	return strings.HasPrefix(procName, constants.LogsPPLPfx) || strings.HasPrefix(procName, constants.OldLogsPPLPfx)
+}
diff --git a/pkg/query-service/app/logparsingpipeline/collector_config_test.go b/pkg/query-service/app/logparsingpipeline/collector_config_test.go
index f5ba7a352b..2f2d898416 100644
--- a/pkg/query-service/app/logparsingpipeline/collector_config_test.go
+++ b/pkg/query-service/app/logparsingpipeline/collector_config_test.go
@@ -94,7 +94,7 @@ var buildProcessorTestData = []struct {
 func TestBuildLogParsingProcessors(t *testing.T) {
 	for _, test := range buildProcessorTestData {
 		Convey(test.Name, t, func() {
-			err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor)
+			err := updateProcessorConfigsInCollectorConf(test.agentConf, test.pipelineProcessor)
 			So(err, ShouldBeNil)
 			So(test.agentConf, ShouldResemble, test.outputConf)
 		})
@@ -200,7 +200,7 @@ var BuildLogsPipelineTestData = []struct {
 func TestBuildLogsPipeline(t *testing.T) {
 	for _, test := range BuildLogsPipelineTestData {
 		Convey(test.Name, t, func() {
-			v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline)
+			v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
 			So(err, ShouldBeNil)
 			fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
 			So(v, ShouldResemble, test.expectedPipeline)
diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go
index b37295eb96..548c1ee2f5 100644
--- a/pkg/query-service/app/logparsingpipeline/preview.go
+++ b/pkg/query-service/app/logparsingpipeline/preview.go
@@ -7,7 +7,7 @@ import (
 	"time"
 
 	_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
+	"github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor"
 	"github.com/pkg/errors"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -42,7 +42,7 @@ func SimulatePipelinesProcessing(
 	simulatorInputPLogs := SignozLogsToPLogs(logs)
 
 	processorFactories, err := processor.MakeFactoryMap(
-		logstransformprocessor.NewFactory(),
+		signozlogspipelineprocessor.NewFactory(),
 	)
 	if err != nil {
 		return nil, nil, model.InternalError(errors.Wrap(
diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go
index 78ee31e1a1..541d46d5ca 100644
--- a/pkg/query-service/constants/constants.go
+++ b/pkg/query-service/constants/constants.go
@@ -341,7 +341,9 @@ var ReservedColumnTargetAliases = map[string]struct{}{
 }
 
 // logsPPLPfx is a short constant for logsPipelinePrefix
-const LogsPPLPfx = "logstransform/pipeline_"
+// TODO(Raj): Remove old prefix after new processor based pipelines have been rolled out
+const LogsPPLPfx = "signozlogspipeline/pipeline_"
+const OldLogsPPLPfx = "logstransform/pipeline_"
 
 const IntegrationPipelineIdPrefix = "integration"