mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-15 08:25:54 +08:00
Feat: use new logspipelineprocessor for generating logs pipeline collector conf (#6080)
* chore: update logs pipeline prefix for generated collector config * chore: some cleanup * chore: some more cleanup * chore: some more cleanup
This commit is contained in:
parent
88ace79a64
commit
e4d1452f5f
@ -19,24 +19,28 @@ var lockLogsPipelineSpec sync.RWMutex
|
|||||||
// check if the processors already exist
|
// check if the processors already exist
|
||||||
// if yes then update the processor.
|
// if yes then update the processor.
|
||||||
// if something doesn't exists then remove it.
|
// if something doesn't exists then remove it.
|
||||||
func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
|
func updateProcessorConfigsInCollectorConf(
|
||||||
|
collectorConf map[string]interface{},
|
||||||
|
signozPipelineProcessors map[string]interface{},
|
||||||
|
) error {
|
||||||
agentProcessors := map[string]interface{}{}
|
agentProcessors := map[string]interface{}{}
|
||||||
if agentConf["processors"] != nil {
|
if collectorConf["processors"] != nil {
|
||||||
agentProcessors = (agentConf["processors"]).(map[string]interface{})
|
agentProcessors = (collectorConf["processors"]).(map[string]interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
exists := map[string]struct{}{}
|
exists := map[string]struct{}{}
|
||||||
for key, params := range parsingProcessors {
|
for key, params := range signozPipelineProcessors {
|
||||||
agentProcessors[key] = params
|
agentProcessors[key] = params
|
||||||
exists[key] = struct{}{}
|
exists[key] = struct{}{}
|
||||||
}
|
}
|
||||||
// remove the old unwanted processors
|
// remove the old unwanted pipeline processors
|
||||||
for k := range agentProcessors {
|
for k := range agentProcessors {
|
||||||
if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) {
|
_, isInDesiredPipelineProcs := exists[k]
|
||||||
|
if hasSignozPipelineProcessorPrefix(k) && !isInDesiredPipelineProcs {
|
||||||
delete(agentProcessors, k)
|
delete(agentProcessors, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
agentConf["processors"] = agentProcessors
|
collectorConf["processors"] = agentProcessors
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,21 +69,24 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
|
|||||||
return &p, nil
|
return &p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) {
|
func buildCollectorPipelineProcessorsList(
|
||||||
|
currentCollectorProcessors []string,
|
||||||
|
signozPipelineProcessorNames []string,
|
||||||
|
) ([]string, error) {
|
||||||
lockLogsPipelineSpec.Lock()
|
lockLogsPipelineSpec.Lock()
|
||||||
defer lockLogsPipelineSpec.Unlock()
|
defer lockLogsPipelineSpec.Unlock()
|
||||||
|
|
||||||
exists := map[string]struct{}{}
|
exists := map[string]struct{}{}
|
||||||
for _, v := range logsParserPipeline {
|
for _, v := range signozPipelineProcessorNames {
|
||||||
exists[v] = struct{}{}
|
exists[v] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// removed the old processors which are not used
|
// removed the old processors which are not used
|
||||||
var pipeline []string
|
var pipeline []string
|
||||||
for _, v := range current {
|
for _, procName := range currentCollectorProcessors {
|
||||||
k := v
|
_, isInDesiredPipelineProcs := exists[procName]
|
||||||
if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) {
|
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
|
||||||
pipeline = append(pipeline, v)
|
pipeline = append(pipeline, procName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,7 +103,7 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin
|
|||||||
existingVsSpec := map[int]int{}
|
existingVsSpec := map[int]int{}
|
||||||
|
|
||||||
// go through plan and map its elements to current positions in effective config
|
// go through plan and map its elements to current positions in effective config
|
||||||
for i, m := range logsParserPipeline {
|
for i, m := range signozPipelineProcessorNames {
|
||||||
if loc, ok := existing[m]; ok {
|
if loc, ok := existing[m]; ok {
|
||||||
specVsExistingMap[i] = loc
|
specVsExistingMap[i] = loc
|
||||||
existingVsSpec[loc] = i
|
existingVsSpec[loc] = i
|
||||||
@ -106,11 +113,11 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin
|
|||||||
lastMatched := 0
|
lastMatched := 0
|
||||||
newPipeline := []string{}
|
newPipeline := []string{}
|
||||||
|
|
||||||
for i := 0; i < len(logsParserPipeline); i++ {
|
for i := 0; i < len(signozPipelineProcessorNames); i++ {
|
||||||
m := logsParserPipeline[i]
|
m := signozPipelineProcessorNames[i]
|
||||||
if loc, ok := specVsExistingMap[i]; ok {
|
if loc, ok := specVsExistingMap[i]; ok {
|
||||||
for j := lastMatched; j < loc; j++ {
|
for j := lastMatched; j < loc; j++ {
|
||||||
if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) {
|
if hasSignozPipelineProcessorPrefix(pipeline[j]) {
|
||||||
delete(specVsExistingMap, existingVsSpec[j])
|
delete(specVsExistingMap, existingVsSpec[j])
|
||||||
} else {
|
} else {
|
||||||
newPipeline = append(newPipeline, pipeline[j])
|
newPipeline = append(newPipeline, pipeline[j])
|
||||||
@ -159,13 +166,13 @@ func GenerateCollectorConfigWithPipelines(
|
|||||||
config []byte,
|
config []byte,
|
||||||
pipelines []Pipeline,
|
pipelines []Pipeline,
|
||||||
) ([]byte, *coreModel.ApiError) {
|
) ([]byte, *coreModel.ApiError) {
|
||||||
var c map[string]interface{}
|
var collectorConf map[string]interface{}
|
||||||
err := yaml.Unmarshal([]byte(config), &c)
|
err := yaml.Unmarshal([]byte(config), &collectorConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, coreModel.BadRequest(err)
|
return nil, coreModel.BadRequest(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
processors, procNames, err := PreparePipelineProcessor(pipelines)
|
signozPipelineProcessors, signozPipelineProcNames, err := PreparePipelineProcessor(pipelines)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, coreModel.BadRequest(errors.Wrap(
|
return nil, coreModel.BadRequest(errors.Wrap(
|
||||||
err, "could not prepare otel collector processors for log pipelines",
|
err, "could not prepare otel collector processors for log pipelines",
|
||||||
@ -174,8 +181,8 @@ func GenerateCollectorConfigWithPipelines(
|
|||||||
|
|
||||||
// Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences
|
// Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences
|
||||||
// like $data do not end up being treated as env vars when loading collector config.
|
// like $data do not end up being treated as env vars when loading collector config.
|
||||||
for _, procName := range procNames {
|
for _, procName := range signozPipelineProcNames {
|
||||||
procConf := processors[procName]
|
procConf := signozPipelineProcessors[procName]
|
||||||
serializedProcConf, err := yaml.Marshal(procConf)
|
serializedProcConf, err := yaml.Marshal(procConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, coreModel.InternalError(fmt.Errorf(
|
return nil, coreModel.InternalError(fmt.Errorf(
|
||||||
@ -194,14 +201,14 @@ func GenerateCollectorConfigWithPipelines(
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
processors[procName] = escapedConf
|
signozPipelineProcessors[procName] = escapedConf
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add processors to unmarshaled collector config `c`
|
// Add processors to unmarshaled collector config `c`
|
||||||
buildLogParsingProcessors(c, processors)
|
updateProcessorConfigsInCollectorConf(collectorConf, signozPipelineProcessors)
|
||||||
|
|
||||||
// build the new processor list in service.pipelines.logs
|
// build the new processor list in service.pipelines.logs
|
||||||
p, err := getOtelPipelineFromConfig(c)
|
p, err := getOtelPipelineFromConfig(collectorConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, coreModel.BadRequest(err)
|
return nil, coreModel.BadRequest(err)
|
||||||
}
|
}
|
||||||
@ -211,16 +218,20 @@ func GenerateCollectorConfigWithPipelines(
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, procNames)
|
updatedProcessorList, _ := buildCollectorPipelineProcessorsList(p.Pipelines.Logs.Processors, signozPipelineProcNames)
|
||||||
p.Pipelines.Logs.Processors = updatedProcessorList
|
p.Pipelines.Logs.Processors = updatedProcessorList
|
||||||
|
|
||||||
// add the new processor to the data ( no checks required as the keys will exists)
|
// add the new processor to the data ( no checks required as the keys will exists)
|
||||||
c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
|
collectorConf["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
|
||||||
|
|
||||||
updatedConf, err := yaml.Marshal(c)
|
updatedConf, err := yaml.Marshal(collectorConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, coreModel.BadRequest(err)
|
return nil, coreModel.BadRequest(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return updatedConf, nil
|
return updatedConf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func hasSignozPipelineProcessorPrefix(procName string) bool {
|
||||||
|
return strings.HasPrefix(procName, constants.LogsPPLPfx) || strings.HasPrefix(procName, constants.OldLogsPPLPfx)
|
||||||
|
}
|
||||||
|
@ -94,7 +94,7 @@ var buildProcessorTestData = []struct {
|
|||||||
func TestBuildLogParsingProcessors(t *testing.T) {
|
func TestBuildLogParsingProcessors(t *testing.T) {
|
||||||
for _, test := range buildProcessorTestData {
|
for _, test := range buildProcessorTestData {
|
||||||
Convey(test.Name, t, func() {
|
Convey(test.Name, t, func() {
|
||||||
err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor)
|
err := updateProcessorConfigsInCollectorConf(test.agentConf, test.pipelineProcessor)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
So(test.agentConf, ShouldResemble, test.outputConf)
|
So(test.agentConf, ShouldResemble, test.outputConf)
|
||||||
})
|
})
|
||||||
@ -200,7 +200,7 @@ var BuildLogsPipelineTestData = []struct {
|
|||||||
func TestBuildLogsPipeline(t *testing.T) {
|
func TestBuildLogsPipeline(t *testing.T) {
|
||||||
for _, test := range BuildLogsPipelineTestData {
|
for _, test := range BuildLogsPipelineTestData {
|
||||||
Convey(test.Name, t, func() {
|
Convey(test.Name, t, func() {
|
||||||
v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline)
|
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
|
||||||
So(err, ShouldBeNil)
|
So(err, ShouldBeNil)
|
||||||
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
|
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
|
||||||
So(v, ShouldResemble, test.expectedPipeline)
|
So(v, ShouldResemble, test.expectedPipeline)
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
|
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
|
||||||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
|
"github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||||
"go.opentelemetry.io/collector/pdata/plog"
|
"go.opentelemetry.io/collector/pdata/plog"
|
||||||
@ -42,7 +42,7 @@ func SimulatePipelinesProcessing(
|
|||||||
simulatorInputPLogs := SignozLogsToPLogs(logs)
|
simulatorInputPLogs := SignozLogsToPLogs(logs)
|
||||||
|
|
||||||
processorFactories, err := processor.MakeFactoryMap(
|
processorFactories, err := processor.MakeFactoryMap(
|
||||||
logstransformprocessor.NewFactory(),
|
signozlogspipelineprocessor.NewFactory(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, model.InternalError(errors.Wrap(
|
return nil, nil, model.InternalError(errors.Wrap(
|
||||||
|
@ -341,7 +341,9 @@ var ReservedColumnTargetAliases = map[string]struct{}{
|
|||||||
}
|
}
|
||||||
|
|
||||||
// logsPPLPfx is a short constant for logsPipelinePrefix
|
// logsPPLPfx is a short constant for logsPipelinePrefix
|
||||||
const LogsPPLPfx = "logstransform/pipeline_"
|
// TODO(Raj): Remove old prefix after new processor based pipelines have been rolled out
|
||||||
|
const LogsPPLPfx = "signozlogspipeline/pipeline_"
|
||||||
|
const OldLogsPPLPfx = "logstransform/pipeline_"
|
||||||
|
|
||||||
const IntegrationPipelineIdPrefix = "integration"
|
const IntegrationPipelineIdPrefix = "integration"
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user