Fix: query service: escape dollars in collector config generated for pipelines. (#4242)

* chore: add test validating pipeline config containing dollar works

* chore: collector simulator: use fileprovider with expandconverter like actual collector

* chore: get tests passing

* chore: take a configGenerator func in simulation and unify config generation logic

* fix: escape $ in config generated for log pipelines
This commit is contained in:
Raj Kamal Singh 2023-12-19 21:24:53 +05:30 committed by GitHub
parent c5cba68b53
commit 581bd07b35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 205 additions and 107 deletions

View File

@ -6,7 +6,9 @@ import (
"strings"
"sync"
"github.com/knadh/koanf/parsers/yaml"
"gopkg.in/yaml.v3"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/constants"
coreModel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
@ -14,11 +16,15 @@ import (
var lockLogsPipelineSpec sync.RWMutex
// check if the processors already exis
// check if the processors already exist
// if yes then update the processor.
// if something doesn't exist then remove it.
func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
agentProcessors := agentConf["processors"].(map[string]interface{})
agentProcessors := map[string]interface{}{}
if agentConf["processors"] != nil {
agentProcessors = (agentConf["processors"]).(map[string]interface{})
}
exists := map[string]struct{}{}
for key, params := range parsingProcessors {
agentProcessors[key] = params
@ -44,7 +50,7 @@ type otelPipeline struct {
} `json:"pipelines" yaml:"pipelines"`
}
func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) {
func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, error) {
if _, ok := config["service"]; !ok {
return nil, fmt.Errorf("service not found in OTEL config")
}
@ -146,17 +152,51 @@ func checkDuplicateString(pipeline []string) bool {
func GenerateCollectorConfigWithPipelines(
config []byte,
parsingProcessors map[string]interface{},
parsingProcessorsNames []string,
pipelines []Pipeline,
) ([]byte, *coreModel.ApiError) {
c, err := yaml.Parser().Unmarshal([]byte(config))
var c map[string]interface{}
err := yaml.Unmarshal([]byte(config), &c)
if err != nil {
return nil, coreModel.BadRequest(err)
}
buildLogParsingProcessors(c, parsingProcessors)
processors, procNames, err := PreparePipelineProcessor(pipelines)
if err != nil {
return nil, coreModel.BadRequest(errors.Wrap(
err, "could not prepare otel collector processors for log pipelines",
))
}
p, err := getOtelPipelinFromConfig(c)
// Escape any `$`s as `$$` in config generated for pipelines, to ensure any occurrences
// like $data do not end up being treated as env vars when loading collector config.
for _, procName := range procNames {
procConf := processors[procName]
serializedProcConf, err := yaml.Marshal(procConf)
if err != nil {
return nil, coreModel.InternalError(fmt.Errorf(
"could not marshal processor config for %s: %w", procName, err,
))
}
escapedSerializedConf := strings.ReplaceAll(
string(serializedProcConf), "$", "$$",
)
var escapedConf map[string]interface{}
err = yaml.Unmarshal([]byte(escapedSerializedConf), &escapedConf)
if err != nil {
return nil, coreModel.InternalError(fmt.Errorf(
"could not unmarshal dollar escaped processor config for %s: %w", procName, err,
))
}
processors[procName] = escapedConf
}
// Add processors to unmarshaled collector config `c`
buildLogParsingProcessors(c, processors)
// build the new processor list in service.pipelines.logs
p, err := getOtelPipelineFromConfig(c)
if err != nil {
return nil, coreModel.BadRequest(err)
}
@ -166,14 +206,13 @@ func GenerateCollectorConfigWithPipelines(
))
}
// build the new processor list
updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames)
updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, procNames)
p.Pipelines.Logs.Processors = updatedProcessorList
// add the new processor to the data ( no checks required as the keys will exists)
c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
updatedConf, err := yaml.Parser().Marshal(c)
updatedConf, err := yaml.Marshal(c)
if err != nil {
return nil, coreModel.BadRequest(err)
}

View File

@ -171,13 +171,8 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
return nil, "", model.InternalError(multierr.Combine(errs...))
}
processors, procNames, err := PreparePipelineProcessor(pipelines)
if err != nil {
return nil, "", model.BadRequest(errors.Wrap(err, "could not prepare otel collector processors for log pipelines"))
}
updatedConf, apiErr := GenerateCollectorConfigWithPipelines(
currentConfYaml, processors, procNames,
currentConfYaml, pipelines,
)
if apiErr != nil {
return nil, "", model.WrapApiError(apiErr, "could not marshal yaml for updated conf")

View File

@ -557,6 +557,57 @@ func TestPipelineFilterWithStringOpsShouldNotSpamWarningsIfAttributeIsMissing(t
}
}
// TestAttributePathsContainingDollarDoNotBreakCollector simulates a pipeline
// whose filter key and move operator paths contain `$`, and checks that the
// simulation reports no collector warnings/errors and that the attribute ends
// up under its new `$`-prefixed name.
func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) {
	require := require.New(t)

	// Filter matching logs on an attribute whose name starts with `$`.
	dollarKeyFilter := &v3.FilterSet{
		Operator: "AND",
		Items: []v3.FilterItem{
			{
				Key: v3.AttributeKey{
					Key:      "$test",
					DataType: v3.AttributeKeyDataTypeString,
					Type:     v3.AttributeKeyTypeTag,
				},
				Operator: "=",
				Value:    "test",
			},
		},
	}

	// Move operator with `$` in both the source and the destination path.
	moveDollarAttr := PipelineOperator{
		ID:      "move",
		Type:    "move",
		Enabled: true,
		Name:    "move",
		From:    "attributes.$test",
		To:      "attributes.$test1",
	}

	pipelineUnderTest := Pipeline{
		OrderId: 1,
		Name:    "pipeline1",
		Alias:   "pipeline1",
		Enabled: true,
		Filter:  dollarKeyFilter,
		Config:  []PipelineOperator{moveDollarAttr},
	}

	inputLogs := []model.SignozLog{
		makeTestSignozLog("test log", map[string]interface{}{
			"$test": "test",
		}),
	}

	processedLogs, collectorLogs, err := SimulatePipelinesProcessing(
		context.Background(),
		[]Pipeline{pipelineUnderTest},
		inputLogs,
	)

	require.Nil(err)
	// Any collector warning or error is included in the failure message.
	require.Equal(0, len(collectorLogs), strings.Join(collectorLogs, "\n"))
	require.Equal(1, len(processedLogs))
	require.Equal("test", processedLogs[0].Attributes_string["$test1"])
}
func TestTemporaryWorkaroundForSupportingAttribsContainingDots(t *testing.T) {
// TODO(Raj): Remove this after dots are supported

View File

@ -14,7 +14,6 @@ import (
"go.opentelemetry.io/collector/processor"
"go.signoz.io/signoz/pkg/query-service/collectorsimulator"
"go.signoz.io/signoz/pkg/query-service/model"
"gopkg.in/yaml.v3"
)
func SimulatePipelinesProcessing(
@ -42,14 +41,6 @@ func SimulatePipelinesProcessing(
}
simulatorInputPLogs := SignozLogsToPLogs(logs)
// Simulate processing of logs through an otel collector
processorConfigs, err := collectorProcessorsForPipelines(pipelines)
if err != nil {
return nil, nil, model.BadRequest(errors.Wrap(
err, "could not prepare otel processors for pipelines",
))
}
processorFactories, err := processor.MakeFactoryMap(
logstransformprocessor.NewFactory(),
)
@ -65,12 +56,20 @@ func SimulatePipelinesProcessing(
// the number of logtransformprocessors involved.
// See defaultFlushInterval at https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/adapter/emitter.go
// TODO(Raj): Remove this after flushInterval is exposed in logtransformprocessor config
timeout := time.Millisecond * time.Duration(len(processorConfigs)*100+100)
timeout := time.Millisecond * time.Duration(len(pipelines)*100+100)
configGenerator := func(baseConf []byte) ([]byte, error) {
updatedConf, apiErr := GenerateCollectorConfigWithPipelines(baseConf, pipelines)
if apiErr != nil {
return nil, apiErr.ToError()
}
return updatedConf, nil
}
outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing(
ctx,
processorFactories,
processorConfigs,
configGenerator,
simulatorInputPLogs,
timeout,
)
@ -95,36 +94,6 @@ func SimulatePipelinesProcessing(
return outputSignozLogs, collectorErrs, nil
}
// collectorProcessorsForPipelines prepares the processors for the given
// pipelines and returns them as simulator processor configs, in the order
// reported by PreparePipelineProcessor. Each processor is round-tripped through
// YAML so that its config is a generic map[string]interface{}.
func collectorProcessorsForPipelines(pipelines []Pipeline) (
	[]collectorsimulator.ProcessorConfig, error,
) {
	procsByName, orderedNames, err := PreparePipelineProcessor(pipelines)
	if err != nil {
		return nil, err
	}

	configs := make([]collectorsimulator.ProcessorConfig, 0, len(orderedNames))
	for _, name := range orderedNames {
		// Serialize the `Processor` struct ...
		serialized, marshalErr := yaml.Marshal(procsByName[name])
		if marshalErr != nil {
			return nil, errors.Wrap(marshalErr, "could not marshal Processor struct")
		}

		// ... and read it back as an untyped map.
		var rawConf map[string]interface{}
		if unmarshalErr := yaml.Unmarshal(serialized, &rawConf); unmarshalErr != nil {
			return nil, errors.Wrap(unmarshalErr, "could not unmarshal proc yaml")
		}

		configs = append(configs, collectorsimulator.ProcessorConfig{
			Name:   name,
			Config: rawConf,
		})
	}

	return configs, nil
}
// plog doesn't contain an ID field.
// SignozLog.ID is stored as a log attribute in plogs for processing
// and gets hydrated back later.

View File

@ -7,11 +7,11 @@ import (
"strings"
"github.com/google/uuid"
"github.com/knadh/koanf/parsers/yaml"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
@ -44,11 +44,12 @@ type CollectorSimulator struct {
inMemoryExporterId string
}
// ConfigGenerator produces the collector config YAML used for a simulation.
// It is called with the simulator's base config YAML and its result is
// returned as the generated simulation config.
type ConfigGenerator func(baseConfYaml []byte) ([]byte, error)
func NewCollectorSimulator(
ctx context.Context,
signalType component.DataType,
processorFactories map[component.Type]processor.Factory,
processorConfigs []ProcessorConfig,
configGenerator ConfigGenerator,
) (simulator *CollectorSimulator, cleanupFn func(), apiErr *model.ApiError) {
// Put together collector component factories for use in the simulation
receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory())
@ -85,9 +86,8 @@ func NewCollectorSimulator(
}
collectorConfYaml, err := generateSimulationConfig(
signalType,
inMemoryReceiverId,
processorConfigs,
configGenerator,
inMemoryExporterId,
collectorLogsOutputFilePath,
)
@ -95,17 +95,42 @@ func NewCollectorSimulator(
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not generate collector config"))
}
// Parse and validate collector config
yamlP := yamlprovider.New()
// Read collector config using the same file provider we use in the actual collector.
// This ensures env variable substitution if any is taken into account.
simulationConfigFile, err := os.CreateTemp("", "collector-simulator-config-*")
if err != nil {
return nil, nil, model.InternalError(errors.Wrap(
err, "could not create tmp file for capturing collector logs",
))
}
simulationConfigPath := simulationConfigFile.Name()
cleanupFn = func() {
os.Remove(collectorLogsOutputFilePath)
os.Remove(simulationConfigPath)
}
_, err = simulationConfigFile.Write(collectorConfYaml)
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not write simulation config to tmp file"))
}
err = simulationConfigFile.Close()
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp simulation config file"))
}
fp := fileprovider.New()
confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{"yaml:" + string(collectorConfYaml)},
Providers: map[string]confmap.Provider{yamlP.Scheme(): yamlP},
URIs: []string{simulationConfigPath},
Providers: map[string]confmap.Provider{fp.Scheme(): fp},
Converters: []confmap.Converter{expandconverter.New()},
},
})
if err != nil {
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not create config provider."))
}
collectorCfg, err := confProvider.Get(ctx, factories)
if err != nil {
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "failed to parse collector config"))
@ -201,9 +226,8 @@ func (l *CollectorSimulator) Shutdown(ctx context.Context) (
}
func generateSimulationConfig(
signalType component.DataType,
receiverId string,
processorConfigs []ProcessorConfig,
configGenerator ConfigGenerator,
exporterId string,
collectorLogsOutputPath string,
) ([]byte, error) {
@ -215,6 +239,12 @@ func generateSimulationConfig(
memory:
id: %s
service:
pipelines:
logs:
receivers:
- memory
exporters:
- memory
telemetry:
metrics:
level: none
@ -223,32 +253,5 @@ func generateSimulationConfig(
output_paths: ["%s"]
`, receiverId, exporterId, collectorLogsOutputPath)
simulationConf, err := yaml.Parser().Unmarshal([]byte(baseConf))
if err != nil {
return nil, err
}
processors := map[string]interface{}{}
procNamesInOrder := []string{}
for _, processorConf := range processorConfigs {
processors[processorConf.Name] = processorConf.Config
procNamesInOrder = append(procNamesInOrder, processorConf.Name)
}
simulationConf["processors"] = processors
svc := simulationConf["service"].(map[string]interface{})
svc["pipelines"] = map[string]interface{}{
string(signalType): map[string]interface{}{
"receivers": []string{"memory"},
"processors": procNamesInOrder,
"exporters": []string{"memory"},
},
}
simulationConfYaml, err := yaml.Parser().Marshal(simulationConf)
if err != nil {
return nil, err
}
return simulationConfYaml, nil
return configGenerator([]byte(baseConf))
}

View File

@ -12,17 +12,12 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
)
// ProcessorConfig pairs a collector processor name with its config.
type ProcessorConfig struct {
// Name is the key the processor is registered under and the entry added
// to the pipeline's ordered processor list.
Name string
// Config is the processor's settings as an untyped map.
Config map[string]interface{}
}
// Simulate processing of logs through the otel collector.
// Useful for testing, validation and generating previews.
func SimulateLogsProcessing(
ctx context.Context,
processorFactories map[component.Type]processor.Factory,
processorConfigs []ProcessorConfig,
configGenerator ConfigGenerator,
logs []plog.Logs,
timeout time.Duration,
) (
@ -30,7 +25,7 @@ func SimulateLogsProcessing(
) {
// Construct and start a simulator (wraps a collector service)
simulator, simulatorInitCleanup, apiErr := NewCollectorSimulator(
ctx, component.DataTypeLogs, processorFactories, processorConfigs,
ctx, processorFactories, configGenerator,
)
if simulatorInitCleanup != nil {
defer simulatorInitCleanup()

View File

@ -12,6 +12,11 @@ import (
"go.opentelemetry.io/collector/processor"
)
// ProcessorConfig is a test helper type pairing a collector processor name
// with its config, consumed by makeTestConfigGenerator.
type ProcessorConfig struct {
// Name is the key under `processors` and the entry appended to the logs
// pipeline's processor list.
Name string
// Config is the processor's settings as an untyped map.
Config map[string]interface{}
}
func TestLogsProcessingSimulation(t *testing.T) {
require := require.New(t)
@ -71,10 +76,13 @@ func TestLogsProcessingSimulation(t *testing.T) {
)
require.Nil(err, "could not create processors factory map")
configGenerator := makeTestConfigGenerator(
[]ProcessorConfig{testProcessor1, testProcessor2},
)
outputLogs, collectorErrs, apiErr := SimulateLogsProcessing(
context.Background(),
processorFactories,
[]ProcessorConfig{testProcessor1, testProcessor2},
configGenerator,
inputLogs,
300*time.Millisecond,
)
@ -111,3 +119,41 @@ func makeTestPlog(body string, attrsStr map[string]string) plog.Logs {
return pl
}
// makeTestConfigGenerator returns a ConfigGenerator that adds processorConfigs
// to the base collector config: each processor is registered under the
// top-level `processors` key and its name is appended, in order, to
// `service.pipelines.logs.processors`.
//
// The base config must contain `service.pipelines.logs`; as this is a test
// helper, a malformed base config panics on the type assertions rather than
// returning an error.
func makeTestConfigGenerator(
	processorConfigs []ProcessorConfig,
) ConfigGenerator {
	return func(baseConf []byte) ([]byte, error) {
		conf, err := yaml.Parser().Unmarshal(baseConf)
		if err != nil {
			return nil, err
		}

		processors := map[string]interface{}{}
		if conf["processors"] != nil {
			processors = conf["processors"].(map[string]interface{})
		}

		svc := conf["service"].(map[string]interface{})
		svcPipelines := svc["pipelines"].(map[string]interface{})
		svcLogsPipeline := svcPipelines["logs"].(map[string]interface{})

		// A list decoded from YAML arrives as []interface{}, never []string,
		// so a direct `.([]string)` assertion would panic as soon as the base
		// config lists any logs processors. Accept both representations.
		logsProcessors := []string{}
		switch existing := svcLogsPipeline["processors"].(type) {
		case []string:
			logsProcessors = append(logsProcessors, existing...)
		case []interface{}:
			for _, entry := range existing {
				logsProcessors = append(logsProcessors, entry.(string))
			}
		}

		for _, processorConf := range processorConfigs {
			processors[processorConf.Name] = processorConf.Config
			logsProcessors = append(logsProcessors, processorConf.Name)
		}

		conf["processors"] = processors
		svcLogsPipeline["processors"] = logsProcessors

		confYaml, err := yaml.Parser().Marshal(conf)
		if err != nil {
			return nil, err
		}

		return confYaml, nil
	}
}