From 1dfebed93a5dcc545723a328b282b735a04c518b Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Mon, 24 Mar 2025 10:17:12 +0530 Subject: [PATCH] fix: pipelines postgres support and multitenancy (#7371) * fix: pipelines postgres support and multitenancy * fix: minor fixes * fix: address minor comments * fix: rename package pipelinetypes --- ee/query-service/app/server.go | 2 +- pkg/query-service/agentConf/manager.go | 4 +- pkg/query-service/app/http_handler.go | 26 ++- .../app/integrations/controller.go | 4 +- pkg/query-service/app/integrations/manager.go | 33 +-- .../app/integrations/pipeline_utils.go | 4 +- .../app/integrations/test_utils.go | 10 +- .../logparsingpipeline/collector_config.go | 3 +- .../collector_config_test.go | 47 +++-- .../app/logparsingpipeline/controller.go | 85 ++++---- .../app/logparsingpipeline/db.go | 194 +++++++++--------- .../app/logparsingpipeline/model.go | 104 ---------- .../app/logparsingpipeline/pipelineBuilder.go | 23 ++- .../pipelineBuilder_test.go | 177 +++++++++------- .../app/logparsingpipeline/preview.go | 3 +- .../app/logparsingpipeline/preview_test.go | 41 ++-- .../app/logparsingpipeline/processors_test.go | 129 +++++++----- .../severity_parser_test.go | 43 ++-- .../logparsingpipeline/time_parser_test.go | 69 +------ pkg/query-service/app/opamp/model/agent.go | 3 + pkg/query-service/app/server.go | 2 +- .../integration/logparsingpipeline_test.go | 92 +++++---- .../integration/signoz_integrations_test.go | 32 +-- .../tests/integration/test_utils.go | 7 + pkg/query-service/utils/testutils.go | 1 + pkg/signoz/provider.go | 1 + pkg/sqlmigration/017_update_pipelines.go | 96 +++++++++ pkg/types/pipeline.go | 22 -- .../pipelinetypes/pipeline.go} | 119 ++++++++++- .../pipelinetypes/postable_pipeline_test.go} | 10 +- .../pipelinetypes}/time_parser.go | 2 +- pkg/types/pipelinetypes/time_parser_test.go | 55 +++++ 32 files changed, 815 insertions(+), 628 deletions(-) delete mode 100644 pkg/query-service/app/logparsingpipeline/model.go create mode 100644 pkg/sqlmigration/017_update_pipelines.go delete mode 100644 pkg/types/pipeline.go rename pkg/{query-service/app/logparsingpipeline/postablePipeline.go => types/pipelinetypes/pipeline.go} (64%) rename pkg/{query-service/app/logparsingpipeline/postablePipeline_test.go => types/pipelinetypes/postable_pipeline_test.go} (98%) rename pkg/{query-service/app/logparsingpipeline => types/pipelinetypes}/time_parser.go (99%) create mode 100644 pkg/types/pipelinetypes/time_parser_test.go diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 870b2bee09..bc44bfb29b 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -217,7 +217,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { // ingestion pipelines manager logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( - serverOptions.SigNoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations, + serverOptions.SigNoz.SQLStore, integrationsController.GetPipelinesForInstalledIntegrations, ) if err != nil { return nil, err diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index 8a9da7741f..ba912a4e49 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -106,9 +106,7 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) ( return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version") } - updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig( - recommendation, latestConfig, - ) + updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(recommendation, latestConfig) if apiErr != nil { return nil, "", errors.Wrap(apiErr.ToError(), fmt.Sprintf( "failed to generate agent config recommendation for %s", featureType, diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 9873623a99..7a2172d56c 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -60,6 +60,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/postprocess" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/authtypes" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "go.uber.org/zap" @@ -4462,6 +4463,11 @@ func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http } func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } version, err := parseAgentConfigVersion(r) if err != nil { @@ -4473,9 +4479,9 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re var apierr *model.ApiError if version != -1 { - payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version) + payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), claims.OrgID, version) } else { - payload, apierr = aH.listLogsPipelines(context.Background()) + payload, apierr = aH.listLogsPipelines(context.Background(), claims.OrgID) } if apierr != nil { @@ -4486,7 +4492,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re } // listLogsPipelines lists logs piplines for latest version -func (aH *APIHandler) listLogsPipelines(ctx context.Context) ( +func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { // get lateset agent config @@ -4516,7 +4522,7 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context) ( } // listLogsPipelinesByVersion lists pipelines along with config version history -func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( +func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, orgID string, version int) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) @@ -4537,7 +4543,13 @@ func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { - req := logparsingpipeline.PostablePipelines{} + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + + req := pipelinetypes.PostablePipelines{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { RespondError(w, model.BadRequest(err), nil) @@ -4546,7 +4558,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) createPipeline := func( ctx context.Context, - postable []logparsingpipeline.PostablePipeline, + postable []pipelinetypes.PostablePipeline, ) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { if len(postable) == 0 { zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines") @@ -4557,7 +4569,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) return nil, validationErr } - return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable) + return aH.LogsParsingPipelineController.ApplyPipelines(ctx, claims.OrgID, postable) } res, err := createPipeline(r.Context(), req.Pipelines) diff --git a/pkg/query-service/app/integrations/controller.go b/pkg/query-service/app/integrations/controller.go index d2c987d6da..d2fbd7c552 100644 --- a/pkg/query-service/app/integrations/controller.go +++ b/pkg/query-service/app/integrations/controller.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/SigNoz/signoz/pkg/query-service/agentConf" - "github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline" "github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" ) type Controller struct { @@ -124,7 +124,7 @@ func (c *Controller) Uninstall( func (c *Controller) GetPipelinesForInstalledIntegrations( ctx context.Context, -) ([]logparsingpipeline.Pipeline, *model.ApiError) { +) ([]pipelinetypes.GettablePipeline, *model.ApiError) { return c.mgr.GetPipelinesForInstalledIntegrations(ctx) } diff --git a/pkg/query-service/app/integrations/manager.go b/pkg/query-service/app/integrations/manager.go index 26a1ab6c1c..3e4a45bbad 100644 --- a/pkg/query-service/app/integrations/manager.go +++ b/pkg/query-service/app/integrations/manager.go @@ -7,11 +7,11 @@ import ( "strings" "time" - "github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline" "github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/rules" "github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/google/uuid" "github.com/jmoiron/sqlx" ) @@ -39,7 +39,7 @@ type IntegrationAssets struct { } type LogsAssets struct { - Pipelines []logparsingpipeline.PostablePipeline `json:"pipelines"` + Pipelines []pipelinetypes.PostablePipeline `json:"pipelines"` } type IntegrationConfigStep struct { @@ -257,33 +257,34 @@ func (m *Manager) UninstallIntegration( func (m *Manager) GetPipelinesForInstalledIntegrations( ctx context.Context, -) ([]logparsingpipeline.Pipeline, *model.ApiError) { +) ([]pipelinetypes.GettablePipeline, *model.ApiError) { installedIntegrations, apiErr := m.getInstalledIntegrations(ctx) if apiErr != nil { return nil, apiErr } - pipelines := []logparsingpipeline.Pipeline{} + gettablePipelines := []pipelinetypes.GettablePipeline{} for _, ii := range installedIntegrations { for _, p := range ii.Assets.Logs.Pipelines { - pp := logparsingpipeline.Pipeline{ + gettablePipelines = append(gettablePipelines, pipelinetypes.GettablePipeline{ // Alias is used for identifying integration pipelines. Id can't be used for this // since versioning while saving pipelines requires a new id for each version // to avoid altering history when pipelines are edited/reordered etc - Alias: AliasForIntegrationPipeline(ii.Id, p.Alias), - Id: uuid.NewString(), - OrderId: p.OrderId, - Enabled: p.Enabled, - Name: p.Name, - Description: &p.Description, - Filter: p.Filter, - Config: p.Config, - } - pipelines = append(pipelines, pp) + StoreablePipeline: pipelinetypes.StoreablePipeline{ + Alias: AliasForIntegrationPipeline(ii.Id, p.Alias), + ID: uuid.NewString(), + OrderID: p.OrderID, + Enabled: p.Enabled, + Name: p.Name, + Description: p.Description, + }, + Filter: p.Filter, + Config: p.Config, + }) } } - return pipelines, nil + return gettablePipelines, nil } func (m *Manager) dashboardUuid(integrationId string, dashboardId string) string { diff --git a/pkg/query-service/app/integrations/pipeline_utils.go b/pkg/query-service/app/integrations/pipeline_utils.go index f9e78f3a53..0e74eb77e6 100644 --- a/pkg/query-service/app/integrations/pipeline_utils.go +++ b/pkg/query-service/app/integrations/pipeline_utils.go @@ -3,8 +3,8 @@ package integrations import ( "strings" - "github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline" "github.com/SigNoz/signoz/pkg/query-service/constants" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" ) const IntegrationPipelineIdSeparator string = "--" @@ -20,7 +20,7 @@ func AliasForIntegrationPipeline( // Returns ptr to integration_id string if `p` is a pipeline for an installed integration. // Returns null otherwise. -func IntegrationIdForPipeline(p logparsingpipeline.Pipeline) *string { +func IntegrationIdForPipeline(p pipelinetypes.GettablePipeline) *string { if strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) { parts := strings.Split(p.Alias, IntegrationPipelineIdSeparator) if len(parts) < 2 { diff --git a/pkg/query-service/app/integrations/test_utils.go b/pkg/query-service/app/integrations/test_utils.go index 187551e881..03b0a536bc 100644 --- a/pkg/query-service/app/integrations/test_utils.go +++ b/pkg/query-service/app/integrations/test_utils.go @@ -5,12 +5,12 @@ import ( "slices" "testing" - "github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline" "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/rules" "github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" ) func NewTestIntegrationsManager(t *testing.T) *Manager { @@ -59,7 +59,7 @@ func (t *TestAvailableIntegrationsRepo) list( }, Assets: IntegrationAssets{ Logs: LogsAssets{ - Pipelines: []logparsingpipeline.PostablePipeline{ + Pipelines: []pipelinetypes.PostablePipeline{ { Name: "pipeline1", Alias: "pipeline1", @@ -78,7 +78,7 @@ func (t *TestAvailableIntegrationsRepo) list( }, }, }, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -127,7 +127,7 @@ func (t *TestAvailableIntegrationsRepo) list( }, Assets: IntegrationAssets{ Logs: LogsAssets{ - Pipelines: []logparsingpipeline.PostablePipeline{ + Pipelines: []pipelinetypes.PostablePipeline{ { Name: "pipeline2", Alias: "pipeline2", @@ -146,7 +146,7 @@ func (t *TestAvailableIntegrationsRepo) list( }, }, }, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", diff --git a/pkg/query-service/app/logparsingpipeline/collector_config.go b/pkg/query-service/app/logparsingpipeline/collector_config.go index de4c3a17e3..367e3f8294 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config.go @@ -10,6 +10,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/constants" coreModel "github.com/SigNoz/signoz/pkg/query-service/model" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -164,7 +165,7 @@ func checkDuplicateString(pipeline []string) bool { func GenerateCollectorConfigWithPipelines( config []byte, - pipelines []Pipeline, + pipelines []pipelinetypes.GettablePipeline, ) ([]byte, *coreModel.ApiError) { var collectorConf map[string]interface{} err := yaml.Unmarshal([]byte(config), &collectorConf) diff --git a/pkg/query-service/app/logparsingpipeline/collector_config_test.go b/pkg/query-service/app/logparsingpipeline/collector_config_test.go index 79f3a4a367..45bc8bc038 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config_test.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config_test.go @@ -8,6 +8,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" @@ -230,12 +231,14 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test - memory `) - makeTestPipeline := func(name string, alias string) Pipeline { - return Pipeline{ - OrderId: 1, - Name: name, - Alias: alias, - Enabled: true, + makeTestPipeline := func(name string, alias string) pipelinetypes.GettablePipeline { + return pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: name, + Alias: alias, + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -250,7 +253,7 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "regex", Type: "regex_parser", @@ -264,7 +267,7 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test } } - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ makeTestPipeline("test pipeline 1", "pipeline-alias"), makeTestPipeline("test pipeline 2", "pipeline-alias"), } @@ -299,12 +302,14 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test func TestPipelineRouterWorksEvenIfFirstOpIsDisabled(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -319,7 +324,7 @@ func TestPipelineRouterWorksEvenIfFirstOpIsDisabled(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -370,12 +375,14 @@ func TestPipelineRouterWorksEvenIfFirstOpIsDisabled(t *testing.T) { func TestPipeCharInAliasDoesntBreakCollectorConfig(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "test | pipeline", - Alias: "test|pipeline", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "test | pipeline", + Alias: "test|pipeline", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -390,7 +397,7 @@ func TestPipeCharInAliasDoesntBreakCollectorConfig(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index dfd0c8ffb5..20cec17ad7 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -11,9 +11,10 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/utils" + "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/types/authtypes" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/google/uuid" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -22,14 +23,14 @@ import ( type LogParsingPipelineController struct { Repo - GetIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError) + GetIntegrationPipelines func(context.Context) ([]pipelinetypes.GettablePipeline, *model.ApiError) } func NewLogParsingPipelinesController( - db *sqlx.DB, - getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError), + sqlStore sqlstore.SQLStore, + getIntegrationPipelines func(context.Context) ([]pipelinetypes.GettablePipeline, *model.ApiError), ) (*LogParsingPipelineController, error) { - repo := NewRepo(db) + repo := NewRepo(sqlStore) return &LogParsingPipelineController{ Repo: repo, GetIntegrationPipelines: getIntegrationPipelines, @@ -40,14 +41,15 @@ func NewLogParsingPipelinesController( type PipelinesResponse struct { *agentConf.ConfigVersion - Pipelines []Pipeline `json:"pipelines"` - History []agentConf.ConfigVersion `json:"history"` + Pipelines []pipelinetypes.GettablePipeline `json:"pipelines"` + History []agentConf.ConfigVersion `json:"history"` } // ApplyPipelines stores new or changed pipelines and initiates a new config update func (ic *LogParsingPipelineController) ApplyPipelines( ctx context.Context, - postable []PostablePipeline, + orgID string, + postable []pipelinetypes.PostablePipeline, ) (*PipelinesResponse, *model.ApiError) { // get user id from context claims, ok := authtypes.ClaimsFromContext(ctx) @@ -55,7 +57,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines( return nil, model.UnauthorizedError(fmt.Errorf("failed to get userId from context")) } - var pipelines []Pipeline + var pipelines []pipelinetypes.GettablePipeline // scan through postable pipelines, to select the existing pipelines or insert missing ones for idx, r := range postable { @@ -67,9 +69,9 @@ func (ic *LogParsingPipelineController) ApplyPipelines( // For versioning, pipelines get stored with unique ids each time they are saved. // This ensures updating a pipeline doesn't alter historical versions that referenced // the same pipeline id. - r.Id = uuid.NewString() - r.OrderId = idx + 1 - pipeline, apiErr := ic.insertPipeline(ctx, &r) + r.ID = uuid.NewString() + r.OrderID = idx + 1 + pipeline, apiErr := ic.insertPipeline(ctx, orgID, &r) if apiErr != nil { return nil, model.WrapApiError(apiErr, "failed to insert pipeline") } @@ -80,7 +82,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines( // prepare config elements elements := make([]string, len(pipelines)) for i, p := range pipelines { - elements[i] = p.Id + elements[i] = p.ID } // prepare config by calling gen func @@ -94,7 +96,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines( func (ic *LogParsingPipelineController) ValidatePipelines( ctx context.Context, - postedPipelines []PostablePipeline, + postedPipelines []pipelinetypes.PostablePipeline, ) *model.ApiError { for _, p := range postedPipelines { if err := p.IsValid(); err != nil { @@ -104,23 +106,25 @@ func (ic *LogParsingPipelineController) ValidatePipelines( // Also run a collector simulation to ensure config is fit // for e2e use with a collector - pipelines := []Pipeline{} + gettablePipelines := []pipelinetypes.GettablePipeline{} for _, pp := range postedPipelines { - pipelines = append(pipelines, Pipeline{ - Id: uuid.New().String(), - OrderId: pp.OrderId, - Enabled: pp.Enabled, - Name: pp.Name, - Alias: pp.Alias, - Description: &pp.Description, - Filter: pp.Filter, - Config: pp.Config, + gettablePipelines = append(gettablePipelines, pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: pp.OrderID, + Enabled: pp.Enabled, + Name: pp.Name, + Alias: pp.Alias, + Description: pp.Description, + }, + Filter: pp.Filter, + Config: pp.Config, }) } sampleLogs := []model.SignozLog{{Body: ""}} _, _, simulationErr := SimulatePipelinesProcessing( - ctx, pipelines, sampleLogs, + ctx, gettablePipelines, sampleLogs, ) if simulationErr != nil { return model.BadRequest(fmt.Errorf( @@ -135,14 +139,22 @@ func (ic *LogParsingPipelineController) ValidatePipelines( // pipelines and pipelines for installed integrations func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( ctx context.Context, version int, -) ([]Pipeline, *model.ApiError) { - result := []Pipeline{} +) ([]pipelinetypes.GettablePipeline, *model.ApiError) { + result := []pipelinetypes.GettablePipeline{} + + // todo(nitya): remove this once we fix agents in multitenancy + defaultOrgID, err := ic.GetDefaultOrgID(ctx) + if err != nil { + return nil, model.WrapApiError(err, "failed to get default org ID") + } + + fmt.Println("defaultOrgID", defaultOrgID) if version >= 0 { - savedPipelines, errors := ic.getPipelinesByVersion(ctx, version) + savedPipelines, errors := ic.getPipelinesByVersion(ctx, defaultOrgID, version) if errors != nil { zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Errors("errors", errors)) - return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version")) + return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors)) } result = savedPipelines } @@ -156,10 +168,10 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( // Filter out any integration pipelines included in pipelines saved by user // if the corresponding integration is no longer installed. - ipAliases := utils.MapSlice(integrationPipelines, func(p Pipeline) string { + ipAliases := utils.MapSlice(integrationPipelines, func(p pipelinetypes.GettablePipeline) string { return p.Alias }) - result = utils.FilterSlice(result, func(p Pipeline) bool { + result = utils.FilterSlice(result, func(p pipelinetypes.GettablePipeline) bool { if !strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) { return true } @@ -170,7 +182,7 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( // Users are allowed to enable/disable and reorder integration pipelines while // saving the pipeline list. for _, ip := range integrationPipelines { - userPipelineIdx := slices.IndexFunc(result, func(p Pipeline) bool { + userPipelineIdx := slices.IndexFunc(result, func(p pipelinetypes.GettablePipeline) bool { return p.Alias == ip.Alias }) if userPipelineIdx >= 0 { @@ -183,7 +195,7 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( } for idx := range result { - result[idx].OrderId = idx + 1 + result[idx].OrderID = idx + 1 } return result, nil @@ -193,10 +205,11 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( func (ic *LogParsingPipelineController) GetPipelinesByVersion( ctx context.Context, version int, ) (*PipelinesResponse, *model.ApiError) { + pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, version) if errors != nil { zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Error(errors)) - return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version")) + return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors)) } var configVersion *agentConf.ConfigVersion @@ -216,8 +229,8 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion( } type PipelinesPreviewRequest struct { - Pipelines []Pipeline `json:"pipelines"` - Logs []model.SignozLog `json:"logs"` + Pipelines []pipelinetypes.GettablePipeline `json:"pipelines"` + Logs []model.SignozLog `json:"logs"` } type PipelinesPreviewResponse struct { diff --git a/pkg/query-service/app/logparsingpipeline/db.go b/pkg/query-service/app/logparsingpipeline/db.go index bc09f32e22..1aeb476d8c 100644 --- a/pkg/query-service/app/logparsingpipeline/db.go +++ b/pkg/query-service/app/logparsingpipeline/db.go @@ -7,31 +7,33 @@ import ( "time" "github.com/SigNoz/signoz/pkg/query-service/model" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/authtypes" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/google/uuid" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.uber.org/zap" ) // Repo handles DDL and DML ops on ingestion pipeline type Repo struct { - db *sqlx.DB + sqlStore sqlstore.SQLStore } const logPipelines = "log_pipelines" // NewRepo initiates a new ingestion repo -func NewRepo(db *sqlx.DB) Repo { +func NewRepo(sqlStore sqlstore.SQLStore) Repo { return Repo{ - db: db, + sqlStore: sqlStore, } } // insertPipeline stores a given postable pipeline to database func (r *Repo) insertPipeline( - ctx context.Context, postable *PostablePipeline, -) (*Pipeline, *model.ApiError) { + ctx context.Context, orgID string, postable *pipelinetypes.PostablePipeline, +) (*pipelinetypes.GettablePipeline, *model.ApiError) { if err := postable.IsValid(); err != nil { return nil, model.BadRequest(errors.Wrap(err, "pipeline is not valid", @@ -44,44 +46,43 @@ func (r *Repo) insertPipeline( "failed to unmarshal postable pipeline config", )) } + filter, err := json.Marshal(postable.Filter) + if err != nil { + return nil, model.BadRequest(errors.Wrap(err, + "failed to marshal postable pipeline filter", + )) + } claims, ok := authtypes.ClaimsFromContext(ctx) if !ok { return nil, model.UnauthorizedError(fmt.Errorf("failed to get email from context")) } - insertRow := &Pipeline{ - Id: uuid.New().String(), - OrderId: postable.OrderId, - Enabled: postable.Enabled, - Name: postable.Name, - Alias: postable.Alias, - Description: &postable.Description, - Filter: postable.Filter, - Config: postable.Config, - RawConfig: string(rawConfig), - Creator: Creator{ - CreatedBy: claims.Email, - CreatedAt: time.Now(), + insertRow := &pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrgID: orgID, + ID: uuid.New().String(), + OrderID: postable.OrderID, + Enabled: postable.Enabled, + Name: postable.Name, + Alias: postable.Alias, + Description: postable.Description, + FilterString: string(filter), + ConfigJSON: string(rawConfig), + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + }, + UserAuditable: types.UserAuditable{ + CreatedBy: claims.Email, + }, }, + Filter: postable.Filter, + Config: postable.Config, } - insertQuery := `INSERT INTO pipelines - (id, order_id, enabled, created_by, created_at, name, alias, description, filter, config_json) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` - - _, err = r.db.ExecContext(ctx, - insertQuery, - insertRow.Id, - insertRow.OrderId, - insertRow.Enabled, - insertRow.Creator.CreatedBy, - insertRow.Creator.CreatedAt, - insertRow.Name, - insertRow.Alias, - insertRow.Description, - insertRow.Filter, - insertRow.RawConfig) + _, err = r.sqlStore.BunDB().NewInsert(). + Model(&insertRow.StoreablePipeline). + Exec(ctx) if err != nil { zap.L().Error("error in inserting pipeline data", zap.Error(err)) @@ -93,102 +94,105 @@ func (r *Repo) insertPipeline( // getPipelinesByVersion returns pipelines associated with a given version func (r *Repo) getPipelinesByVersion( - ctx context.Context, version int, -) ([]Pipeline, []error) { + ctx context.Context, orgID string, version int, +) ([]pipelinetypes.GettablePipeline, []error) { var errors []error - pipelines := []Pipeline{} - - versionQuery := `SELECT r.id, - r.name, - r.config_json, - r.alias, - r.description, - r.filter, - r.order_id, - r.created_by, - r.created_at, - r.enabled - FROM pipelines r, - agent_config_elements e, - agent_config_versions v - WHERE r.id = e.element_id - AND v.id = e.version_id - AND e.element_type = $1 - AND v.version = $2 - ORDER BY order_id asc` - - err := r.db.SelectContext(ctx, &pipelines, versionQuery, logPipelines, version) + storablePipelines := []pipelinetypes.StoreablePipeline{} + err := r.sqlStore.BunDB().NewSelect(). + Model(&storablePipelines). + Join("JOIN agent_config_elements e ON p.id = e.element_id"). + Join("JOIN agent_config_versions v ON v.id = e.version_id"). + Where("e.element_type = ?", logPipelines). // TODO: nitya - add org_id to this as well + Where("v.version = ?", version). // TODO: nitya - add org_id to this as well + Where("p.org_id = ?", orgID). + Order("p.order_id ASC"). + Scan(ctx) if err != nil { - return nil, []error{fmt.Errorf("failed to get drop pipelines from db: %v", err)} + return nil, []error{fmt.Errorf("failed to get pipelines from db: %v", err)} } - if len(pipelines) == 0 { - return pipelines, nil + gettablePipelines := make([]pipelinetypes.GettablePipeline, len(storablePipelines)) + if len(storablePipelines) == 0 { + return gettablePipelines, nil } - for i := range pipelines { - if err := pipelines[i].ParseRawConfig(); err != nil { + for i := range storablePipelines { + gettablePipelines[i].StoreablePipeline = storablePipelines[i] + if err := gettablePipelines[i].ParseRawConfig(); err != nil { + errors = append(errors, err) + } + if err := gettablePipelines[i].ParseFilter(); err != nil { errors = append(errors, err) } } - return pipelines, errors + return gettablePipelines, errors +} + +func (r *Repo) GetDefaultOrgID(ctx context.Context) (string, *model.ApiError) { + var orgs []types.Organization + err := r.sqlStore.BunDB().NewSelect(). + Model(&orgs). + Scan(ctx) + if err != nil { + return "", model.InternalError(errors.Wrap(err, "failed to get default org ID")) + } + if len(orgs) == 0 { + return "", model.InternalError(errors.New("no orgs found")) + } + return orgs[0].ID, nil } // GetPipelines returns pipeline and errors (if any) func (r *Repo) GetPipeline( - ctx context.Context, id string, -) (*Pipeline, *model.ApiError) { - pipelines := []Pipeline{} + ctx context.Context, orgID string, id string, +) (*pipelinetypes.GettablePipeline, *model.ApiError) { + storablePipelines := []pipelinetypes.StoreablePipeline{} - pipelineQuery := `SELECT id, - name, - config_json, - alias, - description, - filter, - order_id, - created_by, - created_at, - enabled - FROM pipelines - WHERE id = $1` - - err := r.db.SelectContext(ctx, &pipelines, pipelineQuery, id) + err := r.sqlStore.BunDB().NewSelect(). + Model(&storablePipelines). + Where("id = ?", id). + Where("org_id = ?", orgID). + Scan(ctx) if err != nil { zap.L().Error("failed to get ingestion pipeline from db", zap.Error(err)) return nil, model.InternalError(errors.Wrap(err, "failed to get ingestion pipeline from db")) } - if len(pipelines) == 0 { + if len(storablePipelines) == 0 { zap.L().Warn("No row found for ingestion pipeline id", zap.String("id", id)) return nil, model.NotFoundError(fmt.Errorf("no row found for ingestion pipeline id %v", id)) } - if len(pipelines) == 1 { - err := pipelines[0].ParseRawConfig() - if err != nil { + if len(storablePipelines) == 1 { + gettablePipeline := pipelinetypes.GettablePipeline{} + gettablePipeline.StoreablePipeline = storablePipelines[0] + if err := gettablePipeline.ParseRawConfig(); err != nil { zap.L().Error("invalid pipeline config found", zap.String("id", id), zap.Error(err)) return nil, model.InternalError( errors.Wrap(err, "found an invalid pipeline config"), ) } - return &pipelines[0], nil + if err := gettablePipeline.ParseFilter(); err != nil { + zap.L().Error("invalid pipeline filter found", zap.String("id", id), zap.Error(err)) + return nil, model.InternalError( + errors.Wrap(err, "found an invalid pipeline filter"), + ) + } + return &gettablePipeline, nil } return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id")) } -func (r *Repo) DeletePipeline(ctx context.Context, id string) error { - deleteQuery := `DELETE - FROM pipelines - WHERE id = $1` - - _, err := r.db.ExecContext(ctx, deleteQuery, id) +func (r *Repo) DeletePipeline(ctx context.Context, orgID string, id string) error { + _, err := r.sqlStore.BunDB().NewDelete(). + Model(&pipelinetypes.StoreablePipeline{}). + Where("id = ?", id). + Where("org_id = ?", orgID). + Exec(ctx) if err != nil { return model.BadRequest(err) } - return nil - } diff --git a/pkg/query-service/app/logparsingpipeline/model.go b/pkg/query-service/app/logparsingpipeline/model.go deleted file mode 100644 index d3e27e23d4..0000000000 --- a/pkg/query-service/app/logparsingpipeline/model.go +++ /dev/null @@ -1,104 +0,0 @@ -package logparsingpipeline - -import ( - "encoding/json" - "time" - - v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" - "github.com/pkg/errors" -) - -// Pipeline is stored and also deployed finally to collector config -type Pipeline struct { - Id string `json:"id,omitempty" db:"id"` - OrderId int `json:"orderId" db:"order_id"` - Name string `json:"name,omitempty" db:"name"` - Alias string `json:"alias" db:"alias"` - Description *string `json:"description" db:"description"` - Enabled bool `json:"enabled" db:"enabled"` - Filter *v3.FilterSet `json:"filter" db:"filter"` - - // configuration for pipeline - RawConfig string `db:"config_json" json:"-"` - - Config []PipelineOperator `json:"config"` - - // Updater not required as any change will result in new version - Creator -} - -type Creator struct { - CreatedBy string `json:"createdBy" db:"created_by"` - CreatedAt time.Time `json:"createdAt" db:"created_at"` -} - -type Processor struct { - Operators []PipelineOperator `json:"operators" yaml:"operators"` -} - -type PipelineOperator struct { - Type string `json:"type" yaml:"type"` - ID string `json:"id,omitempty" yaml:"id,omitempty"` - Output string `json:"output,omitempty" yaml:"output,omitempty"` - OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"` - If string `json:"if,omitempty" yaml:"if,omitempty"` - - // don't need the following in the final config - OrderId int `json:"orderId" yaml:"-"` - Enabled bool `json:"enabled" yaml:"-"` - Name string `json:"name,omitempty" yaml:"-"` - - // optional keys depending on the type - ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` - Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` - Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` - ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` - *TraceParser `yaml:",inline,omitempty"` - Field string `json:"field,omitempty" yaml:"field,omitempty"` - Value string `json:"value,omitempty" yaml:"value,omitempty"` - From string `json:"from,omitempty" yaml:"from,omitempty"` - To string `json:"to,omitempty" yaml:"to,omitempty"` - Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` - Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` - Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` - Default string `json:"default,omitempty" yaml:"default,omitempty"` - - // time_parser fields. - Layout string `json:"layout,omitempty" yaml:"layout,omitempty"` - LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"` - - // severity parser fields - SeverityMapping map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"` - OverwriteSeverityText bool `json:"overwrite_text,omitempty" yaml:"overwrite_text,omitempty"` -} - -type TimestampParser struct { - Layout string `json:"layout" yaml:"layout"` - LayoutType string `json:"layout_type" yaml:"layout_type"` - ParseFrom string `json:"parse_from" yaml:"parse_from"` -} - -type TraceParser struct { - TraceId *ParseFrom `json:"trace_id,omitempty" yaml:"trace_id,omitempty"` - SpanId *ParseFrom `json:"span_id,omitempty" yaml:"span_id,omitempty"` - TraceFlags *ParseFrom `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` -} - -type ParseFrom struct { - ParseFrom string `json:"parse_from" yaml:"parse_from"` -} - -type Route struct { - Output string `json:"output" yaml:"output"` - Expr string `json:"expr" yaml:"expr"` -} - -func (i *Pipeline) ParseRawConfig() error { - c := []PipelineOperator{} - err := json.Unmarshal([]byte(i.RawConfig), &c) - if err != nil { - return errors.Wrap(err, "failed to parse ingestion rule config") - } - i.Config = c - return nil -} diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index 56268115e9..770f6f8d41 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -8,6 +8,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/antonmedv/expr" "github.com/antonmedv/expr/ast" "github.com/antonmedv/expr/parser" @@ -22,15 +23,15 @@ const ( // only alphabets, digits and `-` are used when translating pipeline identifiers var badCharsForCollectorConfName = regexp.MustCompile("[^a-zA-Z0-9-]") -func CollectorConfProcessorName(p Pipeline) string { +func CollectorConfProcessorName(p pipelinetypes.GettablePipeline) string { normalizedAlias := badCharsForCollectorConfName.ReplaceAllString(p.Alias, "-") return constants.LogsPPLPfx + normalizedAlias } -func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) { +func PreparePipelineProcessor(gettablePipelines []pipelinetypes.GettablePipeline) (map[string]interface{}, []string, error) { processors := map[string]interface{}{} names := []string{} - for pipelineIdx, v := range pipelines { + for pipelineIdx, v := range gettablePipelines { if !v.Enabled { continue } @@ -49,11 +50,11 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s return nil, nil, errors.Wrap(err, "failed to parse pipeline filter") } - router := []PipelineOperator{ + router := []pipelinetypes.PipelineOperator{ { ID: "router_signoz", Type: "router", - Routes: &[]Route{ + Routes: &[]pipelinetypes.Route{ { Output: operators[0].ID, Expr: filterExpr, @@ -66,13 +67,13 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s v.Config = append(router, operators...) // noop operator is needed as the default operator so that logs are not dropped - noop := PipelineOperator{ + noop := pipelinetypes.PipelineOperator{ ID: NOOP, Type: NOOP, } v.Config = append(v.Config, noop) - processor := Processor{ + processor := pipelinetypes.Processor{ Operators: v.Config, } name := CollectorConfProcessorName(v) @@ -88,8 +89,8 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s return processors, names, nil } -func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { - filteredOp := []PipelineOperator{} +func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.PipelineOperator, error) { + filteredOp := []pipelinetypes.PipelineOperator{} for i, operator := range ops { if operator.Enabled { if len(filteredOp) > 0 { @@ -179,7 +180,7 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { operator.If = parseFromNotNilCheck if operator.LayoutType == "strptime" { - regex, err := RegexForStrptimeLayout(operator.Layout) + regex, err := pipelinetypes.RegexForStrptimeLayout(operator.Layout) if err != nil { return nil, fmt.Errorf( "couldn't generate layout regex for time_parser %s: %w", operator.Name, err, @@ -224,7 +225,7 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { return filteredOp, nil } -func cleanTraceParser(operator *PipelineOperator) { +func cleanTraceParser(operator *pipelinetypes.PipelineOperator) { if operator.TraceId != nil && len(operator.TraceId.ParseFrom) < 1 { operator.TraceId = nil } diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index d21b37a014..d87dcb8ff9 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -10,6 +10,8 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/utils" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" + "github.com/google/uuid" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/require" @@ -17,12 +19,12 @@ import ( var prepareProcessorTestData = []struct { Name string - Operators []PipelineOperator - Output []PipelineOperator + Operators []pipelinetypes.PipelineOperator + Output []pipelinetypes.PipelineOperator }{ { Name: "Last operator disabled", - Operators: []PipelineOperator{ + Operators: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", @@ -35,7 +37,7 @@ var prepareProcessorTestData = []struct { Enabled: false, }, }, - Output: []PipelineOperator{ + Output: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", @@ -45,7 +47,7 @@ var prepareProcessorTestData = []struct { }, { Name: "Operator in middle disabled", - Operators: []PipelineOperator{ + Operators: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", @@ -64,7 +66,7 @@ var prepareProcessorTestData = []struct { Enabled: true, }, }, - Output: []PipelineOperator{ + Output: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", @@ -80,7 +82,7 @@ var prepareProcessorTestData = []struct { }, { Name: "Single operator disabled", - Operators: []PipelineOperator{ + Operators: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", @@ -88,18 +90,18 @@ var prepareProcessorTestData = []struct { Enabled: false, }, }, - Output: []PipelineOperator{}, + Output: []pipelinetypes.PipelineOperator{}, }, { Name: "Single operator enabled", - Operators: []PipelineOperator{ + Operators: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", Enabled: true, }, }, - Output: []PipelineOperator{ + Output: []pipelinetypes.PipelineOperator{ { ID: "t1", Name: "t1", @@ -109,12 +111,12 @@ var prepareProcessorTestData = []struct { }, { Name: "Empty operator", - Operators: []PipelineOperator{}, - Output: []PipelineOperator{}, + Operators: []pipelinetypes.PipelineOperator{}, + Output: []pipelinetypes.PipelineOperator{}, }, { Name: "new test", - Operators: []PipelineOperator{ + Operators: []pipelinetypes.PipelineOperator{ { ID: "move_filename", Output: "move_function", @@ -145,7 +147,7 @@ var prepareProcessorTestData = []struct { Name: "move_lwp", }, }, - Output: []PipelineOperator{ + Output: []pipelinetypes.PipelineOperator{ { ID: "move_filename", Output: "move_line", @@ -173,7 +175,7 @@ var prepareProcessorTestData = []struct { }, { Name: "first op disabled", - Operators: []PipelineOperator{ + Operators: []pipelinetypes.PipelineOperator{ { ID: "move_filename", Output: "move_function", @@ -186,7 +188,7 @@ var prepareProcessorTestData = []struct { Name: "move_function", }, }, - Output: []PipelineOperator{ + Output: []pipelinetypes.PipelineOperator{ { ID: "move_function", Enabled: true, @@ -223,14 +225,17 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { }, }, } - makeTestPipeline := func(config []PipelineOperator) Pipeline { - return Pipeline{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, - Filter: testPipelineFilter, - Config: config, + makeTestPipeline := func(config []pipelinetypes.PipelineOperator) pipelinetypes.GettablePipeline { + return pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, + Filter: testPipelineFilter, + Config: config, } } @@ -260,14 +265,14 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { type pipelineTestCase struct { Name string - Operator PipelineOperator + Operator pipelinetypes.PipelineOperator NonMatchingLog model.SignozLog } testCases := []pipelineTestCase{ { "regex processor should ignore log with missing field", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "regex", Type: "regex_parser", Enabled: true, @@ -279,7 +284,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { makeTestLog("mismatching log", map[string]string{}), }, { "regex processor should ignore non-matching log", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "regex", Type: "regex_parser", Enabled: true, @@ -291,7 +296,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { makeTestLog("mismatching log", map[string]string{}), }, { "json parser should ignore logs with missing field.", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "json", Type: "json_parser", Enabled: true, @@ -303,7 +308,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { }, { "json parser should ignore log with non JSON target field value", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "json", Type: "json_parser", Enabled: true, @@ -316,7 +321,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { }), }, { "move parser should ignore non matching logs", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "move", Type: "move", Enabled: true, @@ -327,7 +332,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { makeTestLog("mismatching log", map[string]string{}), }, { "copy parser should ignore non matching logs", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "copy", Type: "copy", Enabled: true, @@ -338,7 +343,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { makeTestLog("mismatching log", map[string]string{}), }, { "remove parser should ignore non matching logs", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "remove", Type: "remove", Enabled: true, @@ -348,7 +353,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { makeTestLog("mismatching log", map[string]string{}), }, { "time parser should ignore logs with missing field.", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "time", Type: "time_parser", Enabled: true, @@ -360,7 +365,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { makeTestLog("mismatching log", map[string]string{}), }, { "time parser should ignore logs timestamp values that don't contain expected strptime layout.", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "time", Type: "time_parser", Enabled: true, @@ -374,7 +379,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { }), }, { "time parser should ignore logs timestamp values that don't contain an epoch", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "time", Type: "time_parser", Enabled: true, @@ -388,7 +393,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { }), }, { "grok parser should ignore logs with missing parse from field", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "grok", Type: "grok_parser", Enabled: true, @@ -417,7 +422,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { "time parser should ignore log with timestamp value %s that doesn't match layout type %s", testValue, epochLayout, ), - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "time", Type: "time_parser", Enabled: true, @@ -434,7 +439,7 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { } for _, testCase := range testCases { - testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})} + testPipelines := []pipelinetypes.GettablePipeline{makeTestPipeline([]pipelinetypes.PipelineOperator{testCase.Operator})} result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( context.Background(), @@ -450,11 +455,14 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { func TestResourceFiltersWork(t *testing.T) { require := require.New(t) - testPipeline := Pipeline{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + testPipeline := pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -469,7 +477,7 @@ func TestResourceFiltersWork(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "add", Type: "add", @@ -496,7 +504,7 @@ func TestResourceFiltersWork(t *testing.T) { result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( context.Background(), - []Pipeline{testPipeline}, + []pipelinetypes.GettablePipeline{testPipeline}, []model.SignozLog{testLog}, ) require.Nil(err) @@ -515,11 +523,14 @@ func TestPipelineFilterWithStringOpsShouldNotSpamWarningsIfAttributeIsMissing(t v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, } { - testPipeline := Pipeline{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + testPipeline := pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -534,7 +545,7 @@ func TestPipelineFilterWithStringOpsShouldNotSpamWarningsIfAttributeIsMissing(t }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "add", Type: "add", @@ -559,7 +570,7 @@ func TestPipelineFilterWithStringOpsShouldNotSpamWarningsIfAttributeIsMissing(t result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( context.Background(), - []Pipeline{testPipeline}, + []pipelinetypes.GettablePipeline{testPipeline}, []model.SignozLog{testLog}, ) require.Nil(err) @@ -571,11 +582,14 @@ func TestPipelineFilterWithStringOpsShouldNotSpamWarningsIfAttributeIsMissing(t func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) { require := require.New(t) - testPipeline := Pipeline{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + testPipeline := pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -590,7 +604,7 @@ func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "move", Type: "move", @@ -610,7 +624,7 @@ func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) { result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( context.Background(), - []Pipeline{testPipeline}, + []pipelinetypes.GettablePipeline{testPipeline}, testLogs, ) require.Nil(err) @@ -629,11 +643,14 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) { }), } - testPipeline := Pipeline{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + testPipeline := pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -648,7 +665,7 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "move", Type: "move", @@ -711,7 +728,7 @@ func TestMembershipOpInProcessorFieldExpressions(t *testing.T) { result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( context.Background(), - []Pipeline{testPipeline}, + []pipelinetypes.GettablePipeline{testPipeline}, testLogs, ) require.Nil(err) @@ -733,11 +750,14 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) { makeTestSignozLog("test Ecom Log", map[string]interface{}{}), } - testPipelines := []Pipeline{{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + testPipelines := []pipelinetypes.GettablePipeline{{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{{ @@ -751,7 +771,7 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) { Value: "log", }}, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "add", Type: "add", @@ -762,10 +782,13 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) { }, }, }, { - OrderId: 2, - Name: "pipeline2", - Alias: "pipeline2", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + ID: uuid.New().String(), + OrderID: 2, + Name: "pipeline2", + Alias: "pipeline2", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{{ @@ -779,7 +802,7 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) { Value: "ecom", }}, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { ID: "add", Type: "add", diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go index abecd733ee..dee69f1126 100644 --- a/pkg/query-service/app/logparsingpipeline/preview.go +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -10,6 +10,7 @@ import ( _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor" "github.com/SigNoz/signoz/pkg/query-service/model" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/pkg/errors" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -18,7 +19,7 @@ import ( func SimulatePipelinesProcessing( ctx context.Context, - pipelines []Pipeline, + pipelines []pipelinetypes.GettablePipeline, logs []model.SignozLog, ) ( output []model.SignozLog, collectorWarnAndErrorLogs []string, apiErr *model.ApiError, diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go index b20ce28bf1..011c2889fb 100644 --- a/pkg/query-service/app/logparsingpipeline/preview_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview_test.go @@ -8,6 +8,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/google/uuid" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/stretchr/testify/require" @@ -16,12 +17,14 @@ import ( func TestPipelinePreview(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -36,7 +39,7 @@ func TestPipelinePreview(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -49,10 +52,12 @@ func TestPipelinePreview(t *testing.T) { }, }, { - OrderId: 2, - Name: "pipeline2", - Alias: "pipeline2", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 2, + Name: "pipeline2", + Alias: "pipeline2", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -67,7 +72,7 @@ func TestPipelinePreview(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -146,12 +151,14 @@ func TestPipelinePreview(t *testing.T) { func TestGrokParsingProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -166,7 +173,7 @@ func TestGrokParsingProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "grok", diff --git a/pkg/query-service/app/logparsingpipeline/processors_test.go b/pkg/query-service/app/logparsingpipeline/processors_test.go index 2a440318b9..0b03c9dba8 100644 --- a/pkg/query-service/app/logparsingpipeline/processors_test.go +++ b/pkg/query-service/app/logparsingpipeline/processors_test.go @@ -12,6 +12,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/utils" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/stretchr/testify/require" ) @@ -21,12 +22,14 @@ import ( func TestRegexProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -41,11 +44,11 @@ func TestRegexProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -86,12 +89,14 @@ func TestRegexProcessor(t *testing.T) { func TestGrokProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -106,11 +111,11 @@ func TestGrokProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -151,12 +156,14 @@ func TestGrokProcessor(t *testing.T) { func TestJSONProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -171,11 +178,11 @@ func TestJSONProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -215,12 +222,14 @@ func TestJSONProcessor(t *testing.T) { func TestTraceParsingProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -235,12 +244,12 @@ func TestTraceParsingProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } // Start with JSON serialized trace parser to validate deserialization too - var traceParserOp PipelineOperator + var traceParserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -322,12 +331,14 @@ func TestTraceParsingProcessor(t *testing.T) { func TestAddProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -342,11 +353,11 @@ func TestAddProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -385,12 +396,14 @@ func TestAddProcessor(t *testing.T) { func TestRemoveProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -405,11 +418,11 @@ func TestRemoveProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -448,12 +461,14 @@ func TestRemoveProcessor(t *testing.T) { func TestCopyProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -468,11 +483,11 @@ func TestCopyProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -512,12 +527,14 @@ func TestCopyProcessor(t *testing.T) { func TestMoveProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -532,11 +549,11 @@ func TestMoveProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var parserOp PipelineOperator + var parserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, diff --git a/pkg/query-service/app/logparsingpipeline/severity_parser_test.go b/pkg/query-service/app/logparsingpipeline/severity_parser_test.go index ee32651b15..ec707dfb95 100644 --- a/pkg/query-service/app/logparsingpipeline/severity_parser_test.go +++ b/pkg/query-service/app/logparsingpipeline/severity_parser_test.go @@ -8,18 +8,21 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/stretchr/testify/require" ) func TestSeverityParsingProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -34,11 +37,11 @@ func TestSeverityParsingProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var severityParserOp PipelineOperator + var severityParserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, @@ -152,27 +155,29 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) { }, }, } - makeTestPipeline := func(config []PipelineOperator) Pipeline { - return Pipeline{ - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, - Filter: testPipelineFilter, - Config: config, + makeTestPipeline := func(config []pipelinetypes.PipelineOperator) pipelinetypes.GettablePipeline { + return pipelinetypes.GettablePipeline{ + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, + Filter: testPipelineFilter, + Config: config, } } type pipelineTestCase struct { Name string - Operator PipelineOperator + Operator pipelinetypes.PipelineOperator NonMatchingLog model.SignozLog } testCases := []pipelineTestCase{ { "severity parser should ignore logs with missing field", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "severity", Type: "severity_parser", Enabled: true, @@ -188,7 +193,7 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) { }), }, { "severity parser should ignore logs with invalid values.", - PipelineOperator{ + pipelinetypes.PipelineOperator{ ID: "severity", Type: "severity_parser", Enabled: true, @@ -207,7 +212,7 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) { } for _, testCase := range testCases { - testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})} + testPipelines := []pipelinetypes.GettablePipeline{makeTestPipeline([]pipelinetypes.PipelineOperator{testCase.Operator})} result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( context.Background(), diff --git a/pkg/query-service/app/logparsingpipeline/time_parser_test.go b/pkg/query-service/app/logparsingpipeline/time_parser_test.go index 6e9aa998ed..491d176de6 100644 --- a/pkg/query-service/app/logparsingpipeline/time_parser_test.go +++ b/pkg/query-service/app/logparsingpipeline/time_parser_test.go @@ -3,76 +3,27 @@ package logparsingpipeline import ( "context" "encoding/json" - "fmt" "strings" "testing" "time" "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" - "github.com/antonmedv/expr" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/stretchr/testify/require" ) -func TestRegexForStrptimeLayout(t *testing.T) { - require := require.New(t) - - var testCases = []struct { - strptimeLayout string - str string - shouldMatch bool - }{ - { - strptimeLayout: "%Y-%m-%dT%H:%M:%S.%f%z", - str: "2023-11-26T12:03:28.239907+0530", - shouldMatch: true, - }, { - strptimeLayout: "%d-%m-%Y", - str: "26-11-2023", - shouldMatch: true, - }, { - strptimeLayout: "%d-%m-%Y", - str: "26-11-2023", - shouldMatch: true, - }, { - strptimeLayout: "%d/%m/%y", - str: "11/03/02", - shouldMatch: true, - }, { - strptimeLayout: "%A, %d. %B %Y %I:%M%p", - str: "Tuesday, 21. November 2006 04:30PM11/03/02", - shouldMatch: true, - }, { - strptimeLayout: "%A, %d. %B %Y %I:%M%p", - str: "some random text", - shouldMatch: false, - }, - } - - for _, test := range testCases { - regex, err := RegexForStrptimeLayout(test.strptimeLayout) - require.Nil(err, test.strptimeLayout) - - code := fmt.Sprintf(`"%s" matches "%s"`, test.str, regex) - program, err := expr.Compile(code) - require.Nil(err, test.strptimeLayout) - - output, err := expr.Run(program, map[string]string{}) - require.Nil(err, test.strptimeLayout) - require.Equal(output, test.shouldMatch, test.strptimeLayout) - - } -} - func TestTimestampParsingProcessor(t *testing.T) { require := require.New(t) - testPipelines := []Pipeline{ + testPipelines := []pipelinetypes.GettablePipeline{ { - OrderId: 1, - Name: "pipeline1", - Alias: "pipeline1", - Enabled: true, + StoreablePipeline: pipelinetypes.StoreablePipeline{ + OrderID: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + }, Filter: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -87,11 +38,11 @@ func TestTimestampParsingProcessor(t *testing.T) { }, }, }, - Config: []PipelineOperator{}, + Config: []pipelinetypes.PipelineOperator{}, }, } - var timestampParserOp PipelineOperator + var timestampParserOp pipelinetypes.PipelineOperator err := json.Unmarshal([]byte(` { "orderId": 1, diff --git a/pkg/query-service/app/opamp/model/agent.go b/pkg/query-service/app/opamp/model/agent.go index 53ca64a5db..cf48719e80 100644 --- a/pkg/query-service/app/opamp/model/agent.go +++ b/pkg/query-service/app/opamp/model/agent.go @@ -251,6 +251,9 @@ func (agent *Agent) processStatusUpdate( if agentDescrChanged { // Agent description is changed. + //Get the default org ID + // agent. + // We need to recalculate the config. configChanged = agent.updateRemoteConfig(configProvider) } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 209ffdfaec..6f2ac1d8e3 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -183,7 +183,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( - serverOptions.SigNoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations, + serverOptions.SigNoz.SQLStore, integrationsController.GetPipelinesForInstalledIntegrations, ) if err != nil { return nil, err diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 0cac4e472c..8a4c5953f2 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -22,6 +22,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/google/uuid" "github.com/gorilla/mux" "github.com/knadh/koanf/parsers/yaml" @@ -62,15 +63,15 @@ func TestLogPipelinesLifecycle(t *testing.T) { }, } - postablePipelines := logparsingpipeline.PostablePipelines{ - Pipelines: []logparsingpipeline.PostablePipeline{ + postablePipelines := pipelinetypes.PostablePipelines{ + Pipelines: []pipelinetypes.PostablePipeline{ { - OrderId: 1, + OrderID: 1, Name: "pipeline1", Alias: "pipeline1", Enabled: true, Filter: pipelineFilterSet, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -82,12 +83,12 @@ func TestLogPipelinesLifecycle(t *testing.T) { }, }, }, { - OrderId: 2, + OrderID: 2, Name: "pipeline2", Alias: "pipeline2", Enabled: true, Filter: pipelineFilterSet, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "remove", @@ -179,10 +180,10 @@ func TestLogPipelinesHistory(t *testing.T) { getPipelinesResp := testbed.GetPipelinesFromQS() require.Equal(0, len(getPipelinesResp.History)) - postablePipelines := logparsingpipeline.PostablePipelines{ - Pipelines: []logparsingpipeline.PostablePipeline{ + postablePipelines := pipelinetypes.PostablePipelines{ + Pipelines: []pipelinetypes.PostablePipeline{ { - OrderId: 1, + OrderID: 1, Name: "pipeline1", Alias: "pipeline1", Enabled: true, @@ -200,7 +201,7 @@ func TestLogPipelinesHistory(t *testing.T) { }, }, }, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -222,7 +223,7 @@ func TestLogPipelinesHistory(t *testing.T) { postablePipelines.Pipelines[0].Config = append( postablePipelines.Pipelines[0].Config, - logparsingpipeline.PipelineOperator{ + pipelinetypes.PipelineOperator{ OrderId: 2, ID: "remove", Type: "remove", @@ -259,18 +260,18 @@ func TestLogPipelinesValidation(t *testing.T) { testCases := []struct { Name string - Pipeline logparsingpipeline.PostablePipeline + Pipeline pipelinetypes.PostablePipeline ExpectedResponseStatusCode int }{ { Name: "Valid Pipeline", - Pipeline: logparsingpipeline.PostablePipeline{ - OrderId: 1, + Pipeline: pipelinetypes.PostablePipeline{ + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, Filter: validPipelineFilterSet, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -286,13 +287,13 @@ func TestLogPipelinesValidation(t *testing.T) { }, { Name: "Invalid orderId", - Pipeline: logparsingpipeline.PostablePipeline{ - OrderId: 0, + Pipeline: pipelinetypes.PostablePipeline{ + OrderID: 0, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, Filter: validPipelineFilterSet, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -308,13 +309,13 @@ func TestLogPipelinesValidation(t *testing.T) { }, { Name: "Invalid filter", - Pipeline: logparsingpipeline.PostablePipeline{ - OrderId: 1, + Pipeline: pipelinetypes.PostablePipeline{ + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, Filter: &v3.FilterSet{}, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -330,13 +331,13 @@ func TestLogPipelinesValidation(t *testing.T) { }, { Name: "Invalid operator field", - Pipeline: logparsingpipeline.PostablePipeline{ - OrderId: 1, + Pipeline: pipelinetypes.PostablePipeline{ + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, Filter: validPipelineFilterSet, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -351,13 +352,13 @@ func TestLogPipelinesValidation(t *testing.T) { ExpectedResponseStatusCode: 400, }, { Name: "Invalid from field path", - Pipeline: logparsingpipeline.PostablePipeline{ - OrderId: 1, + Pipeline: pipelinetypes.PostablePipeline{ + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, Filter: validPipelineFilterSet, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "move", @@ -377,8 +378,8 @@ func TestLogPipelinesValidation(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { testbed := NewLogPipelinesTestBed(t, nil) testbed.PostPipelinesToQSExpectingStatusCode( - logparsingpipeline.PostablePipelines{ - Pipelines: []logparsingpipeline.PostablePipeline{tc.Pipeline}, + pipelinetypes.PostablePipelines{ + Pipelines: []pipelinetypes.PostablePipeline{tc.Pipeline}, }, tc.ExpectedResponseStatusCode, ) @@ -394,10 +395,10 @@ func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) { require.Equal(0, len(getPipelinesResp.Pipelines)) require.Equal(0, len(getPipelinesResp.History)) - postablePipelines := logparsingpipeline.PostablePipelines{ - Pipelines: []logparsingpipeline.PostablePipeline{ + postablePipelines := pipelinetypes.PostablePipelines{ + Pipelines: []pipelinetypes.PostablePipeline{ { - OrderId: 1, + OrderID: 1, Name: "pipeline1", Alias: "pipeline1", Enabled: true, @@ -415,7 +416,7 @@ func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) { }, }, }, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -454,13 +455,16 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli sqlStore = utils.NewQueryServiceDBForTests(t) } + // create test org + // utils.CreateTestOrg(t, sqlStore) + ic, err := integrations.NewController(sqlStore) if err != nil { t.Fatalf("could not create integrations controller: %v", err) } controller, err := logparsingpipeline.NewLogParsingPipelinesController( - sqlStore.SQLxDB(), ic.GetPipelinesForInstalledIntegrations, + sqlStore, ic.GetPipelinesForInstalledIntegrations, ) if err != nil { t.Fatalf("could not create a logparsingpipelines controller: %v", err) @@ -529,7 +533,7 @@ func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore) *LogPipeline } func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode( - postablePipelines logparsingpipeline.PostablePipelines, + postablePipelines pipelinetypes.PostablePipelines, expectedStatusCode int, ) *logparsingpipeline.PipelinesResponse { req, err := AuthenticatedRequestForTest( @@ -579,7 +583,7 @@ func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode( } func (tb *LogPipelinesTestBed) PostPipelinesToQS( - postablePipelines logparsingpipeline.PostablePipelines, + postablePipelines pipelinetypes.PostablePipelines, ) *logparsingpipeline.PipelinesResponse { return tb.PostPipelinesToQSExpectingStatusCode( postablePipelines, 200, @@ -628,7 +632,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline } func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( - pipelines []logparsingpipeline.Pipeline, + pipelines []pipelinetypes.GettablePipeline, ) { lastMsg := tb.opampClientConn.LatestMsgFromServer() assertPipelinesRecommendedInRemoteConfig( @@ -639,7 +643,7 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( func assertPipelinesRecommendedInRemoteConfig( t *testing.T, msg *protobufs.ServerToAgent, - pipelines []logparsingpipeline.Pipeline, + gettablePipelines []pipelinetypes.GettablePipeline, ) { collectorConfigFiles := msg.RemoteConfig.Config.ConfigMap require.Equal( @@ -669,7 +673,7 @@ func assertPipelinesRecommendedInRemoteConfig( } } - _, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines) + _, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(gettablePipelines) require.NoError(t, err) require.Equal( t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames, @@ -698,12 +702,12 @@ func assertPipelinesRecommendedInRemoteConfig( // find logparsingpipeline.Pipeline whose processor is being validated here pipelineIdx := slices.IndexFunc( - pipelines, func(p logparsingpipeline.Pipeline) bool { + gettablePipelines, func(p pipelinetypes.GettablePipeline) bool { return logparsingpipeline.CollectorConfProcessorName(p) == procName }, ) require.GreaterOrEqual(t, pipelineIdx, 0) - expectedExpr, err := queryBuilderToExpr.Parse(pipelines[pipelineIdx].Filter) + expectedExpr, err := queryBuilderToExpr.Parse(gettablePipelines[pipelineIdx].Filter) require.Nil(t, err) require.Equal(t, expectedExpr, pipelineFilterExpr) } @@ -724,7 +728,7 @@ func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig } func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection( - pipelines []logparsingpipeline.Pipeline, + pipelines []pipelinetypes.GettablePipeline, ) { newAgentConn := &opamp.MockOpAmpConnection{} tb.opampServer.OnMessage( @@ -762,7 +766,7 @@ func unmarshalPipelinesResponse(apiResponse *app.ApiResponse) ( func assertPipelinesResponseMatchesPostedPipelines( t *testing.T, - postablePipelines logparsingpipeline.PostablePipelines, + postablePipelines pipelinetypes.PostablePipelines, pipelinesResp *logparsingpipeline.PipelinesResponse, ) { require.Equal( @@ -772,7 +776,7 @@ func assertPipelinesResponseMatchesPostedPipelines( for i, pipeline := range pipelinesResp.Pipelines { postable := postablePipelines.Pipelines[i] require.Equal(t, postable.Name, pipeline.Name, "pipeline.Name mismatch") - require.Equal(t, postable.OrderId, pipeline.OrderId, "pipeline.OrderId mismatch") + require.Equal(t, postable.OrderID, pipeline.OrderID, "pipeline.OrderId mismatch") require.Equal(t, postable.Enabled, pipeline.Enabled, "pipeline.Enabled mismatch") require.Equal(t, postable.Config, pipeline.Config, "pipeline.Config mismatch") } diff --git a/pkg/query-service/tests/integration/signoz_integrations_test.go b/pkg/query-service/tests/integration/signoz_integrations_test.go index 1e1915e991..35b71019bf 100644 --- a/pkg/query-service/tests/integration/signoz_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_integrations_test.go @@ -12,7 +12,6 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/app" "github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations" "github.com/SigNoz/signoz/pkg/query-service/app/integrations" - "github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline" "github.com/SigNoz/signoz/pkg/query-service/auth" "github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/query-service/featureManager" @@ -21,6 +20,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/types/pipelinetypes" mockhouse "github.com/srikanthccv/ClickHouse-go-mock" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -176,10 +176,10 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) { // After saving a user created pipeline, pipelines response should include // both user created pipelines and pipelines for installed integrations. - postablePipelines := logparsingpipeline.PostablePipelines{ - Pipelines: []logparsingpipeline.PostablePipeline{ + postablePipelines := pipelinetypes.PostablePipelines{ + Pipelines: []pipelinetypes.PostablePipeline{ { - OrderId: 1, + OrderID: 1, Name: "pipeline1", Alias: "pipeline1", Enabled: true, @@ -197,7 +197,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) { }, }, }, - Config: []logparsingpipeline.PipelineOperator{ + Config: []pipelinetypes.PipelineOperator{ { OrderId: 1, ID: "add", @@ -223,7 +223,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) { postable := postableFromPipelines(getPipelinesResp.Pipelines) slices.Reverse(postable.Pipelines) for i := range postable.Pipelines { - postable.Pipelines[i].OrderId = i + 1 + postable.Pipelines[i].OrderID = i + 1 } pipelinesTB.PostPipelinesToQS(postable) @@ -256,7 +256,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) { // should not be able to edit integrations pipeline. require.Greater(len(postable.Pipelines[0].Config), 0) - postable.Pipelines[0].Config = []logparsingpipeline.PipelineOperator{} + postable.Pipelines[0].Config = []pipelinetypes.PipelineOperator{} pipelinesTB.PostPipelinesToQS(postable) getPipelinesResp = pipelinesTB.GetPipelinesFromQS() @@ -270,7 +270,7 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) { require.Greater(len(firstPipeline.Config), 0) // should not be able to delete integrations pipeline - postable.Pipelines = []logparsingpipeline.PostablePipeline{postable.Pipelines[1]} + postable.Pipelines = []pipelinetypes.PostablePipeline{postable.Pipelines[1]} pipelinesTB.PostPipelinesToQS(postable) getPipelinesResp = pipelinesTB.GetPipelinesFromQS() @@ -596,21 +596,21 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration } } -func postableFromPipelines(pipelines []logparsingpipeline.Pipeline) logparsingpipeline.PostablePipelines { - result := logparsingpipeline.PostablePipelines{} +func postableFromPipelines(gettablePipelines []pipelinetypes.GettablePipeline) pipelinetypes.PostablePipelines { + result := pipelinetypes.PostablePipelines{} - for _, p := range pipelines { - postable := logparsingpipeline.PostablePipeline{ - Id: p.Id, - OrderId: p.OrderId, + for _, p := range gettablePipelines { + postable := pipelinetypes.PostablePipeline{ + ID: p.ID, + OrderID: p.OrderID, Name: p.Name, Alias: p.Alias, Enabled: p.Enabled, Config: p.Config, } - if p.Description != nil { - postable.Description = *p.Description + if p.Description != "" { + postable.Description = p.Description } if p.Filter != nil { diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index 6c336b64bc..6d50835bb0 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -207,6 +207,13 @@ func AuthenticatedRequestForTest( } req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt) + + ctx, err := jwt.ContextFromRequest(req.Context(), req.Header.Get("Authorization")) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + return req, nil } diff --git a/pkg/query-service/utils/testutils.go b/pkg/query-service/utils/testutils.go index 2a753ae4aa..29a5e547c2 100644 --- a/pkg/query-service/utils/testutils.go +++ b/pkg/query-service/utils/testutils.go @@ -50,6 +50,7 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s sqlmigration.NewUpdateOrganizationFactory(sqlStore), sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlStore), sqlmigration.NewUpdatePatAndOrgDomainsFactory(sqlStore), + sqlmigration.NewUpdatePipelines(sqlStore), ), ) if err != nil { diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index eece333889..e5b378617d 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -61,6 +61,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM sqlmigration.NewAddAlertmanagerFactory(sqlstore), sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlstore), sqlmigration.NewUpdatePatAndOrgDomainsFactory(sqlstore), + sqlmigration.NewUpdatePipelines(sqlstore), ) } diff --git a/pkg/sqlmigration/017_update_pipelines.go b/pkg/sqlmigration/017_update_pipelines.go new file mode 100644 index 0000000000..b0e63d5800 --- /dev/null +++ b/pkg/sqlmigration/017_update_pipelines.go @@ -0,0 +1,96 @@ +package sqlmigration + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" +) + +type updatePipelines struct { + store sqlstore.SQLStore +} + +func NewUpdatePipelines(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] { + return factory.NewProviderFactory(factory.MustNewName("update_pipelines"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) { + return newUpdatePipelines(ctx, ps, c, sqlstore) + }) +} + +func newUpdatePipelines(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) { + return &updatePipelines{ + store: store, + }, nil +} + +func (migration *updatePipelines) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *updatePipelines) Up(ctx context.Context, db *bun.DB) error { + + // begin transaction + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() //nolint:errcheck + + // get all org ids + var orgIDs []string + if err := migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs); err != nil { + return err + } + + // add org id to pipelines table + if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, "pipelines", "org_id"); err != nil { + return err + } else if !exists { + if _, err := tx.NewAddColumn().Table("pipelines").ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").Exec(ctx); err != nil { + return err + } + + // check if there is one org ID if yes then set it to all pipelines. + if len(orgIDs) == 1 { + orgID := orgIDs[0] + if _, err := tx.NewUpdate().Table("pipelines").Set("org_id = ?", orgID).Where("org_id IS NULL").Exec(ctx); err != nil { + return err + } + } + } + + // add updated_by to pipelines table + if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, "pipelines", "updated_by"); err != nil { + return err + } else if !exists { + if _, err := tx.NewAddColumn().Table("pipelines").ColumnExpr("updated_by TEXT").Exec(ctx); err != nil { + return err + } + } + + // add updated_at to pipelines table + if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, "pipelines", "updated_at"); err != nil { + return err + } else if !exists { + if _, err := tx.NewAddColumn().Table("pipelines").ColumnExpr("updated_at TIMESTAMP").Exec(ctx); err != nil { + return err + } + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil +} + +func (migration *updatePipelines) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/types/pipeline.go b/pkg/types/pipeline.go deleted file mode 100644 index c5a6a28469..0000000000 --- a/pkg/types/pipeline.go +++ /dev/null @@ -1,22 +0,0 @@ -package types - -import ( - "time" - - "github.com/uptrace/bun" -) - -type Pipeline struct { - bun.BaseModel `bun:"table:pipelines"` - - ID string `bun:"id,pk,type:text"` - OrderID int `bun:"order_id"` - Enabled bool `bun:"enabled"` - CreatedBy string `bun:"created_by,type:text"` - CreatedAt time.Time `bun:"created_at,default:current_timestamp"` - Name string `bun:"name,type:varchar(400),notnull"` - Alias string `bun:"alias,type:varchar(20),notnull"` - Description string `bun:"description,type:text"` - Filter string `bun:"filter,type:text,notnull"` - ConfigJSON string `bun:"config_json,type:text"` -} diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline.go b/pkg/types/pipelinetypes/pipeline.go similarity index 64% rename from pkg/query-service/app/logparsingpipeline/postablePipeline.go rename to pkg/types/pipelinetypes/pipeline.go index 4c8a200019..f6cd4306ef 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline.go +++ b/pkg/types/pipelinetypes/pipeline.go @@ -1,17 +1,122 @@ -package logparsingpipeline +package pipelinetypes import ( - "errors" + "encoding/json" "fmt" "regexp" + "slices" "strings" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr" - "golang.org/x/exp/slices" + "github.com/SigNoz/signoz/pkg/types" + "github.com/pkg/errors" + "github.com/uptrace/bun" ) -// PostablePipelines are a list of user defined pielines +type StoreablePipeline struct { + bun.BaseModel `bun:"table:pipelines,alias:p"` + + types.UserAuditable + types.TimeAuditable + OrgID string `json:"-" bun:"org_id,notnull"` + ID string `json:"id" bun:"id,pk,type:text"` + OrderID int `json:"orderId" bun:"order_id"` + Enabled bool `json:"enabled" bun:"enabled"` + Name string `json:"name" bun:"name,type:varchar(400),notnull"` + Alias string `json:"alias" bun:"alias,type:varchar(20),notnull"` + Description string `json:"description" bun:"description,type:text"` + FilterString string `json:"-" bun:"filter,type:text,notnull"` + ConfigJSON string `json:"-" bun:"config_json,type:text"` +} + +type GettablePipeline struct { + StoreablePipeline + Filter *v3.FilterSet `json:"filter"` + Config []PipelineOperator `json:"config"` +} + +func (i *GettablePipeline) ParseRawConfig() error { + c := []PipelineOperator{} + err := json.Unmarshal([]byte(i.ConfigJSON), &c) + if err != nil { + return errors.Wrap(err, "failed to parse ingestion rule config") + } + i.Config = c + return nil +} + +func (i *GettablePipeline) ParseFilter() error { + f := v3.FilterSet{} + err := json.Unmarshal([]byte(i.FilterString), &f) + if err != nil { + return errors.Wrap(err, "failed to parse filter") + } + i.Filter = &f + return nil +} + +type Processor struct { + Operators []PipelineOperator `json:"operators" yaml:"operators"` +} + +type PipelineOperator struct { + Type string `json:"type" yaml:"type"` + ID string `json:"id,omitempty" yaml:"id,omitempty"` + Output string `json:"output,omitempty" yaml:"output,omitempty"` + OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"` + If string `json:"if,omitempty" yaml:"if,omitempty"` + + // don't need the following in the final config + OrderId int `json:"orderId" yaml:"-"` + Enabled bool `json:"enabled" yaml:"-"` + Name string `json:"name,omitempty" yaml:"-"` + + // optional keys depending on the type + ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` + Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` + Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` + ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + *TraceParser `yaml:",inline,omitempty"` + Field string `json:"field,omitempty" yaml:"field,omitempty"` + Value string `json:"value,omitempty" yaml:"value,omitempty"` + From string `json:"from,omitempty" yaml:"from,omitempty"` + To string `json:"to,omitempty" yaml:"to,omitempty"` + Expr string `json:"expr,omitempty" yaml:"expr,omitempty"` + Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` + Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` + Default string `json:"default,omitempty" yaml:"default,omitempty"` + + // time_parser fields. + Layout string `json:"layout,omitempty" yaml:"layout,omitempty"` + LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"` + + // severity parser fields + SeverityMapping map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"` + OverwriteSeverityText bool `json:"overwrite_text,omitempty" yaml:"overwrite_text,omitempty"` +} + +type TimestampParser struct { + Layout string `json:"layout" yaml:"layout"` + LayoutType string `json:"layout_type" yaml:"layout_type"` + ParseFrom string `json:"parse_from" yaml:"parse_from"` +} + +type TraceParser struct { + TraceId *ParseFrom `json:"trace_id,omitempty" yaml:"trace_id,omitempty"` + SpanId *ParseFrom `json:"span_id,omitempty" yaml:"span_id,omitempty"` + TraceFlags *ParseFrom `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` +} + +type ParseFrom struct { + ParseFrom string `json:"parse_from" yaml:"parse_from"` +} + +type Route struct { + Output string `json:"output" yaml:"output"` + Expr string `json:"expr" yaml:"expr"` +} + type PostablePipelines struct { Pipelines []PostablePipeline `json:"pipelines"` } @@ -19,8 +124,8 @@ type PostablePipelines struct { // PostablePipeline captures user inputs in setting the pipeline type PostablePipeline struct { - Id string `json:"id"` - OrderId int `json:"orderId"` + ID string `json:"id"` + OrderID int `json:"orderId"` Name string `json:"name"` Alias string `json:"alias"` Description string `json:"description"` @@ -31,7 +136,7 @@ type PostablePipeline struct { // IsValid checks if postable pipeline has all the required params func (p *PostablePipeline) IsValid() error { - if p.OrderId == 0 { + if p.OrderID == 0 { return fmt.Errorf("orderId with value > 1 is required") } if p.Name == "" { diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go b/pkg/types/pipelinetypes/postable_pipeline_test.go similarity index 98% rename from pkg/query-service/app/logparsingpipeline/postablePipeline_test.go rename to pkg/types/pipelinetypes/postable_pipeline_test.go index 89da545e02..832a786008 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go +++ b/pkg/types/pipelinetypes/postable_pipeline_test.go @@ -1,4 +1,4 @@ -package logparsingpipeline +package pipelinetypes import ( "testing" @@ -42,7 +42,7 @@ func TestIsValidPostablePipeline(t *testing.T) { { Name: "Invalid orderId", Pipeline: PostablePipeline{ - OrderId: 0, + OrderID: 0, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, @@ -54,7 +54,7 @@ func TestIsValidPostablePipeline(t *testing.T) { { Name: "Valid orderId", Pipeline: PostablePipeline{ - OrderId: 1, + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, @@ -66,7 +66,7 @@ func TestIsValidPostablePipeline(t *testing.T) { { Name: "Invalid filter", Pipeline: PostablePipeline{ - OrderId: 1, + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, @@ -90,7 +90,7 @@ func TestIsValidPostablePipeline(t *testing.T) { { Name: "Valid filter", Pipeline: PostablePipeline{ - OrderId: 1, + OrderID: 1, Name: "pipeline 1", Alias: "pipeline1", Enabled: true, diff --git a/pkg/query-service/app/logparsingpipeline/time_parser.go b/pkg/types/pipelinetypes/time_parser.go similarity index 99% rename from pkg/query-service/app/logparsingpipeline/time_parser.go rename to pkg/types/pipelinetypes/time_parser.go index a0ec384867..35ab69a408 100644 --- a/pkg/query-service/app/logparsingpipeline/time_parser.go +++ b/pkg/types/pipelinetypes/time_parser.go @@ -1,4 +1,4 @@ -package logparsingpipeline +package pipelinetypes import ( "errors" diff --git a/pkg/types/pipelinetypes/time_parser_test.go b/pkg/types/pipelinetypes/time_parser_test.go new file mode 100644 index 0000000000..822d0bbafb --- /dev/null +++ b/pkg/types/pipelinetypes/time_parser_test.go @@ -0,0 +1,55 @@ +package pipelinetypes + +import ( + "fmt" + "testing" + + "github.com/antonmedv/expr" + "github.com/stretchr/testify/require" +) + +func TestRegexForStrptimeLayout(t *testing.T) { + require := require.New(t) + + var testCases = []struct { + strptimeLayout string + str string + shouldMatch bool + }{ + { + strptimeLayout: "%Y-%m-%dT%H:%M:%S.%f%z", + str: "2023-11-26T12:03:28.239907+0530", + shouldMatch: true, + }, { + strptimeLayout: "%d-%m-%Y", + str: "26-11-2023", + shouldMatch: true, + }, { + strptimeLayout: "%d/%m/%y", + str: "11/03/02", + shouldMatch: true, + }, { + strptimeLayout: "%A, %d. %B %Y %I:%M%p", + str: "Tuesday, 21. November 2006 04:30PM11/03/02", + shouldMatch: true, + }, { + strptimeLayout: "%A, %d. %B %Y %I:%M%p", + str: "some random text", + shouldMatch: false, + }, + } + + for _, test := range testCases { + regex, err := RegexForStrptimeLayout(test.strptimeLayout) + require.Nil(err, test.strptimeLayout) + + code := fmt.Sprintf(`"%s" matches "%s"`, test.str, regex) + program, err := expr.Compile(code) + require.Nil(err, test.strptimeLayout) + + output, err := expr.Run(program, map[string]string{}) + require.Nil(err, test.strptimeLayout) + require.Equal(test.shouldMatch, output, test.strptimeLayout) + + } +}