Revert "Feat: use new logspipelineprocessor for generating logs pipeline coll…" (#6099)

This reverts commit e4d1452f5ff63483b95c913cae37442b286e9aae.
This commit is contained in:
Raj Kamal Singh 2024-09-30 23:46:34 +05:30 committed by GitHub
parent bc8891d2f8
commit 1411ae41c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 46 deletions

View File

@ -19,28 +19,24 @@ var lockLogsPipelineSpec sync.RWMutex
// check if the processors already exist // check if the processors already exist
// if yes then update the processor. // if yes then update the processor.
// if something doesn't exists then remove it. // if something doesn't exists then remove it.
func updateProcessorConfigsInCollectorConf( func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
collectorConf map[string]interface{},
signozPipelineProcessors map[string]interface{},
) error {
agentProcessors := map[string]interface{}{} agentProcessors := map[string]interface{}{}
if collectorConf["processors"] != nil { if agentConf["processors"] != nil {
agentProcessors = (collectorConf["processors"]).(map[string]interface{}) agentProcessors = (agentConf["processors"]).(map[string]interface{})
} }
exists := map[string]struct{}{} exists := map[string]struct{}{}
for key, params := range signozPipelineProcessors { for key, params := range parsingProcessors {
agentProcessors[key] = params agentProcessors[key] = params
exists[key] = struct{}{} exists[key] = struct{}{}
} }
// remove the old unwanted pipeline processors // remove the old unwanted processors
for k := range agentProcessors { for k := range agentProcessors {
_, isInDesiredPipelineProcs := exists[k] if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) {
if hasSignozPipelineProcessorPrefix(k) && !isInDesiredPipelineProcs {
delete(agentProcessors, k) delete(agentProcessors, k)
} }
} }
collectorConf["processors"] = agentProcessors agentConf["processors"] = agentProcessors
return nil return nil
} }
@ -69,24 +65,21 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
return &p, nil return &p, nil
} }
func buildCollectorPipelineProcessorsList( func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) {
currentCollectorProcessors []string,
signozPipelineProcessorNames []string,
) ([]string, error) {
lockLogsPipelineSpec.Lock() lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock() defer lockLogsPipelineSpec.Unlock()
exists := map[string]struct{}{} exists := map[string]struct{}{}
for _, v := range signozPipelineProcessorNames { for _, v := range logsParserPipeline {
exists[v] = struct{}{} exists[v] = struct{}{}
} }
// removed the old processors which are not used // removed the old processors which are not used
var pipeline []string var pipeline []string
for _, procName := range currentCollectorProcessors { for _, v := range current {
_, isInDesiredPipelineProcs := exists[procName] k := v
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) { if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) {
pipeline = append(pipeline, procName) pipeline = append(pipeline, v)
} }
} }
@ -103,7 +96,7 @@ func buildCollectorPipelineProcessorsList(
existingVsSpec := map[int]int{} existingVsSpec := map[int]int{}
// go through plan and map its elements to current positions in effective config // go through plan and map its elements to current positions in effective config
for i, m := range signozPipelineProcessorNames { for i, m := range logsParserPipeline {
if loc, ok := existing[m]; ok { if loc, ok := existing[m]; ok {
specVsExistingMap[i] = loc specVsExistingMap[i] = loc
existingVsSpec[loc] = i existingVsSpec[loc] = i
@ -113,11 +106,11 @@ func buildCollectorPipelineProcessorsList(
lastMatched := 0 lastMatched := 0
newPipeline := []string{} newPipeline := []string{}
for i := 0; i < len(signozPipelineProcessorNames); i++ { for i := 0; i < len(logsParserPipeline); i++ {
m := signozPipelineProcessorNames[i] m := logsParserPipeline[i]
if loc, ok := specVsExistingMap[i]; ok { if loc, ok := specVsExistingMap[i]; ok {
for j := lastMatched; j < loc; j++ { for j := lastMatched; j < loc; j++ {
if hasSignozPipelineProcessorPrefix(pipeline[j]) { if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) {
delete(specVsExistingMap, existingVsSpec[j]) delete(specVsExistingMap, existingVsSpec[j])
} else { } else {
newPipeline = append(newPipeline, pipeline[j]) newPipeline = append(newPipeline, pipeline[j])
@ -166,13 +159,13 @@ func GenerateCollectorConfigWithPipelines(
config []byte, config []byte,
pipelines []Pipeline, pipelines []Pipeline,
) ([]byte, *coreModel.ApiError) { ) ([]byte, *coreModel.ApiError) {
var collectorConf map[string]interface{} var c map[string]interface{}
err := yaml.Unmarshal([]byte(config), &collectorConf) err := yaml.Unmarshal([]byte(config), &c)
if err != nil { if err != nil {
return nil, coreModel.BadRequest(err) return nil, coreModel.BadRequest(err)
} }
signozPipelineProcessors, signozPipelineProcNames, err := PreparePipelineProcessor(pipelines) processors, procNames, err := PreparePipelineProcessor(pipelines)
if err != nil { if err != nil {
return nil, coreModel.BadRequest(errors.Wrap( return nil, coreModel.BadRequest(errors.Wrap(
err, "could not prepare otel collector processors for log pipelines", err, "could not prepare otel collector processors for log pipelines",
@ -181,8 +174,8 @@ func GenerateCollectorConfigWithPipelines(
// Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences // Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences
// like $data do not end up being treated as env vars when loading collector config. // like $data do not end up being treated as env vars when loading collector config.
for _, procName := range signozPipelineProcNames { for _, procName := range procNames {
procConf := signozPipelineProcessors[procName] procConf := processors[procName]
serializedProcConf, err := yaml.Marshal(procConf) serializedProcConf, err := yaml.Marshal(procConf)
if err != nil { if err != nil {
return nil, coreModel.InternalError(fmt.Errorf( return nil, coreModel.InternalError(fmt.Errorf(
@ -201,14 +194,14 @@ func GenerateCollectorConfigWithPipelines(
)) ))
} }
signozPipelineProcessors[procName] = escapedConf processors[procName] = escapedConf
} }
// Add processors to unmarshaled collector config `c` // Add processors to unmarshaled collector config `c`
updateProcessorConfigsInCollectorConf(collectorConf, signozPipelineProcessors) buildLogParsingProcessors(c, processors)
// build the new processor list in service.pipelines.logs // build the new processor list in service.pipelines.logs
p, err := getOtelPipelineFromConfig(collectorConf) p, err := getOtelPipelineFromConfig(c)
if err != nil { if err != nil {
return nil, coreModel.BadRequest(err) return nil, coreModel.BadRequest(err)
} }
@ -218,20 +211,16 @@ func GenerateCollectorConfigWithPipelines(
)) ))
} }
updatedProcessorList, _ := buildCollectorPipelineProcessorsList(p.Pipelines.Logs.Processors, signozPipelineProcNames) updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, procNames)
p.Pipelines.Logs.Processors = updatedProcessorList p.Pipelines.Logs.Processors = updatedProcessorList
// add the new processor to the data ( no checks required as the keys will exists) // add the new processor to the data ( no checks required as the keys will exists)
collectorConf["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
updatedConf, err := yaml.Marshal(collectorConf) updatedConf, err := yaml.Marshal(c)
if err != nil { if err != nil {
return nil, coreModel.BadRequest(err) return nil, coreModel.BadRequest(err)
} }
return updatedConf, nil return updatedConf, nil
} }
func hasSignozPipelineProcessorPrefix(procName string) bool {
return strings.HasPrefix(procName, constants.LogsPPLPfx) || strings.HasPrefix(procName, constants.OldLogsPPLPfx)
}

View File

@ -94,7 +94,7 @@ var buildProcessorTestData = []struct {
func TestBuildLogParsingProcessors(t *testing.T) { func TestBuildLogParsingProcessors(t *testing.T) {
for _, test := range buildProcessorTestData { for _, test := range buildProcessorTestData {
Convey(test.Name, t, func() { Convey(test.Name, t, func() {
err := updateProcessorConfigsInCollectorConf(test.agentConf, test.pipelineProcessor) err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(test.agentConf, ShouldResemble, test.outputConf) So(test.agentConf, ShouldResemble, test.outputConf)
}) })
@ -200,7 +200,7 @@ var BuildLogsPipelineTestData = []struct {
func TestBuildLogsPipeline(t *testing.T) { func TestBuildLogsPipeline(t *testing.T) {
for _, test := range BuildLogsPipelineTestData { for _, test := range BuildLogsPipelineTestData {
Convey(test.Name, t, func() { Convey(test.Name, t, func() {
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline) v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline)
So(err, ShouldBeNil) So(err, ShouldBeNil)
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline) fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
So(v, ShouldResemble, test.expectedPipeline) So(v, ShouldResemble, test.expectedPipeline)

View File

@ -7,7 +7,7 @@ import (
"time" "time"
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
"github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog"
@ -42,7 +42,7 @@ func SimulatePipelinesProcessing(
simulatorInputPLogs := SignozLogsToPLogs(logs) simulatorInputPLogs := SignozLogsToPLogs(logs)
processorFactories, err := processor.MakeFactoryMap( processorFactories, err := processor.MakeFactoryMap(
signozlogspipelineprocessor.NewFactory(), logstransformprocessor.NewFactory(),
) )
if err != nil { if err != nil {
return nil, nil, model.InternalError(errors.Wrap( return nil, nil, model.InternalError(errors.Wrap(

View File

@ -341,9 +341,7 @@ var ReservedColumnTargetAliases = map[string]struct{}{
} }
// logsPPLPfx is a short constant for logsPipelinePrefix // logsPPLPfx is a short constant for logsPipelinePrefix
// TODO(Raj): Remove old prefix after new processor based pipelines have been rolled out const LogsPPLPfx = "logstransform/pipeline_"
const LogsPPLPfx = "signozlogspipeline/pipeline_"
const OldLogsPPLPfx = "logstransform/pipeline_"
const IntegrationPipelineIdPrefix = "integration" const IntegrationPipelineIdPrefix = "integration"