mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-11 17:59:09 +08:00
Fix: QS: logs pipelines: better validation of pipelines being saved (#6652)
* chore: add test validating invalid field paths in pipeline operators are rejected * chore: refactor posted pipelines validation to use a controller method * fix: run a collector simulation to validate pipeline config being saved * chore: minor cleanup
This commit is contained in:
parent
14fbb1fcda
commit
cd9f27ab08
@ -4075,10 +4075,9 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
|
||||
zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines")
|
||||
}
|
||||
|
||||
for _, p := range postable {
|
||||
if err := p.IsValid(); err != nil {
|
||||
return nil, model.BadRequestStr(err.Error())
|
||||
}
|
||||
validationErr := aH.LogsParsingPipelineController.ValidatePipelines(ctx, postable)
|
||||
if validationErr != nil {
|
||||
return nil, validationErr
|
||||
}
|
||||
|
||||
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
|
||||
|
@ -94,6 +94,45 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
|
||||
return ic.GetPipelinesByVersion(ctx, cfg.Version)
|
||||
}
|
||||
|
||||
func (ic *LogParsingPipelineController) ValidatePipelines(
|
||||
ctx context.Context,
|
||||
postedPipelines []PostablePipeline,
|
||||
) *model.ApiError {
|
||||
for _, p := range postedPipelines {
|
||||
if err := p.IsValid(); err != nil {
|
||||
return model.BadRequestStr(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Also run a collector simulation to ensure config is fit
|
||||
// for e2e use with a collector
|
||||
pipelines := []Pipeline{}
|
||||
for _, pp := range postedPipelines {
|
||||
pipelines = append(pipelines, Pipeline{
|
||||
Id: uuid.New().String(),
|
||||
OrderId: pp.OrderId,
|
||||
Enabled: pp.Enabled,
|
||||
Name: pp.Name,
|
||||
Alias: pp.Alias,
|
||||
Description: &pp.Description,
|
||||
Filter: pp.Filter,
|
||||
Config: pp.Config,
|
||||
})
|
||||
}
|
||||
|
||||
sampleLogs := []model.SignozLog{{Body: ""}}
|
||||
_, _, simulationErr := SimulatePipelinesProcessing(
|
||||
ctx, pipelines, sampleLogs,
|
||||
)
|
||||
if simulationErr != nil {
|
||||
return model.BadRequest(fmt.Errorf(
|
||||
"invalid pipelines config: %w", simulationErr.ToError(),
|
||||
))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns effective list of pipelines including user created
|
||||
// pipelines and pipelines for installed integrations
|
||||
func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
|
||||
|
@ -350,6 +350,27 @@ func TestLogPipelinesValidation(t *testing.T) {
|
||||
},
|
||||
},
|
||||
ExpectedResponseStatusCode: 400,
|
||||
}, {
|
||||
Name: "Invalid from field path",
|
||||
Pipeline: logparsingpipeline.PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "move",
|
||||
Type: "move",
|
||||
From: `attributes.temp_parsed_body."@l"`,
|
||||
To: "attributes.test",
|
||||
Enabled: true,
|
||||
Name: "test move",
|
||||
},
|
||||
},
|
||||
},
|
||||
ExpectedResponseStatusCode: 400,
|
||||
},
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user