package logparsingpipeline import ( "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" ) const ( NOOP = "noop" ) func CollectorConfProcessorName(p Pipeline) string { return constants.LogsPPLPfx + p.Alias } func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) { processors := map[string]interface{}{} names := []string{} for _, v := range pipelines { if !v.Enabled { continue } operators := getOperators(v.Config) if len(operators) == 0 { continue } filterExpr, err := queryBuilderToExpr.Parse(v.Filter) if err != nil { return nil, nil, errors.Wrap(err, "failed to parse pipeline filter") } router := []PipelineOperator{ { ID: "router_signoz", Type: "router", Routes: &[]Route{ { Output: v.Config[0].ID, Expr: filterExpr, }, }, Default: NOOP, }, } v.Config = append(router, operators...) // noop operator is needed as the default operator so that logs are not dropped noop := PipelineOperator{ ID: NOOP, Type: NOOP, } v.Config = append(v.Config, noop) processor := Processor{ Operators: v.Config, } name := CollectorConfProcessorName(v) processors[name] = processor names = append(names, name) } return processors, names, nil } func getOperators(ops []PipelineOperator) []PipelineOperator { filteredOp := []PipelineOperator{} for i, operator := range ops { if operator.Enabled { if len(filteredOp) > 0 { filteredOp[len(filteredOp)-1].Output = operator.ID } filteredOp = append(filteredOp, operator) } else if i == len(ops)-1 && len(filteredOp) != 0 { filteredOp[len(filteredOp)-1].Output = "" } } return filteredOp }