diff --git a/pkg/query-service/app/logparsingpipeline/collector_config.go b/pkg/query-service/app/logparsingpipeline/collector_config.go index dfef6070f9..c370441210 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config.go @@ -6,7 +6,9 @@ import ( "strings" "sync" - "github.com/knadh/koanf/parsers/yaml" + "gopkg.in/yaml.v3" + + "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/constants" coreModel "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" @@ -14,11 +16,15 @@ import ( var lockLogsPipelineSpec sync.RWMutex -// check if the processors already exis +// check if the processors already exist // if yes then update the processor. // if something doesn't exists then remove it. func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { - agentProcessors := agentConf["processors"].(map[string]interface{}) + agentProcessors := map[string]interface{}{} + if agentConf["processors"] != nil { + agentProcessors = (agentConf["processors"]).(map[string]interface{}) + } + exists := map[string]struct{}{} for key, params := range parsingProcessors { agentProcessors[key] = params @@ -44,7 +50,7 @@ type otelPipeline struct { } `json:"pipelines" yaml:"pipelines"` } -func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) { +func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, error) { if _, ok := config["service"]; !ok { return nil, fmt.Errorf("service not found in OTEL config") } @@ -146,17 +152,51 @@ func checkDuplicateString(pipeline []string) bool { func GenerateCollectorConfigWithPipelines( config []byte, - parsingProcessors map[string]interface{}, - parsingProcessorsNames []string, + pipelines []Pipeline, ) ([]byte, *coreModel.ApiError) { - c, err := yaml.Parser().Unmarshal([]byte(config)) + var c map[string]interface{} + err := yaml.Unmarshal([]byte(config), &c) if err != nil { return nil, coreModel.BadRequest(err) } - buildLogParsingProcessors(c, parsingProcessors) + processors, procNames, err := PreparePipelineProcessor(pipelines) + if err != nil { + return nil, coreModel.BadRequest(errors.Wrap( + err, "could not prepare otel collector processors for log pipelines", + )) + } - p, err := getOtelPipelinFromConfig(c) + // Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences + // like $data do not end up being treated as env vars when loading collector config. 
+ for _, procName := range procNames { + procConf := processors[procName] + serializedProcConf, err := yaml.Marshal(procConf) + if err != nil { + return nil, coreModel.InternalError(fmt.Errorf( + "could not marshal processor config for %s: %w", procName, err, + )) + } + escapedSerializedConf := strings.ReplaceAll( + string(serializedProcConf), "$", "$$", + ) + + var escapedConf map[string]interface{} + err = yaml.Unmarshal([]byte(escapedSerializedConf), &escapedConf) + if err != nil { + return nil, coreModel.InternalError(fmt.Errorf( + "could not unmarshal dollar escaped processor config for %s: %w", procName, err, + )) + } + + processors[procName] = escapedConf + } + + // Add processors to unmarshaled collector config `c` + buildLogParsingProcessors(c, processors) + + // build the new processor list in service.pipelines.logs + p, err := getOtelPipelineFromConfig(c) if err != nil { return nil, coreModel.BadRequest(err) } @@ -166,14 +206,13 @@ func GenerateCollectorConfigWithPipelines( )) } - // build the new processor list - updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, procNames) p.Pipelines.Logs.Processors = updatedProcessorList // add the new processor to the data ( no checks required as the keys will exists) c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs - updatedConf, err := yaml.Parser().Marshal(c) + updatedConf, err := yaml.Marshal(c) if err != nil { return nil, coreModel.BadRequest(err) } diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 066123c416..eed3befec5 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -171,13 +171,8 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig( return nil, "", model.InternalError(multierr.Combine(errs...)) } - processors, procNames, err := PreparePipelineProcessor(pipelines) - if err != nil { - return nil, "", model.BadRequest(errors.Wrap(err, "could not prepare otel collector processors for log pipelines")) - } - updatedConf, apiErr := GenerateCollectorConfigWithPipelines( - currentConfYaml, processors, procNames, + currentConfYaml, pipelines, ) if apiErr != nil { return nil, "", model.WrapApiError(apiErr, "could not marshal yaml for updated conf") diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index 0ef3ff71a5..2eeffaed2f 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -557,6 +557,57 @@ func TestPipelineFilterWithStringOpsShouldNotSpamWarningsIfAttributeIsMissing(t } } +func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) { + require := require.New(t) + + testPipeline := Pipeline{ + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "$test", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "test", + }, + }, + }, + Config: []PipelineOperator{ + { + ID: "move", + Type: "move", + Enabled: true, + Name: "move", + From: "attributes.$test", + To: "attributes.$test1", + }, + }, + } + + testLogs := 
[]model.SignozLog{ + makeTestSignozLog("test log", map[string]interface{}{ + "$test": "test", + }), + } + + result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( + context.Background(), + []Pipeline{testPipeline}, + testLogs, + ) + require.Nil(err) + require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n")) + require.Equal(1, len(result)) + require.Equal("test", result[0].Attributes_string["$test1"]) +} + func TestTemporaryWorkaroundForSupportingAttribsContainingDots(t *testing.T) { // TODO(Raj): Remove this after dots are supported diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go index 4f725cafa5..8f991ee3da 100644 --- a/pkg/query-service/app/logparsingpipeline/preview.go +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/collector/processor" "go.signoz.io/signoz/pkg/query-service/collectorsimulator" "go.signoz.io/signoz/pkg/query-service/model" - "gopkg.in/yaml.v3" ) func SimulatePipelinesProcessing( @@ -42,14 +41,6 @@ func SimulatePipelinesProcessing( } simulatorInputPLogs := SignozLogsToPLogs(logs) - // Simulate processing of logs through an otel collector - processorConfigs, err := collectorProcessorsForPipelines(pipelines) - if err != nil { - return nil, nil, model.BadRequest(errors.Wrap( - err, "could not prepare otel processors for pipelines", - )) - } - processorFactories, err := processor.MakeFactoryMap( logstransformprocessor.NewFactory(), ) @@ -65,12 +56,20 @@ func SimulatePipelinesProcessing( // the number of logtransformprocessors involved. // See defaultFlushInterval at https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/adapter/emitter.go // TODO(Raj): Remove this after flushInterval is exposed in logtransformprocessor config - timeout := time.Millisecond * time.Duration(len(processorConfigs)*100+100) + timeout := time.Millisecond * time.Duration(len(pipelines)*100+100) + + configGenerator := func(baseConf []byte) ([]byte, error) { + updatedConf, apiErr := GenerateCollectorConfigWithPipelines(baseConf, pipelines) + if apiErr != nil { + return nil, apiErr.ToError() + } + return updatedConf, nil + } outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing( ctx, processorFactories, - processorConfigs, + configGenerator, simulatorInputPLogs, timeout, ) @@ -95,36 +94,6 @@ func SimulatePipelinesProcessing( return outputSignozLogs, collectorErrs, nil } -func collectorProcessorsForPipelines(pipelines []Pipeline) ( - []collectorsimulator.ProcessorConfig, error, -) { - processors, procNames, err := PreparePipelineProcessor(pipelines) - if err != nil { - return nil, err - } - - processorConfigs := []collectorsimulator.ProcessorConfig{} - for _, procName := range procNames { - // convert `Processor` structs to map[string]interface{} - procYaml, err := yaml.Marshal(processors[procName]) - if err != nil { - return nil, errors.Wrap(err, "could not marshal Processor struct") - } - var procConfRaw map[string]interface{} - err = yaml.Unmarshal(procYaml, &procConfRaw) - if err != nil { - return nil, errors.Wrap(err, "could not unmarshal proc yaml") - } - - processorConfigs = append(processorConfigs, collectorsimulator.ProcessorConfig{ - Name: procName, - Config: procConfRaw, - }) - } - - return processorConfigs, nil -} - // plog doesn't contain an ID field. // SignozLog.ID is stored as a log attribute in plogs for processing // and gets hydrated back later. 
diff --git a/pkg/query-service/collectorsimulator/collectorsimulator.go b/pkg/query-service/collectorsimulator/collectorsimulator.go index 4a8236b483..e45c2d168a 100644 --- a/pkg/query-service/collectorsimulator/collectorsimulator.go +++ b/pkg/query-service/collectorsimulator/collectorsimulator.go @@ -7,11 +7,11 @@ import ( "strings" "github.com/google/uuid" - "github.com/knadh/koanf/parsers/yaml" "github.com/pkg/errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/confmap/provider/yamlprovider" + "go.opentelemetry.io/collector/confmap/converter/expandconverter" + "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" @@ -44,11 +44,12 @@ type CollectorSimulator struct { inMemoryExporterId string } +type ConfigGenerator func(baseConfYaml []byte) ([]byte, error) + func NewCollectorSimulator( ctx context.Context, - signalType component.DataType, processorFactories map[component.Type]processor.Factory, - processorConfigs []ProcessorConfig, + configGenerator ConfigGenerator, ) (simulator *CollectorSimulator, cleanupFn func(), apiErr *model.ApiError) { // Put together collector component factories for use in the simulation receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory()) @@ -85,9 +86,8 @@ func NewCollectorSimulator( } collectorConfYaml, err := generateSimulationConfig( - signalType, inMemoryReceiverId, - processorConfigs, + configGenerator, inMemoryExporterId, collectorLogsOutputFilePath, ) @@ -95,17 +95,42 @@ func NewCollectorSimulator( return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not generate collector config")) } - // Parse and validate collector config - yamlP := yamlprovider.New() + // Read collector config using the same file provider we use in the actual collector. + // This ensures env variable substitution if any is taken into account. 
+ simulationConfigFile, err := os.CreateTemp("", "collector-simulator-config-*") + if err != nil { + return nil, nil, model.InternalError(errors.Wrap( + err, "could not create tmp file for capturing collector logs", + )) + } + simulationConfigPath := simulationConfigFile.Name() + cleanupFn = func() { + os.Remove(collectorLogsOutputFilePath) + os.Remove(simulationConfigPath) + } + + _, err = simulationConfigFile.Write(collectorConfYaml) + + if err != nil { + return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not write simulation config to tmp file")) + } + err = simulationConfigFile.Close() + if err != nil { + return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp simulation config file")) + } + + fp := fileprovider.New() confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ - URIs: []string{"yaml:" + string(collectorConfYaml)}, - Providers: map[string]confmap.Provider{yamlP.Scheme(): yamlP}, + URIs: []string{simulationConfigPath}, + Providers: map[string]confmap.Provider{fp.Scheme(): fp}, + Converters: []confmap.Converter{expandconverter.New()}, }, }) if err != nil { return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not create config provider.")) } + collectorCfg, err := confProvider.Get(ctx, factories) if err != nil { return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "failed to parse collector config")) @@ -201,9 +226,8 @@ func (l *CollectorSimulator) Shutdown(ctx context.Context) ( } func generateSimulationConfig( - signalType component.DataType, receiverId string, - processorConfigs []ProcessorConfig, + configGenerator ConfigGenerator, exporterId string, collectorLogsOutputPath string, ) ([]byte, error) { @@ -215,6 +239,12 @@ func generateSimulationConfig( memory: id: %s service: + pipelines: + logs: + receivers: + - memory + exporters: + - memory telemetry: metrics: level: none @@ -223,32 +253,5 @@ func generateSimulationConfig( output_paths: ["%s"] `, receiverId, exporterId, collectorLogsOutputPath) - simulationConf, err := yaml.Parser().Unmarshal([]byte(baseConf)) - if err != nil { - return nil, err - } - - processors := map[string]interface{}{} - procNamesInOrder := []string{} - for _, processorConf := range processorConfigs { - processors[processorConf.Name] = processorConf.Config - procNamesInOrder = append(procNamesInOrder, processorConf.Name) - } - simulationConf["processors"] = processors - - svc := simulationConf["service"].(map[string]interface{}) - svc["pipelines"] = map[string]interface{}{ - string(signalType): map[string]interface{}{ - "receivers": []string{"memory"}, - "processors": procNamesInOrder, - "exporters": []string{"memory"}, - }, - } - - simulationConfYaml, err := yaml.Parser().Marshal(simulationConf) - if err != nil { - return nil, err - } - - return simulationConfYaml, nil + return configGenerator([]byte(baseConf)) } diff --git a/pkg/query-service/collectorsimulator/logs.go b/pkg/query-service/collectorsimulator/logs.go index 7d67eb5d55..dc84e6fe56 100644 --- a/pkg/query-service/collectorsimulator/logs.go +++ b/pkg/query-service/collectorsimulator/logs.go @@ -12,17 +12,12 @@ import ( "go.signoz.io/signoz/pkg/query-service/model" ) -type ProcessorConfig struct { - Name string - Config map[string]interface{} -} - // Simulate processing of logs through the otel collector. // Useful for testing, validation and generating previews. 
func SimulateLogsProcessing( ctx context.Context, processorFactories map[component.Type]processor.Factory, - processorConfigs []ProcessorConfig, + configGenerator ConfigGenerator, logs []plog.Logs, timeout time.Duration, ) ( @@ -30,7 +25,7 @@ func SimulateLogsProcessing( ) { // Construct and start a simulator (wraps a collector service) simulator, simulatorInitCleanup, apiErr := NewCollectorSimulator( - ctx, component.DataTypeLogs, processorFactories, processorConfigs, + ctx, processorFactories, configGenerator, ) if simulatorInitCleanup != nil { defer simulatorInitCleanup() diff --git a/pkg/query-service/collectorsimulator/logs_test.go b/pkg/query-service/collectorsimulator/logs_test.go index 796d19f00f..628c33c537 100644 --- a/pkg/query-service/collectorsimulator/logs_test.go +++ b/pkg/query-service/collectorsimulator/logs_test.go @@ -12,6 +12,11 @@ import ( "go.opentelemetry.io/collector/processor" ) +type ProcessorConfig struct { + Name string + Config map[string]interface{} +} + func TestLogsProcessingSimulation(t *testing.T) { require := require.New(t) @@ -71,10 +76,13 @@ func TestLogsProcessingSimulation(t *testing.T) { ) require.Nil(err, "could not create processors factory map") + configGenerator := makeTestConfigGenerator( + []ProcessorConfig{testProcessor1, testProcessor2}, + ) outputLogs, collectorErrs, apiErr := SimulateLogsProcessing( context.Background(), processorFactories, - []ProcessorConfig{testProcessor1, testProcessor2}, + configGenerator, inputLogs, 300*time.Millisecond, ) @@ -111,3 +119,41 @@ func makeTestPlog(body string, attrsStr map[string]string) plog.Logs { return pl } + +func makeTestConfigGenerator( + processorConfigs []ProcessorConfig, +) ConfigGenerator { + return func(baseConf []byte) ([]byte, error) { + conf, err := yaml.Parser().Unmarshal([]byte(baseConf)) + if err != nil { + return nil, err + } + + processors := map[string]interface{}{} + if conf["processors"] != nil { + processors = conf["processors"].(map[string]interface{}) + } + logsProcessors := []string{} + svc := conf["service"].(map[string]interface{}) + svcPipelines := svc["pipelines"].(map[string]interface{}) + svcLogsPipeline := svcPipelines["logs"].(map[string]interface{}) + if svcLogsPipeline["processors"] != nil { + logsProcessors = svcLogsPipeline["processors"].([]string) + } + + for _, processorConf := range processorConfigs { + processors[processorConf.Name] = processorConf.Config + logsProcessors = append(logsProcessors, processorConf.Name) + } + + conf["processors"] = processors + svcLogsPipeline["processors"] = logsProcessors + + confYaml, err := yaml.Parser().Marshal(conf) + if err != nil { + return nil, err + } + + return confYaml, nil + } +}
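
Note: the `$`-escaping step added to GenerateCollectorConfigWithPipelines amounts to the
round trip sketched below. This is a minimal standalone illustration using only
gopkg.in/yaml.v3 and the standard library; escapeDollarsInProcessorConf and the operator
map are illustrative stand-ins, not code from this patch.

package main

import (
	"fmt"
	"strings"

	"gopkg.in/yaml.v3"
)

// escapeDollarsInProcessorConf serializes a processor config, doubles every `$`,
// and unmarshals it back, so that the collector's config expansion later yields a
// literal `$` instead of attempting env var substitution.
func escapeDollarsInProcessorConf(procConf map[string]interface{}) (map[string]interface{}, error) {
	serialized, err := yaml.Marshal(procConf)
	if err != nil {
		return nil, err
	}
	escaped := strings.ReplaceAll(string(serialized), "$", "$$")

	var escapedConf map[string]interface{}
	if err := yaml.Unmarshal([]byte(escaped), &escapedConf); err != nil {
		return nil, err
	}
	return escapedConf, nil
}

func main() {
	// An operator config referencing an attribute named "$test", similar to the
	// pipeline used in TestAttributePathsContainingDollarDoNotBreakCollector.
	procConf := map[string]interface{}{
		"operators": []map[string]interface{}{
			{"type": "move", "from": "attributes.$test", "to": "attributes.$test1"},
		},
	}

	escaped, err := escapeDollarsInProcessorConf(procConf)
	if err != nil {
		panic(err)
	}
	out, _ := yaml.Marshal(escaped)
	fmt.Println(string(out)) // from/to now read attributes.$$test / attributes.$$test1
}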
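
Note: the escaping above matters because the simulator now loads the generated config
through fileprovider with expandconverter, the same path the real collector uses, and that
expansion step treats `$NAME` as an env var reference while `$$` collapses back to a
literal `$`. The sketch below mimics that substitution rule with os.Expand purely for
illustration; expandLikeCollector is a hypothetical, much-simplified stand-in, not the
collector's actual converter.

package main

import (
	"fmt"
	"os"
	"strings"
)

// expandLikeCollector approximates env var expansion on a config string:
// `$$` becomes a literal `$`, and `$NAME` / `${NAME}` are replaced from the
// environment (empty string when unset).
func expandLikeCollector(s string) string {
	const marker = "\x00escaped-dollar\x00"
	s = strings.ReplaceAll(s, "$$", marker)
	s = os.Expand(s, os.Getenv)
	return strings.ReplaceAll(s, marker, "$")
}

func main() {
	// Unescaped: "$test" is treated as an (unset) env var and silently disappears.
	fmt.Println(expandLikeCollector("from: attributes.$test")) // from: attributes.
	// Escaped: "$$test" survives expansion as a literal "$test".
	fmt.Println(expandLikeCollector("from: attributes.$$test")) // from: attributes.$test
}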