From 8324d010aefcc72319a065b83644b39703927d4e Mon Sep 17 00:00:00 2001 From: Yunus M Date: Wed, 20 Sep 2023 11:32:17 +0530 Subject: [PATCH 1/8] fix: frontend/Dockerfile to reduce vulnerabilities (#3589) The following vulnerabilities are fixed with an upgrade: - https://snyk.io/vuln/SNYK-ALPINE317-LIBWEBP-5902239 Co-authored-by: snyk-bot --- frontend/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/Dockerfile b/frontend/Dockerfile index db4974a3b5..3209052799 100644 --- a/frontend/Dockerfile +++ b/frontend/Dockerfile @@ -24,7 +24,7 @@ COPY . . RUN yarn build -FROM nginx:1.24.0-alpine +FROM nginx:1.25.2-alpine COPY conf/default.conf /etc/nginx/conf.d/default.conf From 3db8a25eb989c3ac39e54b1dab18c3becda54c4c Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+rkssisodiya@users.noreply.github.com> Date: Wed, 20 Sep 2023 12:39:34 +0530 Subject: [PATCH 2/8] Add support in query service for querybuilder filterset based log pipelines (#3560) * chore: use v3.Filterset as pipeline filters in logparsing pipelines integration tests * chore: get logparsing integration tests passing with filterset based pipeline * chore: get all other breaking tests passing * chore: move models.logparsingpipeline to logparsingpipeline.model * chore: implement Valuer and Scanner interfaces for v3.FilterSet --- .../app/logparsingpipeline/controller.go | 4 +- .../app/logparsingpipeline/db.go | 16 +- .../logparsingpipeline/model.go} | 17 +- .../app/logparsingpipeline/pipelineBuilder.go | 31 ++- .../pipelineBuilder_test.go | 33 ++- .../logparsingpipeline/postablePipeline.go | 31 ++- .../postablePipeline_test.go | 188 ++++++++++-------- pkg/query-service/model/v3/v3.go | 18 ++ .../integration/logparsingpipeline_test.go | 116 ++++++++--- 9 files changed, 283 insertions(+), 171 deletions(-) rename pkg/query-service/{model/logparsingpipeline.go => app/logparsingpipeline/model.go} (86%) diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 2f68105129..2aa036b394 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -28,7 +28,7 @@ func NewLogParsingPipelinesController(db *sqlx.DB, engine string) (*LogParsingPi type PipelinesResponse struct { *agentConf.ConfigVersion - Pipelines []model.Pipeline `json:"pipelines"` + Pipelines []Pipeline `json:"pipelines"` History []agentConf.ConfigVersion `json:"history"` } @@ -43,7 +43,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines( return nil, model.UnauthorizedError(errors.Wrap(authErr, "failed to get userId from context")) } - var pipelines []model.Pipeline + var pipelines []Pipeline // scan through postable pipelines, to select the existing pipelines or insert missing ones for _, r := range postable { diff --git a/pkg/query-service/app/logparsingpipeline/db.go b/pkg/query-service/app/logparsingpipeline/db.go index 0d897c272c..ae4effb590 100644 --- a/pkg/query-service/app/logparsingpipeline/db.go +++ b/pkg/query-service/app/logparsingpipeline/db.go @@ -41,7 +41,7 @@ func (r *Repo) InitDB(engine string) error { // insertPipeline stores a given postable pipeline to database func (r *Repo) insertPipeline( ctx context.Context, postable *PostablePipeline, -) (*model.Pipeline, *model.ApiError) { +) (*Pipeline, *model.ApiError) { if err := postable.IsValid(); err != nil { return nil, model.BadRequest(errors.Wrap(err, "pipeline is not valid", @@ -65,7 +65,7 @@ func (r *Repo) insertPipeline( return nil, model.UnauthorizedError(err) } - insertRow := &model.Pipeline{ + insertRow := &Pipeline{ Id: uuid.New().String(), OrderId: postable.OrderId, Enabled: postable.Enabled, @@ -75,7 +75,7 @@ func (r *Repo) insertPipeline( Filter: postable.Filter, Config: postable.Config, RawConfig: string(rawConfig), - Creator: model.Creator{ + Creator: Creator{ CreatedBy: claims["email"].(string), CreatedAt: time.Now(), }, @@ -107,9 +107,11 @@ func (r *Repo) insertPipeline( } // getPipelinesByVersion returns pipelines associated with a given version -func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model.Pipeline, []error) { +func (r *Repo) getPipelinesByVersion( + ctx context.Context, version int, +) ([]Pipeline, []error) { var errors []error - pipelines := []model.Pipeline{} + pipelines := []Pipeline{} versionQuery := `SELECT r.id, r.name, @@ -151,8 +153,8 @@ func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model. // GetPipelines returns pipeline and errors (if any) func (r *Repo) GetPipeline( ctx context.Context, id string, -) (*model.Pipeline, *model.ApiError) { - pipelines := []model.Pipeline{} +) (*Pipeline, *model.ApiError) { + pipelines := []Pipeline{} pipelineQuery := `SELECT id, name, diff --git a/pkg/query-service/model/logparsingpipeline.go b/pkg/query-service/app/logparsingpipeline/model.go similarity index 86% rename from pkg/query-service/model/logparsingpipeline.go rename to pkg/query-service/app/logparsingpipeline/model.go index 3eec51bdc3..21493115bd 100644 --- a/pkg/query-service/model/logparsingpipeline.go +++ b/pkg/query-service/app/logparsingpipeline/model.go @@ -1,21 +1,22 @@ -package model +package logparsingpipeline import ( "encoding/json" "time" "github.com/pkg/errors" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) // Pipeline is stored and also deployed finally to collector config type Pipeline struct { - Id string `json:"id,omitempty" db:"id"` - OrderId int `json:"orderId" db:"order_id"` - Name string `json:"name,omitempty" db:"name"` - Alias string `json:"alias" db:"alias"` - Description *string `json:"description" db:"description"` - Enabled bool `json:"enabled" db:"enabled"` - Filter string `json:"filter" db:"filter"` + Id string `json:"id,omitempty" db:"id"` + OrderId int `json:"orderId" db:"order_id"` + Name string `json:"name,omitempty" db:"name"` + Alias string `json:"alias" db:"alias"` + Description *string `json:"description" db:"description"` + Enabled bool `json:"enabled" db:"enabled"` + Filter *v3.FilterSet `json:"filter" db:"filter"` // configuration for pipeline RawConfig string `db:"config_json" json:"-"` diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index 568cbe20ce..a7c1f9b5c6 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -1,15 +1,20 @@ package logparsingpipeline import ( + "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/constants" - "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" ) const ( NOOP = "noop" ) -func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{}, []string, error) { +func CollectorConfProcessorName(p Pipeline) string { + return constants.LogsPPLPfx + p.Alias +} + +func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) { processors := map[string]interface{}{} names := []string{} for _, v := range pipelines { @@ -21,14 +26,20 @@ func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{ if len(operators) == 0 { continue } - router := []model.PipelineOperator{ + + filterExpr, err := queryBuilderToExpr.Parse(v.Filter) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to parse pipeline filter") + } + + router := []PipelineOperator{ { ID: "router_signoz", Type: "router", - Routes: &[]model.Route{ + Routes: &[]Route{ { Output: v.Config[0].ID, - Expr: v.Filter, + Expr: filterExpr, }, }, Default: NOOP, @@ -38,24 +49,24 @@ func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{ v.Config = append(router, operators...) // noop operator is needed as the default operator so that logs are not dropped - noop := model.PipelineOperator{ + noop := PipelineOperator{ ID: NOOP, Type: NOOP, } v.Config = append(v.Config, noop) - processor := model.Processor{ + processor := Processor{ Operators: v.Config, } - name := constants.LogsPPLPfx + v.Alias + name := CollectorConfProcessorName(v) processors[name] = processor names = append(names, name) } return processors, names, nil } -func getOperators(ops []model.PipelineOperator) []model.PipelineOperator { - filteredOp := []model.PipelineOperator{} +func getOperators(ops []PipelineOperator) []PipelineOperator { + filteredOp := []PipelineOperator{} for i, operator := range ops { if operator.Enabled { if len(filteredOp) > 0 { diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index a8017ac439..3dc0ff7cf1 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -4,17 +4,16 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" - "go.signoz.io/signoz/pkg/query-service/model" ) var prepareProcessorTestData = []struct { Name string - Operators []model.PipelineOperator - Output []model.PipelineOperator + Operators []PipelineOperator + Output []PipelineOperator }{ { Name: "Last operator disabled", - Operators: []model.PipelineOperator{ + Operators: []PipelineOperator{ { ID: "t1", Name: "t1", @@ -27,7 +26,7 @@ var prepareProcessorTestData = []struct { Enabled: false, }, }, - Output: []model.PipelineOperator{ + Output: []PipelineOperator{ { ID: "t1", Name: "t1", @@ -37,7 +36,7 @@ var prepareProcessorTestData = []struct { }, { Name: "Operator in middle disabled", - Operators: []model.PipelineOperator{ + Operators: []PipelineOperator{ { ID: "t1", Name: "t1", @@ -56,7 +55,7 @@ var prepareProcessorTestData = []struct { Enabled: true, }, }, - Output: []model.PipelineOperator{ + Output: []PipelineOperator{ { ID: "t1", Name: "t1", @@ -72,7 +71,7 @@ var prepareProcessorTestData = []struct { }, { Name: "Single operator disabled", - Operators: []model.PipelineOperator{ + Operators: []PipelineOperator{ { ID: "t1", Name: "t1", @@ -80,18 +79,18 @@ var prepareProcessorTestData = []struct { Enabled: false, }, }, - Output: []model.PipelineOperator{}, + Output: []PipelineOperator{}, }, { Name: "Single operator enabled", - Operators: []model.PipelineOperator{ + Operators: []PipelineOperator{ { ID: "t1", Name: "t1", Enabled: true, }, }, - Output: []model.PipelineOperator{ + Output: []PipelineOperator{ { ID: "t1", Name: "t1", @@ -101,12 +100,12 @@ var prepareProcessorTestData = []struct { }, { Name: "Empty operator", - Operators: []model.PipelineOperator{}, - Output: []model.PipelineOperator{}, + Operators: []PipelineOperator{}, + Output: []PipelineOperator{}, }, { Name: "new test", - Operators: []model.PipelineOperator{ + Operators: []PipelineOperator{ { ID: "move_filename", Output: "move_function", @@ -137,7 +136,7 @@ var prepareProcessorTestData = []struct { Name: "move_lwp", }, }, - Output: []model.PipelineOperator{ + Output: []PipelineOperator{ { ID: "move_filename", Output: "move_line", @@ -165,7 +164,7 @@ var prepareProcessorTestData = []struct { }, { Name: "first op disabled", - Operators: []model.PipelineOperator{ + Operators: []PipelineOperator{ { ID: "move_filename", Output: "move_function", @@ -178,7 +177,7 @@ var prepareProcessorTestData = []struct { Name: "move_function", }, }, - Output: []model.PipelineOperator{ + Output: []PipelineOperator{ { ID: "move_function", Enabled: true, diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline.go b/pkg/query-service/app/logparsingpipeline/postablePipeline.go index 2deda650bd..d11fb5b952 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline.go +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline.go @@ -6,9 +6,8 @@ import ( "regexp" "strings" - "github.com/antonmedv/expr" - - "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" ) // PostablePipelines are a list of user defined pielines @@ -19,14 +18,14 @@ type PostablePipelines struct { // PostablePipeline captures user inputs in setting the pipeline type PostablePipeline struct { - Id string `json:"id"` - OrderId int `json:"orderId"` - Name string `json:"name"` - Alias string `json:"alias"` - Description string `json:"description"` - Enabled bool `json:"enabled"` - Filter string `json:"filter"` - Config []model.PipelineOperator `json:"config"` + Id string `json:"id"` + OrderId int `json:"orderId"` + Name string `json:"name"` + Alias string `json:"alias"` + Description string `json:"description"` + Enabled bool `json:"enabled"` + Filter *v3.FilterSet `json:"filter"` + Config []PipelineOperator `json:"config"` } // IsValid checks if postable pipeline has all the required params @@ -42,12 +41,8 @@ func (p *PostablePipeline) IsValid() error { return fmt.Errorf("pipeline alias is required") } - if p.Filter == "" { - return fmt.Errorf("pipeline filter is required") - } - - // check the expression - _, err := expr.Compile(p.Filter, expr.AsBool(), expr.AllowUndefinedVariables()) + // check the filter + _, err := queryBuilderToExpr.Parse(p.Filter) if err != nil { return fmt.Errorf(fmt.Sprintf("filter for pipeline %v is not correct: %v", p.Name, err.Error())) } @@ -95,7 +90,7 @@ func (p *PostablePipeline) IsValid() error { return nil } -func isValidOperator(op model.PipelineOperator) error { +func isValidOperator(op PipelineOperator) error { if op.ID == "" { return errors.New("PipelineOperator.ID is required.") } diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go index ab9ed4414f..dd2c61e748 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go @@ -4,74 +4,102 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" - "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -var correctQueriesTest = []struct { - Name string - Pipeline PostablePipeline - IsValid bool -}{ - { - Name: "No orderId", - Pipeline: PostablePipeline{ - Name: "pipeline 1", - Alias: "pipeline1", - Enabled: true, - Filter: "attributes.method == \"GET\"", - Config: []model.PipelineOperator{}, - }, - IsValid: false, - }, - { - Name: "Invalid orderId", - Pipeline: PostablePipeline{ - OrderId: 0, - Name: "pipeline 1", - Alias: "pipeline1", - Enabled: true, - Filter: "attributes.method == \"GET\"", - Config: []model.PipelineOperator{}, - }, - IsValid: false, - }, - { - Name: "Valid orderId", - Pipeline: PostablePipeline{ - OrderId: 1, - Name: "pipeline 1", - Alias: "pipeline1", - Enabled: true, - Filter: "attributes.method == \"GET\"", - Config: []model.PipelineOperator{}, - }, - IsValid: true, - }, - { - Name: "Invalid filter", - Pipeline: PostablePipeline{ - OrderId: 1, - Name: "pipeline 1", - Alias: "pipeline1", - Enabled: true, - Filter: "test filter", - }, - IsValid: false, - }, - { - Name: "Valid filter", - Pipeline: PostablePipeline{ - OrderId: 1, - Name: "pipeline 1", - Alias: "pipeline1", - Enabled: true, - Filter: "attributes.method == \"GET\"", - }, - IsValid: true, - }, -} - func TestIsValidPostablePipeline(t *testing.T) { + validPipelineFilterSet := &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + } + + var correctQueriesTest = []struct { + Name string + Pipeline PostablePipeline + IsValid bool + }{ + { + Name: "No orderId", + Pipeline: PostablePipeline{ + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: validPipelineFilterSet, + Config: []PipelineOperator{}, + }, + IsValid: false, + }, + { + Name: "Invalid orderId", + Pipeline: PostablePipeline{ + OrderId: 0, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: validPipelineFilterSet, + Config: []PipelineOperator{}, + }, + IsValid: false, + }, + { + Name: "Valid orderId", + Pipeline: PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: validPipelineFilterSet, + Config: []PipelineOperator{}, + }, + IsValid: true, + }, + { + Name: "Invalid filter", + Pipeline: PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeUnspecified, + }, + Operator: "regex", + Value: "[0-9A-Z*", + }, + }, + }, + }, + IsValid: false, + }, + { + Name: "Valid filter", + Pipeline: PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: validPipelineFilterSet, + }, + IsValid: true, + }, + } + for _, test := range correctQueriesTest { Convey(test.Name, t, func() { err := test.Pipeline.IsValid() @@ -86,12 +114,12 @@ func TestIsValidPostablePipeline(t *testing.T) { var operatorTest = []struct { Name string - Operator model.PipelineOperator + Operator PipelineOperator IsValid bool }{ { Name: "Operator - without id", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ Type: "remove", Field: "attributes.abc", }, @@ -99,7 +127,7 @@ var operatorTest = []struct { }, { Name: "Operator - without type", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "test", Field: "attributes.abc", }, @@ -107,7 +135,7 @@ var operatorTest = []struct { }, { Name: "Copy - invalid to and from", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "copy", Type: "copy", From: "date", @@ -117,7 +145,7 @@ var operatorTest = []struct { }, { Name: "Move - invalid to and from", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "move", Type: "move", From: "attributes", @@ -127,7 +155,7 @@ var operatorTest = []struct { }, { Name: "Add - invalid to and from", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "add", Type: "add", Field: "data", @@ -136,7 +164,7 @@ var operatorTest = []struct { }, { Name: "Remove - invalid to and from", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "remove", Type: "remove", Field: "data", @@ -145,7 +173,7 @@ var operatorTest = []struct { }, { Name: "Add - valid", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "add", Type: "add", Field: "body", @@ -155,7 +183,7 @@ var operatorTest = []struct { }, { Name: "Move - valid", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "move", Type: "move", From: "attributes.x1", @@ -165,7 +193,7 @@ var operatorTest = []struct { }, { Name: "Copy - valid", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "copy", Type: "copy", From: "resource.x1", @@ -175,7 +203,7 @@ var operatorTest = []struct { }, { Name: "Unknown operator", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "copy", Type: "operator", From: "resource.x1", @@ -185,7 +213,7 @@ var operatorTest = []struct { }, { Name: "Grok - valid", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "grok", Type: "grok_parser", Pattern: "%{COMMONAPACHELOG}", @@ -195,7 +223,7 @@ var operatorTest = []struct { }, { Name: "Grok - invalid", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "grok", Type: "grok_parser", Pattern: "%{COMMONAPACHELOG}", @@ -205,7 +233,7 @@ var operatorTest = []struct { }, { Name: "Regex - valid", - Operator: model.PipelineOperator{ + Operator: PipelineOperator{ ID: "regex", Type: "regex_parser", Regex: "(?P