From cd9f27ab0898babdfa2c4f4cbd8cf045fa5ac3cc Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Wed, 18 Dec 2024 10:42:14 +0530 Subject: [PATCH] Fix: QS: logs pipelines: better validation of pipelines being saved (#6652) * chore: add test validating invalid field paths in pipeline operators are rejected * chore: refactor posted pipelines validation to use a controller method * fix: run a collector simulation to validate pipeline config being saved * chore: minor cleanup --- pkg/query-service/app/http_handler.go | 7 ++-- .../app/logparsingpipeline/controller.go | 39 +++++++++++++++++++ .../integration/logparsingpipeline_test.go | 21 ++++++++++ 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e14eec7ef6..ba16894438 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -4075,10 +4075,9 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines") } - for _, p := range postable { - if err := p.IsValid(); err != nil { - return nil, model.BadRequestStr(err.Error()) - } + validationErr := aH.LogsParsingPipelineController.ValidatePipelines(ctx, postable) + if validationErr != nil { + return nil, validationErr } return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable) diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 2e6b0ba4d3..4929f72f78 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -94,6 +94,45 @@ func (ic *LogParsingPipelineController) ApplyPipelines( return ic.GetPipelinesByVersion(ctx, cfg.Version) } +func (ic *LogParsingPipelineController) ValidatePipelines( + ctx context.Context, + postedPipelines []PostablePipeline, +) *model.ApiError { + for _, p := range postedPipelines { + if err := p.IsValid(); err != nil { + return model.BadRequestStr(err.Error()) + } + } + + // Also run a collector simulation to ensure config is fit + // for e2e use with a collector + pipelines := []Pipeline{} + for _, pp := range postedPipelines { + pipelines = append(pipelines, Pipeline{ + Id: uuid.New().String(), + OrderId: pp.OrderId, + Enabled: pp.Enabled, + Name: pp.Name, + Alias: pp.Alias, + Description: &pp.Description, + Filter: pp.Filter, + Config: pp.Config, + }) + } + + sampleLogs := []model.SignozLog{{Body: ""}} + _, _, simulationErr := SimulatePipelinesProcessing( + ctx, pipelines, sampleLogs, + ) + if simulationErr != nil { + return model.BadRequest(fmt.Errorf( + "invalid pipelines config: %w", simulationErr.ToError(), + )) + } + + return nil +} + // Returns effective list of pipelines including user created // pipelines and pipelines for installed integrations func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index e06f7a280b..9e07b604d7 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -350,6 +350,27 @@ func TestLogPipelinesValidation(t *testing.T) { }, }, ExpectedResponseStatusCode: 400, + }, { + Name: "Invalid from field path", + Pipeline: logparsingpipeline.PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: validPipelineFilterSet, + Config: []logparsingpipeline.PipelineOperator{ + { + OrderId: 1, + ID: "move", + Type: "move", + From: `attributes.temp_parsed_body."@l"`, + To: "attributes.test", + Enabled: true, + Name: "test move", + }, + }, + }, + ExpectedResponseStatusCode: 400, }, }