diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index c9d839fd39..89a9ed24cb 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -8,6 +8,7 @@ import ( "go.signoz.io/signoz/ee/query-service/interfaces" "go.signoz.io/signoz/ee/query-service/license" baseapp "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" baseint "go.signoz.io/signoz/pkg/query-service/interfaces" basemodel "go.signoz.io/signoz/pkg/query-service/model" rules "go.signoz.io/signoz/pkg/query-service/rules" @@ -15,14 +16,15 @@ import ( ) type APIHandlerOptions struct { - DataConnector interfaces.DataConnector - SkipConfig *basemodel.SkipConfig - PreferDelta bool - PreferSpanMetrics bool - AppDao dao.ModelDao - RulesManager *rules.Manager - FeatureFlags baseint.FeatureLookup - LicenseManager *license.Manager + DataConnector interfaces.DataConnector + SkipConfig *basemodel.SkipConfig + PreferDelta bool + PreferSpanMetrics bool + AppDao dao.ModelDao + RulesManager *rules.Manager + FeatureFlags baseint.FeatureLookup + LicenseManager *license.Manager + LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController } type APIHandler struct { @@ -34,13 +36,15 @@ type APIHandler struct { func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{ - Reader: opts.DataConnector, - SkipConfig: opts.SkipConfig, - PerferDelta: opts.PreferDelta, - PreferSpanMetrics: opts.PreferSpanMetrics, - AppDao: opts.AppDao, - RuleManager: opts.RulesManager, - FeatureFlags: opts.FeatureFlags}) + Reader: opts.DataConnector, + SkipConfig: opts.SkipConfig, + PerferDelta: opts.PreferDelta, + PreferSpanMetrics: opts.PreferSpanMetrics, + AppDao: opts.AppDao, + RuleManager: opts.RulesManager, + FeatureFlags: opts.FeatureFlags, + LogsParsingPipelineController: opts.LogsParsingPipelineController, + }) if err != nil { return nil, err 
diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index a74738eef5..5004d36ab1 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -31,6 +31,7 @@ import ( baseapp "go.signoz.io/signoz/pkg/query-service/app" "go.signoz.io/signoz/pkg/query-service/app/dashboards" baseexplorer "go.signoz.io/signoz/pkg/query-service/app/explorer" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/app/opamp" opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" baseauth "go.signoz.io/signoz/pkg/query-service/auth" @@ -157,6 +158,12 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + // ingestion pipelines manager + logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") + if err != nil { + return nil, err + } + // start the usagemanager usageManager, err := usage.New("sqlite", localDB, lm.GetRepo(), reader.GetConn()) if err != nil { @@ -170,14 +177,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetReader(reader) apiOpts := api.APIHandlerOptions{ - DataConnector: reader, - SkipConfig: skipConfig, - PreferDelta: serverOptions.PreferDelta, - PreferSpanMetrics: serverOptions.PreferSpanMetrics, - AppDao: modelDao, - RulesManager: rm, - FeatureFlags: lm, - LicenseManager: lm, + DataConnector: reader, + SkipConfig: skipConfig, + PreferDelta: serverOptions.PreferDelta, + PreferSpanMetrics: serverOptions.PreferSpanMetrics, + AppDao: modelDao, + RulesManager: rm, + FeatureFlags: lm, + LicenseManager: lm, + LogsParsingPipelineController: logParsingPipelineController, } apiHandler, err := api.NewAPIHandler(apiOpts) diff --git a/go.mod b/go.mod index 753e8fd613..59ef611a4c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( github.com/ClickHouse/clickhouse-go/v2 v2.5.1 github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb 
+ github.com/antonmedv/expr v1.12.4 github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230523034029-2b7ff773052c github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230517094211-cd3f3f0aea85 github.com/coreos/go-oidc/v3 v3.4.0 diff --git a/go.sum b/go.sum index 5e49f85e71..e9f289de7e 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8V github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antonmedv/expr v1.12.4 h1:YRkeF7r0cejMS47bDYe3Jyes7L9t1AhpunC+Duq+R9k= +github.com/antonmedv/expr v1.12.4/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.4.0 h1:yCQqn7dwca4ITXb+CbubHmedzaQYHhNhrEXLYUeEe8Q= diff --git a/pkg/query-service/agentConf/db.go b/pkg/query-service/agentConf/db.go index c3a84f6b3a..3deb592778 100644 --- a/pkg/query-service/agentConf/db.go +++ b/pkg/query-service/agentConf/db.go @@ -104,7 +104,8 @@ func (r *Repo) GetLatestVersion(ctx context.Context, typ ElementTypeDef) (*Confi FROM agent_config_versions WHERE element_type=$2)`, typ, typ) if err != nil { - zap.S().Error("failed get latest config version for element:", typ, err) + // intially the table will be empty + return nil, err } return &c, err } @@ -115,8 +116,9 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion return fmt.Errorf("element type is required for creating agent config version") } - if len(elements) == 0 { - zap.S().Error("insert config called with no elements", c.ElementType) + // allowing 
empty elements for logs - use case is deleting all pipelines + if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines { + zap.S().Error("insert config called with no elements ", c.ElementType) return fmt.Errorf("config must have atleast one element") } @@ -136,7 +138,12 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion } } - c.Version = updateVersion(configVersion.Version) + if configVersion != nil { + c.Version = updateVersion(configVersion.Version) + } else { + // first version + c.Version = 1 + } defer func() { if fnerr != nil { diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index b26d382070..e0b32ffc0c 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -169,7 +169,7 @@ func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) { status := string(Deployed) - message := "deploy successful" + message := "Deployment was successful" defer func() { zap.S().Info(status, zap.String("agentId", agentId), zap.String("agentResponse", message)) @@ -225,6 +225,6 @@ func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData return err } - m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment started", configHash, string(rawPipelineData)) + m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment has started", configHash, string(rawPipelineData)) return nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5008d6edae..46fcd00676 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -3,6 +3,7 @@ package app import ( "bytes" "context" + "database/sql" "encoding/json" "errors" "fmt" @@ -19,6 +20,7 @@ import ( jsoniter "github.com/json-iterator/go" _ "github.com/mattn/go-sqlite3" "github.com/prometheus/prometheus/promql" + 
"go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/app/logs" @@ -33,6 +35,7 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/dao" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" signozio "go.signoz.io/signoz/pkg/query-service/integrations/signozio" @@ -74,6 +77,8 @@ type APIHandler struct { preferDelta bool preferSpanMetrics bool + LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController + // SetupCompleted indicates if SigNoz is ready for general use. // at the moment, we mark the app ready when the first user // is registers. @@ -97,6 +102,9 @@ type APIHandlerOpts struct { // feature flags querier FeatureFlags interfaces.FeatureLookup + + // Log parsing pipelines + LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController } // NewAPIHandler returns an APIHandler @@ -108,14 +116,15 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { } aH := &APIHandler{ - reader: opts.Reader, - appDao: opts.AppDao, - skipConfig: opts.SkipConfig, - preferDelta: opts.PerferDelta, - preferSpanMetrics: opts.PreferSpanMetrics, - alertManager: alertManager, - ruleManager: opts.RuleManager, - featureFlags: opts.FeatureFlags, + reader: opts.Reader, + appDao: opts.AppDao, + skipConfig: opts.SkipConfig, + preferDelta: opts.PerferDelta, + preferSpanMetrics: opts.PreferSpanMetrics, + alertManager: alertManager, + ruleManager: opts.RuleManager, + featureFlags: opts.FeatureFlags, + LogsParsingPipelineController: opts.LogsParsingPipelineController, } builderOpts := queryBuilder.QueryBuilderOptions{ @@ -2240,6 +2249,10 @@ func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am 
*AuthMiddleware) subRouter.HandleFunc("/fields", am.ViewAccess(aH.logFields)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", am.EditAccess(aH.logFieldUpdate)).Methods(http.MethodPost) subRouter.HandleFunc("/aggregate", am.ViewAccess(aH.logAggregate)).Methods(http.MethodGet) + + // log pipelines + subRouter.HandleFunc("/pipelines/{version}", am.ViewAccess(aH.listLogsPipelinesHandler)).Methods(http.MethodGet) + subRouter.HandleFunc("/pipelines", am.EditAccess(aH.createLogsPipeline)).Methods(http.MethodPost) } func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) { @@ -2351,6 +2364,131 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) { aH.WriteJSON(w, r, res) } +const logPipelines = "log_pipelines" + +func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) { + versionString := mux.Vars(r)["version"] + + if versionString == "latest" { + return -1, nil // -1 is the sentinel the handler treats as "latest" + } + + version64, err := strconv.ParseInt(versionString, 0, 0) // bitSize 0 = int; the previous bitSize 8 rejected every version above 127 + + if err != nil { + return 0, model.BadRequestStr("invalid version number") + } + + if version64 <= 0 { + return 0, model.BadRequestStr("invalid version number") + } + + return int(version64), nil +} + +func (ah *APIHandler) listLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { + + version, err := parseAgentConfigVersion(r) + if err != nil { + RespondError(w, err, nil) + return + } + + var payload *logparsingpipeline.PipelinesResponse + var apierr *model.ApiError + + if version != -1 { + payload, apierr = ah.listLogsPipelinesByVersion(context.Background(), version) + } else { + payload, apierr = ah.listLogsPipelines(context.Background()) + } + + if apierr != nil { + RespondError(w, apierr, payload) + return + } + ah.Respond(w, payload) +} + +// listLogsPipelines lists logs pipelines for latest version +func (ah *APIHandler) listLogsPipelines(ctx context.Context) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { + // get latest agent config + lastestConfig, err := 
agentConf.GetLatestVersion(ctx, logPipelines) + if err != nil { + if err != sql.ErrNoRows { + return nil, model.InternalError(fmt.Errorf("failed to get latest agent config version with error %w", err)) + } else { + return nil, nil + } + } + + payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, lastestConfig.Version) + if err != nil { + return nil, model.InternalError(fmt.Errorf("failed to get pipelines with error %w", err)) + } + + // todo(Nitya): make a new API for history pagination + limit := 10 + history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit) + if err != nil { + return nil, model.InternalError(fmt.Errorf("failed to get config history with error %w", err)) + } + payload.History = history + return payload, nil +} + +// listLogsPipelinesByVersion lists pipelines along with config version history +func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { + payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) + if err != nil { + return nil, model.InternalError(err) + } + + // todo(Nitya): make a new API for history pagination + limit := 10 + history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit) + if err != nil { + return nil, model.InternalError(fmt.Errorf("failed to retrieve agent config history with error %w", err)) + } + + payload.History = history + return payload, nil +} + +func (ah *APIHandler) createLogsPipeline(w http.ResponseWriter, r *http.Request) { + + req := logparsingpipeline.PostablePipelines{} + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + RespondError(w, model.BadRequest(err), nil) + return + } + + ctx := auth.AttachJwtToContext(context.Background(), r) + + createPipeline := func(ctx context.Context, postable []logparsingpipeline.PostablePipeline) (*logparsingpipeline.PipelinesResponse, error) { + if len(postable) == 0 { + zap.S().Warnf("found no pipelines 
in the http request, this will delete all the pipelines") + } + + for _, p := range postable { + if err := p.IsValid(); err != nil { + return nil, model.BadRequestStr(err.Error()) + } + } + + return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable) + } + + res, err := createPipeline(ctx, req.Pipelines) + if err != nil { + RespondError(w, model.InternalError(err), nil) + return + } + + ah.Respond(w, res) +} + func (aH *APIHandler) getExplorerQueries(w http.ResponseWriter, r *http.Request) { queries, err := explorer.GetQueries() if err != nil { diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go new file mode 100644 index 0000000000..3a1fb9e160 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -0,0 +1,132 @@ +package logparsingpipeline + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/agentConf" + "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/model" + "go.uber.org/zap" +) + +// Controller takes care of deployment cycle of log parsing pipelines. 
+type LogParsingPipelineController struct { + Repo +} + +func NewLogParsingPipelinesController(db *sqlx.DB, engine string) (*LogParsingPipelineController, error) { + repo := NewRepo(db) + err := repo.InitDB(engine) + return &LogParsingPipelineController{Repo: repo}, err +} + +// PipelinesResponse is used to prepare http response for pipelines config related requests +type PipelinesResponse struct { + *agentConf.ConfigVersion + + Pipelines []model.Pipeline `json:"pipelines"` + History []agentConf.ConfigVersion `json:"history"` +} + +// ApplyPipelines stores new or changed pipelines and initiates a new config update +func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, postable []PostablePipeline) (*PipelinesResponse, error) { + // get user id from context + userId, err := auth.ExtractUserIdFromContext(ctx) + if err != nil { + return nil, model.InternalError(fmt.Errorf("failed to get userId from context %v", err)) + } + + pipelines := []model.Pipeline{} // non-nil so an empty (delete-all) request marshals as [] rather than null + + // scan through postable pipelines, to select the existing pipelines or insert missing ones + for _, r := range postable { + + // note: we process only new and changed pipelines here, deleted pipelines are not expected + // from the client. if a user deletes a pipeline, the client should not send that pipeline in the update. + // in effect, the new config version will not have that pipeline. 
+ + if r.Id == "" { + // looks like a new or changed pipeline, store it first + inserted, err := ic.insertPipeline(ctx, &r) + if err != nil || inserted == nil { + zap.S().Errorf("failed to insert edited pipeline %v", err) // %v: err can be nil when only the result is nil + return nil, fmt.Errorf("failed to insert edited pipeline") + } else { + pipelines = append(pipelines, *inserted) + } + } else { + selected, err := ic.GetPipeline(ctx, r.Id) + if err != nil || selected == nil { + zap.S().Errorf("failed to find edited pipeline %v", err) // GetPipeline returns (nil, nil) for an unknown id, so err.Error() would panic + return nil, fmt.Errorf("failed to find pipeline, invalid request") + } + pipelines = append(pipelines, *selected) + } + + } + + // prepare filter config (processor) from the pipelines + filterConfig, names, err := PreparePipelineProcessor(pipelines) + if err != nil { + zap.S().Errorf("failed to generate processor config from pipelines for deployment %s", err.Error()) + return nil, err + } + + if !agentConf.Ready() { + return nil, fmt.Errorf("agent updater unavailable at the moment. Please try in sometime") + } + + // prepare config elements + elements := make([]string, len(pipelines)) + for i, p := range pipelines { + elements[i] = p.Id + } + + // prepare config by calling gen func + cfg, err := agentConf.StartNewVersion(ctx, userId, agentConf.ElementTypeLogPipelines, elements) + if err != nil || cfg == nil { + return nil, err + } + + zap.S().Info("applying drop pipeline config", cfg) + // raw pipeline is needed since filterConfig doesn't contain inactive pipelines and operators + rawPipelineData, _ := json.Marshal(pipelines) + + // queue up the config to push to opamp + err = agentConf.UpsertLogParsingProcessor(ctx, cfg.Version, rawPipelineData, filterConfig, names) + history, _ := agentConf.GetConfigHistory(ctx, agentConf.ElementTypeLogPipelines, 10) + insertedCfg, _ := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, cfg.Version) + + response := &PipelinesResponse{ + ConfigVersion: insertedCfg, + Pipelines: pipelines, + History: history, + } + 
+ if err != nil { + return response, fmt.Errorf("failed to apply pipelines") + } + return response, nil +} + +// GetPipelinesByVersion responds with version info and associated pipelines +func (ic *LogParsingPipelineController) GetPipelinesByVersion(ctx context.Context, version int) (*PipelinesResponse, error) { + pipelines, errors := ic.getPipelinesByVersion(ctx, version) + if errors != nil { + zap.S().Errorf("failed to get pipelines for version %d, %v", version, errors) // %w is only valid in fmt.Errorf; errors is a []error + return nil, fmt.Errorf("failed to get pipelines for given version") + } + configVersion, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version) + if err != nil || configVersion == nil { + zap.S().Errorf("failed to get config for version %d, %v", version, err) // %v: err may be nil when only configVersion is nil + return nil, fmt.Errorf("failed to get config for given version") + } + + return &PipelinesResponse{ + ConfigVersion: configVersion, + Pipelines: pipelines, + }, nil +} diff --git a/pkg/query-service/app/logparsingpipeline/db.go b/pkg/query-service/app/logparsingpipeline/db.go new file mode 100644 index 0000000000..ac6bc5ba3d --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/db.go @@ -0,0 +1,198 @@ +package logparsingpipeline + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/sqlite" + "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/model" + "go.uber.org/zap" +) + +// Repo handles DDL and DML ops on ingestion pipeline +type Repo struct { + db *sqlx.DB +} + +const logPipelines = "log_pipelines" + +// NewRepo initiates a new ingestion repo +func NewRepo(db *sqlx.DB) Repo { + return Repo{ + db: db, + } +} + +func (r *Repo) InitDB(engine string) error { + switch engine { + case "sqlite3", "sqlite": + return sqlite.InitDB(r.db) + default: + return fmt.Errorf("unsupported db") + } +} + +// insertPipeline 
stores a given postable pipeline to database +func (r *Repo) insertPipeline(ctx context.Context, postable *PostablePipeline) (*model.Pipeline, error) { + if err := postable.IsValid(); err != nil { + return nil, errors.Wrap(err, "failed to validate postable pipeline") + } + + rawConfig, err := json.Marshal(postable.Config) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal postable pipeline config") + } + + jwt, err := auth.ExtractJwtFromContext(ctx) + if err != nil { + return nil, err + } + + claims, err := auth.ParseJWT(jwt) + if err != nil { + return nil, err + } + + insertRow := &model.Pipeline{ + Id: uuid.New().String(), + OrderId: postable.OrderId, + Enabled: postable.Enabled, + Name: postable.Name, + Alias: postable.Alias, + Description: &postable.Description, + Filter: postable.Filter, + Config: postable.Config, + RawConfig: string(rawConfig), + Creator: model.Creator{ + CreatedBy: claims["email"].(string), + CreatedAt: time.Now(), + }, + } + + insertQuery := `INSERT INTO pipelines + (id, order_id, enabled, created_by, created_at, name, alias, description, filter, config_json) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` + + _, err = r.db.ExecContext(ctx, + insertQuery, + insertRow.Id, + insertRow.OrderId, + insertRow.Enabled, + insertRow.Creator.CreatedBy, + insertRow.Creator.CreatedAt, + insertRow.Name, + insertRow.Alias, + insertRow.Description, + insertRow.Filter, + insertRow.RawConfig) + + if err != nil { + zap.S().Errorf("error in inserting pipeline data: %v", err) // Errorf is printf-style; a zap.Field argument is not formatted + return insertRow, errors.Wrap(err, "failed to insert pipeline") + } + + return insertRow, nil +} + +// getPipelinesByVersion returns pipelines associated with a given version +func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model.Pipeline, []error) { + var errors []error + pipelines := []model.Pipeline{} + + versionQuery := `SELECT r.id, + r.name, + r.config_json, + r.alias, + r.description, + r.filter, + r.order_id, + 
r.created_by, + r.created_at, + r.enabled + FROM pipelines r, + agent_config_elements e, + agent_config_versions v + WHERE r.id = e.element_id + AND v.id = e.version_id + AND e.element_type = $1 + AND v.version = $2 + ORDER BY order_id asc` + + err := r.db.SelectContext(ctx, &pipelines, versionQuery, logPipelines, version) + if err != nil { + return nil, []error{fmt.Errorf("failed to get drop pipelines from db: %v", err)} + } + + if len(pipelines) == 0 { + return pipelines, nil + } + + for i := range pipelines { + if err := pipelines[i].ParseRawConfig(); err != nil { + errors = append(errors, err) + } + } + + return pipelines, errors +} + +// GetPipeline returns the pipeline with the given id, or (nil, nil) when no row matches +func (r *Repo) GetPipeline(ctx context.Context, id string) (*model.Pipeline, error) { + pipelines := []model.Pipeline{} + + pipelineQuery := `SELECT id, + name, + config_json, + alias, + description, + filter, + order_id, + created_by, + created_at, + enabled + FROM pipelines + WHERE id = $1` + + err := r.db.SelectContext(ctx, &pipelines, pipelineQuery, id) + if err != nil { + zap.S().Errorf("failed to get ingestion pipeline from db: %v", err) + return nil, model.BadRequestStr("failed to get ingestion pipeline from db") + } + + if len(pipelines) == 0 { + zap.S().Warnf("No row found for ingestion pipeline id %s", id) + return nil, nil + } + + if len(pipelines) == 1 { + err := pipelines[0].ParseRawConfig() + if err != nil { + zap.S().Errorf("invalid pipeline config found for id %s: %v", id, err) + return &pipelines[0], model.InternalError(fmt.Errorf("found an invalid pipeline config ")) + } + return &pipelines[0], nil + } + + return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id")) + +} + +func (r *Repo) DeletePipeline(ctx context.Context, id string) error { + deleteQuery := `DELETE + FROM pipelines + WHERE id = $1` + + _, err := r.db.ExecContext(ctx, deleteQuery, id) + if err != nil { + return model.BadRequest(err) + } + + return nil + +} diff --git 
a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go new file mode 100644 index 0000000000..60f0e4df17 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -0,0 +1,77 @@ +package logparsingpipeline + +import ( + "encoding/json" + "fmt" + + "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/model" +) + +const ( + NOOP = "noop" +) + +func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{}, []string, error) { + processors := map[string]interface{}{} + names := []string{} + for _, v := range pipelines { + if !v.Enabled { + continue + } + + operators := getOperators(v.Config) + if len(operators) == 0 { + continue + } + router := []model.PipelineOperator{ + { + ID: "router_signoz", + Type: "router", + Routes: &[]model.Route{ + { + Output: operators[0].ID, // first enabled operator; v.Config[0] may be disabled and filtered out + Expr: v.Filter, + }, + }, + Default: NOOP, + }, + } + + v.Config = append(router, operators...) 
+ + // noop operator is needed as the default operator so that logs are not dropped + noop := model.PipelineOperator{ + ID: NOOP, + Type: NOOP, + } + v.Config = append(v.Config, noop) + + processor := model.Processor{ + Operators: v.Config, + } + name := constants.LogsPPLPfx + v.Alias + processors[name] = processor + names = append(names, name) + } + return processors, names, nil +} + +func getOperators(ops []model.PipelineOperator) []model.PipelineOperator { + filteredOp := []model.PipelineOperator{} + for i, operator := range ops { + if operator.Enabled { + if len(filteredOp) > 0 { // nothing to rewire (and no index -1) when every earlier operator was disabled + filteredOp[len(filteredOp)-1].Output = operator.ID + } + filteredOp = append(filteredOp, operator) + } else if i == len(ops)-1 && len(filteredOp) != 0 { + filteredOp[len(filteredOp)-1].Output = "" + } + } + for _, v := range filteredOp { + x, _ := json.Marshal(v) + fmt.Println(string(x)) + } + return filteredOp +} diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go new file mode 100644 index 0000000000..4973467d1b --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -0,0 +1,175 @@ +package logparsingpipeline + +import ( + "testing" + + . 
"github.com/smartystreets/goconvey/convey" + "go.signoz.io/signoz/pkg/query-service/model" +) + +var prepareProcessorTestData = []struct { + Name string + Operators []model.PipelineOperator + Output []model.PipelineOperator +}{ + { + Name: "Last operator disabled", + Operators: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Output: "t2", + Enabled: true, + }, + { + ID: "t2", + Name: "t2", + Enabled: false, + }, + }, + Output: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Enabled: true, + }, + }, + }, + { + Name: "Operator in middle disabled", + Operators: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Output: "t2", + Enabled: true, + }, + { + ID: "t2", + Name: "t2", + Output: "t3", + Enabled: false, + }, + { + ID: "t3", + Name: "t3", + Enabled: true, + }, + }, + Output: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Output: "t3", + Enabled: true, + }, + { + ID: "t3", + Name: "t3", + Enabled: true, + }, + }, + }, + { + Name: "Single operator disabled", + Operators: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Output: "t2", + Enabled: false, + }, + }, + Output: []model.PipelineOperator{}, + }, + { + Name: "Single operator enabled", + Operators: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Enabled: true, + }, + }, + Output: []model.PipelineOperator{ + { + ID: "t1", + Name: "t1", + Enabled: true, + }, + }, + }, + { + Name: "Empty operator", + Operators: []model.PipelineOperator{}, + Output: []model.PipelineOperator{}, + }, + { + Name: "new test", + Operators: []model.PipelineOperator{ + { + ID: "move_filename", + Output: "move_function", + Enabled: true, + Name: "move_filename", + }, + { + ID: "move_function", + Output: "move_line", + Enabled: false, + Name: "move_function", + }, + { + ID: "move_line", + Output: "move_lwp", + Enabled: true, + Name: "move_line", + }, + { + ID: "move_lwp", + Output: "move_callid", + Enabled: true, + Name: "move_lwp", + }, + { + ID: "move_callid", + Enabled: true, + 
Name: "move_lwp", + }, + }, + Output: []model.PipelineOperator{ + { + ID: "move_filename", + Output: "move_line", + Enabled: true, + Name: "move_filename", + }, + { + ID: "move_line", + Output: "move_lwp", + Enabled: true, + Name: "move_line", + }, + { + ID: "move_lwp", + Output: "move_callid", + Enabled: true, + Name: "move_lwp", + }, + { + ID: "move_callid", + Enabled: true, + Name: "move_lwp", + }, + }, + }, +} + +func TestPreparePipelineProcessor(t *testing.T) { + for _, test := range prepareProcessorTestData { + Convey(test.Name, t, func() { + res := getOperators(test.Operators) + So(res, ShouldResemble, test.Output) + }) + } +} diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline.go b/pkg/query-service/app/logparsingpipeline/postablePipeline.go new file mode 100644 index 0000000000..2deda650bd --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline.go @@ -0,0 +1,182 @@ +package logparsingpipeline + +import ( + "errors" + "fmt" + "regexp" + "strings" + + "github.com/antonmedv/expr" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +// PostablePipelines are a list of user defined pipelines +type PostablePipelines struct { + Pipelines []PostablePipeline `json:"pipelines"` +} + +// PostablePipeline captures user inputs in setting the pipeline + +type PostablePipeline struct { + Id string `json:"id"` + OrderId int `json:"orderId"` + Name string `json:"name"` + Alias string `json:"alias"` + Description string `json:"description"` + Enabled bool `json:"enabled"` + Filter string `json:"filter"` + Config []model.PipelineOperator `json:"config"` +} + +// IsValid checks if postable pipeline has all the required params +func (p *PostablePipeline) IsValid() error { + if p.OrderId == 0 { + return fmt.Errorf("orderId with value >= 1 is required") + } + if p.Name == "" { + return fmt.Errorf("pipeline name is required") + } + + if p.Alias == "" { + return fmt.Errorf("pipeline alias is required") + } + + if p.Filter == "" { + return 
fmt.Errorf("pipeline filter is required") + } + + // check the expression + _, err := expr.Compile(p.Filter, expr.AsBool(), expr.AllowUndefinedVariables()) + if err != nil { + return fmt.Errorf("filter for pipeline %v is not correct: %v", p.Name, err.Error()) // format directly: Errorf(Sprintf(...)) re-parsed any % in the name or error + } + + idUnique := map[string]struct{}{} + outputUnique := map[string]struct{}{} + + l := len(p.Config) + for i, op := range p.Config { + if op.OrderId == 0 { + return fmt.Errorf("orderId with value >= 1 is required in operator") + } + if op.ID == "" { + return fmt.Errorf("id of an operator cannot be empty") + } + if op.Type == "" { + return fmt.Errorf("type of an operator cannot be empty") + } + if i != (l-1) && op.Output == "" { + return fmt.Errorf("Output of operator %s cannot be nil", op.ID) + } + if i == (l-1) && op.Output != "" { + return fmt.Errorf("Output of operator %s should be empty", op.ID) + } + + if _, ok := idUnique[op.ID]; ok { + return fmt.Errorf("duplicate id cannot be present") + } + if _, ok := outputUnique[op.Output]; ok { + return fmt.Errorf("duplicate output cannot be present") + } + + if op.ID == op.Output { + return fmt.Errorf("id and output cannot be same") + } + + err := isValidOperator(op) + if err != nil { + return err + } + + idUnique[op.ID] = struct{}{} + outputUnique[op.Output] = struct{}{} + } + return nil +} + +func isValidOperator(op model.PipelineOperator) error { + if op.ID == "" { + return errors.New("PipelineOperator.ID is required.") + } + + switch op.Type { + case "json_parser": + if op.ParseFrom == "" && op.ParseTo == "" { + return fmt.Errorf("parse from and parse to of %s json operator cannot be empty", op.ID) + } + case "grok_parser": + if op.Pattern == "" { + return fmt.Errorf("pattern of %s grok operator cannot be empty", op.ID) + } + case "regex_parser": + if op.Regex == "" { + return fmt.Errorf("regex of %s regex operator cannot be empty", op.ID) + } + r, err := regexp.Compile(op.Regex) 
+ if err != nil { + return fmt.Errorf(fmt.Sprintf("error compiling regex expression of %s regex operator", op.ID)) + } + namedCaptureGroups := 0 + for _, groupName := range r.SubexpNames() { + if groupName != "" { + namedCaptureGroups++ + } + } + if namedCaptureGroups == 0 { + return fmt.Errorf(fmt.Sprintf("no capture groups in regex expression of %s regex operator", op.ID)) + } + case "copy": + if op.From == "" || op.To == "" { + return fmt.Errorf(fmt.Sprintf("from or to of %s copy operator cannot be empty", op.ID)) + } + case "move": + if op.From == "" || op.To == "" { + return fmt.Errorf(fmt.Sprintf("from or to of %s move operator cannot be empty", op.ID)) + } + case "add": + if op.Field == "" || op.Value == "" { + return fmt.Errorf(fmt.Sprintf("field or value of %s add operator cannot be empty", op.ID)) + } + case "remove": + if op.Field == "" { + return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID)) + } + case "traceParser": + if op.TraceParser == nil { + return fmt.Errorf(fmt.Sprintf("field of %s remove operator cannot be empty", op.ID)) + } + + if op.TraceParser.SpanId.ParseFrom == "" && op.TraceParser.TraceId.ParseFrom == "" && op.TraceParser.TraceFlags.ParseFrom == "" { + return fmt.Errorf(fmt.Sprintf("one of trace_id,span_id,parse_from of %s traceParser operator must be present", op.ID)) + } + case "retain": + if len(op.Fields) == 0 { + return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID)) + } + default: + return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, traceParser, retain)", op.Type, op.ID)) + } + + if !isValidOtelValue(op.ParseFrom) || + !isValidOtelValue(op.ParseTo) || + !isValidOtelValue(op.From) || + !isValidOtelValue(op.To) || + !isValidOtelValue(op.Field) { + valueErrStr := "value should have prefix of body, attributes, resource" + return fmt.Errorf(fmt.Sprintf("%s for operator Id %s", valueErrStr, 
op.ID)) + } + return nil +} + +func isValidOtelValue(val string) bool { + if val == "" { + return true + } + if !strings.HasPrefix(val, "body") && + !strings.HasPrefix(val, "attributes") && + !strings.HasPrefix(val, "resource") { + return false + } + return true +} diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go new file mode 100644 index 0000000000..ab9ed4414f --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go @@ -0,0 +1,239 @@ +package logparsingpipeline + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "go.signoz.io/signoz/pkg/query-service/model" +) + +var correctQueriesTest = []struct { + Name string + Pipeline PostablePipeline + IsValid bool +}{ + { + Name: "No orderId", + Pipeline: PostablePipeline{ + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{}, + }, + IsValid: false, + }, + { + Name: "Invalid orderId", + Pipeline: PostablePipeline{ + OrderId: 0, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{}, + }, + IsValid: false, + }, + { + Name: "Valid orderId", + Pipeline: PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{}, + }, + IsValid: true, + }, + { + Name: "Invalid filter", + Pipeline: PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "test filter", + }, + IsValid: false, + }, + { + Name: "Valid filter", + Pipeline: PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + }, + IsValid: true, + }, +} + +func TestIsValidPostablePipeline(t *testing.T) { + for _, test := range 
correctQueriesTest { + Convey(test.Name, t, func() { + err := test.Pipeline.IsValid() + if test.IsValid { + So(err, ShouldBeNil) + } else { + So(err, ShouldBeError) + } + }) + } +} + +var operatorTest = []struct { + Name string + Operator model.PipelineOperator + IsValid bool +}{ + { + Name: "Operator - without id", + Operator: model.PipelineOperator{ + Type: "remove", + Field: "attributes.abc", + }, + IsValid: false, + }, + { + Name: "Operator - without type", + Operator: model.PipelineOperator{ + ID: "test", + Field: "attributes.abc", + }, + IsValid: false, + }, + { + Name: "Copy - invalid to and from", + Operator: model.PipelineOperator{ + ID: "copy", + Type: "copy", + From: "date", + To: "attributes", + }, + IsValid: false, + }, + { + Name: "Move - invalid to and from", + Operator: model.PipelineOperator{ + ID: "move", + Type: "move", + From: "attributes", + To: "data", + }, + IsValid: false, + }, + { + Name: "Add - invalid to and from", + Operator: model.PipelineOperator{ + ID: "add", + Type: "add", + Field: "data", + }, + IsValid: false, + }, + { + Name: "Remove - invalid to and from", + Operator: model.PipelineOperator{ + ID: "remove", + Type: "remove", + Field: "data", + }, + IsValid: false, + }, + { + Name: "Add - valid", + Operator: model.PipelineOperator{ + ID: "add", + Type: "add", + Field: "body", + Value: "val", + }, + IsValid: true, + }, + { + Name: "Move - valid", + Operator: model.PipelineOperator{ + ID: "move", + Type: "move", + From: "attributes.x1", + To: "attributes.x2", + }, + IsValid: true, + }, + { + Name: "Copy - valid", + Operator: model.PipelineOperator{ + ID: "copy", + Type: "copy", + From: "resource.x1", + To: "resource.x2", + }, + IsValid: true, + }, + { + Name: "Unknown operator", + Operator: model.PipelineOperator{ + ID: "copy", + Type: "operator", + From: "resource.x1", + To: "resource.x2", + }, + IsValid: false, + }, + { + Name: "Grok - valid", + Operator: model.PipelineOperator{ + ID: "grok", + Type: "grok_parser", + Pattern: 
"%{COMMONAPACHELOG}", + ParseTo: "attributes", + }, + IsValid: true, + }, + { + Name: "Grok - invalid", + Operator: model.PipelineOperator{ + ID: "grok", + Type: "grok_parser", + Pattern: "%{COMMONAPACHELOG}", + ParseTo: "test", + }, + IsValid: false, + }, + { + Name: "Regex - valid", + Operator: model.PipelineOperator{ + ID: "regex", + Type: "regex_parser", + Regex: "(?P