From 9ace37485575fc28d298ab9a4a894b8fc22dc193 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:15:11 +0530 Subject: [PATCH] Feat: QS: Log Pipelines for installed integrations (#4674) * chore: refactor: inject sqlx.DB into opamp.initDB instead of DB file name * chore: reorganize test utils a little * chore: add test validating pipelines for installed integrations show up in pipelines list * chore: get basic integration pipelines testcase passing * chore: reconcile experimental changes with latest state of develop * chore: add integration test for reordering of pipelines * chore: marker for integration pipelines using Id * chore: hookup propagation of installed integration pipelines by opamp * chore: add util for mapping slices * chore: add support for reordering integration pipelines * chore: exclude user saved integration pipelines if no longer installed * chore: flesh out rest of intgeration pipelines scenarios * chore: handle scenario when an integration is installed before any pipelines exist * chore: notify agentConf of update after uninstalling an integration * chore: some minor cleanup * chore: some more cleanup * chore: update ee server for changed controllers * chore: some more cleanup * chore: change builtin integration id prefix to avoid using colons that break yaml * chore: update builtin integration id in test --- ee/query-service/app/server.go | 6 +- pkg/query-service/agentConf/manager.go | 23 +- pkg/query-service/app/http_handler.go | 15 +- pkg/query-service/app/integrations/builtin.go | 2 +- .../app/integrations/builtin_test.go | 2 +- .../app/integrations/controller.go | 22 +- pkg/query-service/app/integrations/manager.go | 41 ++++ .../app/integrations/pipeline_utils.go | 33 +++ .../app/integrations/sqlite_repo.go | 1 + .../app/integrations/test_utils.go | 24 +- .../app/logparsingpipeline/controller.go | 142 +++++++---- .../app/opamp/config_provider_test.go | 14 +- pkg/query-service/app/opamp/model/agents.go | 13 +- pkg/query-service/app/server.go | 10 +- pkg/query-service/constants/constants.go | 2 + .../integration/logparsingpipeline_test.go | 157 ++++-------- .../integration/signoz_integrations_test.go | 224 +++++++++++++++++- .../tests/integration/test_utils.go | 67 ++++++ pkg/query-service/utils/slices.go | 29 +++ pkg/query-service/utils/testutils.go | 29 +++ 20 files changed, 631 insertions(+), 225 deletions(-) create mode 100644 pkg/query-service/app/integrations/pipeline_utils.go create mode 100644 pkg/query-service/utils/slices.go create mode 100644 pkg/query-service/utils/testutils.go diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 81173f2e0f..02fecb50e9 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -172,7 +172,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } // initiate opamp - _, err = opAmpModel.InitDB(baseconst.RELATIONAL_DATASOURCE_PATH) + _, err = opAmpModel.InitDB(localDB) if err != nil { return nil, err } @@ -185,7 +185,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } // ingestion pipelines manager - logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") + logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( + localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations, + ) if err != nil { return nil, err } diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index 0e77383f7e..0fdab4e990 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -111,10 +111,6 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) ( return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version") } - if latestConfig == nil { - continue - } - updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig( recommendation, latestConfig, ) @@ -124,13 +120,24 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) ( )) } recommendation = updatedConf - configId := fmt.Sprintf("%s:%d", featureType, latestConfig.Version) + + // It is possible for a feature to recommend collector config + // before any user created config versions exist. + // + // For example, log pipeline config for installed integrations will + // have to be recommended even if the user hasn't created any pipelines yet + configVersion := -1 + if latestConfig != nil { + configVersion = latestConfig.Version + } + configId := fmt.Sprintf("%s:%d", featureType, configVersion) + settingVersionsUsed = append(settingVersionsUsed, configId) m.updateDeployStatus( context.Background(), featureType, - latestConfig.Version, + configVersion, string(DeployInitiated), "Deployment has started", configId, @@ -209,6 +216,10 @@ func StartNewVersion( return cfg, nil } +func NotifyConfigUpdate(ctx context.Context) { + m.notifyConfigUpdateSubscribers() +} + func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError { configVersion, err := GetConfigVersion(ctx, typ, version) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index dac4e3e51e..7cfdc7b39f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2787,16 +2787,17 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { // get lateset agent config + latestVersion := -1 lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines) - if err != nil { - if err.Type() != model.ErrorNotFound { - return nil, model.WrapApiError(err, "failed to get latest agent config version") - } else { - return nil, nil - } + if err != nil && err.Type() != model.ErrorNotFound { + return nil, model.WrapApiError(err, "failed to get latest agent config version") } - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, lastestConfig.Version) + if lastestConfig != nil { + latestVersion = lastestConfig.Version + } + + payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines") } diff --git a/pkg/query-service/app/integrations/builtin.go b/pkg/query-service/app/integrations/builtin.go index 7b6c98c453..905bc39c84 100644 --- a/pkg/query-service/app/integrations/builtin.go +++ b/pkg/query-service/app/integrations/builtin.go @@ -127,7 +127,7 @@ func readBuiltInIntegration(dirpath string) ( ) } - integration.Id = "builtin::" + integration.Id + integration.Id = "builtin-" + integration.Id return &integration, nil } diff --git a/pkg/query-service/app/integrations/builtin_test.go b/pkg/query-service/app/integrations/builtin_test.go index 66d7cea0fa..cb72d5dcba 100644 --- a/pkg/query-service/app/integrations/builtin_test.go +++ b/pkg/query-service/app/integrations/builtin_test.go @@ -20,7 +20,7 @@ func TestBuiltinIntegrations(t *testing.T) { "some built in integrations are expected to be bundled.", ) - nginxIntegrationId := "builtin::nginx" + nginxIntegrationId := "builtin-nginx" res, apiErr := repo.get(context.Background(), []string{ nginxIntegrationId, }) diff --git a/pkg/query-service/app/integrations/controller.go b/pkg/query-service/app/integrations/controller.go index e047333bc5..1a347a73a5 100644 --- a/pkg/query-service/app/integrations/controller.go +++ b/pkg/query-service/app/integrations/controller.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/agentConf" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/model" ) @@ -74,9 +76,14 @@ type InstallIntegrationRequest struct { func (c *Controller) Install( ctx context.Context, req *InstallIntegrationRequest, ) (*IntegrationsListItem, *model.ApiError) { - return c.mgr.InstallIntegration( + res, apiErr := c.mgr.InstallIntegration( ctx, req.IntegrationId, req.Config, ) + if apiErr != nil { + return nil, apiErr + } + agentConf.NotifyConfigUpdate(ctx) + return res, nil } type UninstallIntegrationRequest struct { @@ -92,7 +99,18 @@ func (c *Controller) Uninstall( )) } - return c.mgr.UninstallIntegration( + apiErr := c.mgr.UninstallIntegration( ctx, req.IntegrationId, ) + if apiErr != nil { + return apiErr + } + agentConf.NotifyConfigUpdate(ctx) + return nil +} + +func (c *Controller) GetPipelinesForInstalledIntegrations( + ctx context.Context, +) ([]logparsingpipeline.Pipeline, *model.ApiError) { + return c.mgr.GetPipelinesForInstalledIntegrations(ctx) } diff --git a/pkg/query-service/app/integrations/manager.go b/pkg/query-service/app/integrations/manager.go index 1685afff65..37541b0a19 100644 --- a/pkg/query-service/app/integrations/manager.go +++ b/pkg/query-service/app/integrations/manager.go @@ -7,12 +7,14 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/jmoiron/sqlx" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/rules" + "go.signoz.io/signoz/pkg/query-service/utils" ) type IntegrationAuthor struct { @@ -294,3 +296,42 @@ func (m *Manager) getInstalledIntegration( } return &installation, nil } + +func (m *Manager) GetPipelinesForInstalledIntegrations( + ctx context.Context, +) ([]logparsingpipeline.Pipeline, *model.ApiError) { + installations, apiErr := m.installedIntegrationsRepo.list(ctx) + if apiErr != nil { + return nil, apiErr + } + + installedIds := utils.MapSlice(installations, func(i InstalledIntegration) string { + return i.IntegrationId + }) + installedIntegrations, apiErr := m.availableIntegrationsRepo.get(ctx, installedIds) + if apiErr != nil { + return nil, apiErr + } + + pipelines := []logparsingpipeline.Pipeline{} + for _, ii := range installedIntegrations { + for _, p := range ii.Assets.Logs.Pipelines { + pp := logparsingpipeline.Pipeline{ + // Alias is used for identifying integration pipelines. Id can't be used for this + // since versioning while saving pipelines requires a new id for each version + // to avoid altering history when pipelines are edited/reordered etc + Alias: AliasForIntegrationPipeline(ii.Id, p.Alias), + Id: uuid.NewString(), + OrderId: p.OrderId, + Enabled: p.Enabled, + Name: p.Name, + Description: &p.Description, + Filter: p.Filter, + Config: p.Config, + } + pipelines = append(pipelines, pp) + } + } + + return pipelines, nil +} diff --git a/pkg/query-service/app/integrations/pipeline_utils.go b/pkg/query-service/app/integrations/pipeline_utils.go new file mode 100644 index 0000000000..49ab5dd82a --- /dev/null +++ b/pkg/query-service/app/integrations/pipeline_utils.go @@ -0,0 +1,33 @@ +package integrations + +import ( + "strings" + + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" + "go.signoz.io/signoz/pkg/query-service/constants" +) + +const IntegrationPipelineIdSeparator string = "--" + +func AliasForIntegrationPipeline( + integrationId string, pipelineName string, +) string { + return strings.Join( + []string{constants.IntegrationPipelineIdPrefix, integrationId, pipelineName}, + IntegrationPipelineIdSeparator, + ) +} + +// Returns ptr to integration_id string if `p` is a pipeline for an installed integration. +// Returns null otherwise. +func IntegrationIdForPipeline(p logparsingpipeline.Pipeline) *string { + if strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) { + parts := strings.Split(p.Alias, IntegrationPipelineIdSeparator) + if len(parts) < 2 { + return nil + } + integrationId := parts[1] + return &integrationId + } + return nil +} diff --git a/pkg/query-service/app/integrations/sqlite_repo.go b/pkg/query-service/app/integrations/sqlite_repo.go index 94e9c4d51d..2c3e9fc699 100644 --- a/pkg/query-service/app/integrations/sqlite_repo.go +++ b/pkg/query-service/app/integrations/sqlite_repo.go @@ -62,6 +62,7 @@ func (r *InstalledIntegrationsSqliteRepo) list( config_json, installed_at from integrations_installed + order by installed_at `, ) if err != nil { diff --git a/pkg/query-service/app/integrations/test_utils.go b/pkg/query-service/app/integrations/test_utils.go index 2616bcba19..d06b2db75c 100644 --- a/pkg/query-service/app/integrations/test_utils.go +++ b/pkg/query-service/app/integrations/test_utils.go @@ -2,39 +2,19 @@ package integrations import ( "context" - "os" "slices" "testing" - "github.com/jmoiron/sqlx" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/rules" + "go.signoz.io/signoz/pkg/query-service/utils" ) -func NewTestSqliteDB(t *testing.T) ( - db *sqlx.DB, dbFilePath string, -) { - testDBFile, err := os.CreateTemp("", "test-signoz-db-*") - if err != nil { - t.Fatalf("could not create temp file for test db: %v", err) - } - testDBFilePath := testDBFile.Name() - t.Cleanup(func() { os.Remove(testDBFilePath) }) - testDBFile.Close() - - testDB, err := sqlx.Open("sqlite3", testDBFilePath) - if err != nil { - t.Fatalf("could not open test db sqlite file: %v", err) - } - - return testDB, testDBFilePath -} - func NewTestIntegrationsManager(t *testing.T) *Manager { - testDB, _ := NewTestSqliteDB(t) + testDB := utils.NewQueryServiceDBForTests(t) installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB) if err != nil { diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index eed3befec5..9527fe9e8d 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -4,25 +4,38 @@ import ( "context" "encoding/json" "fmt" + "slices" + "strings" + "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" - "go.uber.org/multierr" + "go.signoz.io/signoz/pkg/query-service/utils" "go.uber.org/zap" ) // Controller takes care of deployment cycle of log parsing pipelines. type LogParsingPipelineController struct { Repo + + GetIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError) } -func NewLogParsingPipelinesController(db *sqlx.DB, engine string) (*LogParsingPipelineController, error) { +func NewLogParsingPipelinesController( + db *sqlx.DB, + engine string, + getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError), +) (*LogParsingPipelineController, error) { repo := NewRepo(db) err := repo.InitDB(engine) - return &LogParsingPipelineController{Repo: repo}, err + return &LogParsingPipelineController{ + Repo: repo, + GetIntegrationPipelines: getIntegrationPipelines, + }, err } // PipelinesResponse is used to prepare http response for pipelines config related requests @@ -47,29 +60,22 @@ func (ic *LogParsingPipelineController) ApplyPipelines( var pipelines []Pipeline // scan through postable pipelines, to select the existing pipelines or insert missing ones - for _, r := range postable { + for idx, r := range postable { // note: we process only new and changed pipelines here, deleted pipelines are not expected // from client. if user deletes a pipelines, the client should not send that pipelines in the update. // in effect, the new config version will not have that pipelines. - if r.Id == "" { - // looks like a new or changed pipeline, store it first - inserted, err := ic.insertPipeline(ctx, &r) - if err != nil { - zap.S().Errorf("failed to insert edited pipeline %s", err.Error()) - return nil, model.WrapApiError(err, "failed to insert edited pipeline") - } else { - pipelines = append(pipelines, *inserted) - } - } else { - selected, err := ic.GetPipeline(ctx, r.Id) - if err != nil { - zap.S().Errorf("failed to find edited pipeline %s", err.Error()) - return nil, model.WrapApiError(err, "failed to find edited pipeline") - } - pipelines = append(pipelines, *selected) + // For versioning, pipelines get stored with unique ids each time they are saved. + // This ensures updating a pipeline doesn't alter historical versions that referenced + // the same pipeline id. + r.Id = uuid.NewString() + r.OrderId = idx + 1 + pipeline, apiErr := ic.insertPipeline(ctx, &r) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "failed to insert pipeline") } + pipelines = append(pipelines, *pipeline) } @@ -85,34 +91,85 @@ func (ic *LogParsingPipelineController) ApplyPipelines( return nil, err } - history, _ := agentConf.GetConfigHistory(ctx, agentConf.ElementTypeLogPipelines, 10) - insertedCfg, _ := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, cfg.Version) + return ic.GetPipelinesByVersion(ctx, cfg.Version) +} - response := &PipelinesResponse{ - ConfigVersion: insertedCfg, - Pipelines: pipelines, - History: history, +// Returns effective list of pipelines including user created +// pipelines and pipelines for installed integrations +func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( + ctx context.Context, version int, +) ([]Pipeline, *model.ApiError) { + result := []Pipeline{} + + if version >= 0 { + savedPipelines, errors := ic.getPipelinesByVersion(ctx, version) + if errors != nil { + zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors) + return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version")) + } + result = savedPipelines } - if err != nil { - return response, model.WrapApiError(err, "failed to apply pipelines") + integrationPipelines, apiErr := ic.GetIntegrationPipelines(ctx) + if apiErr != nil { + return nil, model.WrapApiError( + apiErr, "could not get pipelines for installed integrations", + ) } - return response, nil + + // Filter out any integration pipelines included in pipelines saved by user + // if the corresponding integration is no longer installed. + ipAliases := utils.MapSlice(integrationPipelines, func(p Pipeline) string { + return p.Alias + }) + result = utils.FilterSlice(result, func(p Pipeline) bool { + if !strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) { + return true + } + return slices.Contains(ipAliases, p.Alias) + }) + + // Add installed integration pipelines to the list of pipelines saved by user. + // Users are allowed to enable/disable and reorder integration pipelines while + // saving the pipeline list. + for _, ip := range integrationPipelines { + userPipelineIdx := slices.IndexFunc(result, func(p Pipeline) bool { + return p.Alias == ip.Alias + }) + if userPipelineIdx >= 0 { + ip.Enabled = result[userPipelineIdx].Enabled + result[userPipelineIdx] = ip + } else { + // installed integration pipelines get added to the end of the list by default. + result = append(result, ip) + } + } + + for idx := range result { + result[idx].OrderId = idx + 1 + } + + return result, nil } // GetPipelinesByVersion responds with version info and associated pipelines func (ic *LogParsingPipelineController) GetPipelinesByVersion( ctx context.Context, version int, ) (*PipelinesResponse, *model.ApiError) { - pipelines, errors := ic.getPipelinesByVersion(ctx, version) + pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, version) if errors != nil { zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors) return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version")) } - configVersion, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version) - if err != nil { - zap.S().Errorf("failed to get config for version %d, %s", version, err.Error()) - return nil, model.WrapApiError(err, "failed to get config for given version") + + var configVersion *agentConf.ConfigVersion + if version >= 0 { + cv, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version) + if err != nil { + zap.S().Errorf("failed to get config for version %d, %s", version, err.Error()) + return nil, model.WrapApiError(err, "failed to get config for given version") + } + configVersion = cv } return &PipelinesResponse{ @@ -163,26 +220,29 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig( serializedSettingsUsed string, apiErr *model.ApiError, ) { + pipelinesVersion := -1 + if configVersion != nil { + pipelinesVersion = configVersion.Version + } - pipelines, errs := pc.getPipelinesByVersion( - context.Background(), configVersion.Version, + pipelinesResp, apiErr := pc.GetPipelinesByVersion( + context.Background(), pipelinesVersion, ) - if len(errs) > 0 { - return nil, "", model.InternalError(multierr.Combine(errs...)) + if apiErr != nil { + return nil, "", apiErr } updatedConf, apiErr := GenerateCollectorConfigWithPipelines( - currentConfYaml, pipelines, + currentConfYaml, pipelinesResp.Pipelines, ) if apiErr != nil { return nil, "", model.WrapApiError(apiErr, "could not marshal yaml for updated conf") } - rawPipelineData, err := json.Marshal(pipelines) + rawPipelineData, err := json.Marshal(pipelinesResp.Pipelines) if err != nil { return nil, "", model.BadRequest(errors.Wrap(err, "could not serialize pipelines to JSON")) } return updatedConf, string(rawPipelineData), nil - } diff --git a/pkg/query-service/app/opamp/config_provider_test.go b/pkg/query-service/app/opamp/config_provider_test.go index 6718ff1581..1a6efe122a 100644 --- a/pkg/query-service/app/opamp/config_provider_test.go +++ b/pkg/query-service/app/opamp/config_provider_test.go @@ -2,7 +2,6 @@ package opamp import ( "fmt" - "os" "testing" "github.com/knadh/koanf" @@ -13,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/app/opamp/model" + "go.signoz.io/signoz/pkg/query-service/utils" "golang.org/x/exp/maps" ) @@ -165,16 +165,8 @@ type testbed struct { } func newTestbed(t *testing.T) *testbed { - // Init opamp model. - testDBFile, err := os.CreateTemp("", "test-signoz-db-*") - if err != nil { - t.Fatalf("could not create temp file for test db: %v", err) - } - testDBFilePath := testDBFile.Name() - t.Cleanup(func() { os.Remove(testDBFilePath) }) - testDBFile.Close() - - _, err = model.InitDB(testDBFilePath) + testDB := utils.NewQueryServiceDBForTests(t) + _, err := model.InitDB(testDB) if err != nil { t.Fatalf("could not init opamp model: %v", err) } diff --git a/pkg/query-service/app/opamp/model/agents.go b/pkg/query-service/app/opamp/model/agents.go index 50a554b957..2e2118e216 100644 --- a/pkg/query-service/app/opamp/model/agents.go +++ b/pkg/query-service/app/opamp/model/agents.go @@ -29,14 +29,9 @@ func (a *Agents) Count() int { return len(a.connections) } -// InitDB initializes the database and creates the agents table. -func InitDB(dataSourceName string) (*sqlx.DB, error) { - var err error - - db, err = sqlx.Open("sqlite3", dataSourceName) - if err != nil { - return nil, err - } +// Initialize the database and create schema if needed +func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) { + db = qsDB tableSchema := `CREATE TABLE IF NOT EXISTS agents ( agent_id TEXT PRIMARY KEY UNIQUE, @@ -46,7 +41,7 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) { effective_config TEXT NOT NULL );` - _, err = db.Exec(tableSchema) + _, err := db.Exec(tableSchema) if err != nil { return nil, fmt.Errorf("Error in creating agents table: %s", err.Error()) } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 5c4e6dd7e8..9d934d0632 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -159,12 +159,12 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { integrationsController, err := integrations.NewController(localDB) if err != nil { - return nil, fmt.Errorf( - "couldn't create integrations controller: %w", err, - ) + return nil, fmt.Errorf("couldn't create integrations controller: %w", err) } - logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") + logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( + localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations, + ) if err != nil { return nil, err } @@ -213,7 +213,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { s.privateHTTP = privateServer - _, err = opAmpModel.InitDB(constants.RELATIONAL_DATASOURCE_PATH) + _, err = opAmpModel.InitDB(localDB) if err != nil { return nil, err } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 59e76c0ec7..b820a62c58 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -308,6 +308,8 @@ var ReservedColumnTargetAliases = map[string]struct{}{ // logsPPLPfx is a short constant for logsPipelinePrefix const LogsPPLPfx = "logstransform/pipeline_" +const IntegrationPipelineIdPrefix = "integration" + // The datatype present here doesn't represent the actual datatype of column in the logs table. var StaticFieldsLogsV3 = map[string]v3.AttributeKey{ diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 81e2c23cb9..9ef47171a4 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -1,14 +1,11 @@ package tests import ( - "bytes" - "context" "encoding/json" "fmt" "io" - "net/http" "net/http/httptest" - "os" + "runtime/debug" "strings" "testing" @@ -18,10 +15,10 @@ import ( "github.com/knadh/koanf/parsers/yaml" "github.com/open-telemetry/opamp-go/protobufs" "github.com/pkg/errors" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/integrations" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/app/opamp" opampModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" @@ -31,20 +28,21 @@ import ( "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" + "go.signoz.io/signoz/pkg/query-service/utils" "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) func TestLogPipelinesLifecycle(t *testing.T) { - testbed := NewLogPipelinesTestBed(t) - assert := assert.New(t) + testbed := NewLogPipelinesTestBed(t, nil) + require := require.New(t) getPipelinesResp := testbed.GetPipelinesFromQS() - assert.Equal( + require.Equal( 0, len(getPipelinesResp.Pipelines), "There should be no pipelines at the start", ) - assert.Equal( + require.Equal( 0, len(getPipelinesResp.History), "There should be no pipelines config history at the start", ) @@ -118,11 +116,11 @@ func TestLogPipelinesLifecycle(t *testing.T) { ) // Deployment status should be pending. - assert.Equal( + require.Equal( 1, len(getPipelinesResp.History), "pipelines config history should not be empty after 1st configuration", ) - assert.Equal( + require.Equal( agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus, "pipelines deployment should be in progress after 1st configuration", ) @@ -134,7 +132,7 @@ func TestLogPipelinesLifecycle(t *testing.T) { assertPipelinesResponseMatchesPostedPipelines( t, postablePipelines, getPipelinesResp, ) - assert.Equal( + require.Equal( agentConf.Deployed, getPipelinesResp.History[0].DeployStatus, "pipeline deployment should be complete after acknowledgment from opamp client", @@ -149,12 +147,13 @@ func TestLogPipelinesLifecycle(t *testing.T) { testbed.assertPipelinesSentToOpampClient(updatePipelinesResp.Pipelines) testbed.assertNewAgentGetsPipelinesOnConnection(updatePipelinesResp.Pipelines) - assert.Equal( - 2, len(updatePipelinesResp.History), + getPipelinesResp = testbed.GetPipelinesFromQS() + require.Equal( + 2, len(getPipelinesResp.History), "there should be 2 history entries after posting pipelines config for the 2nd time", ) - assert.Equal( - agentConf.DeployInitiated, updatePipelinesResp.History[0].DeployStatus, + require.Equal( + agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus, "deployment should be in progress for latest pipeline config", ) @@ -165,7 +164,7 @@ func TestLogPipelinesLifecycle(t *testing.T) { assertPipelinesResponseMatchesPostedPipelines( t, postablePipelines, getPipelinesResp, ) - assert.Equal( + require.Equal( agentConf.Deployed, getPipelinesResp.History[0].DeployStatus, "deployment for latest pipeline config should be complete after acknowledgment from opamp client", @@ -174,7 +173,7 @@ func TestLogPipelinesLifecycle(t *testing.T) { func TestLogPipelinesHistory(t *testing.T) { require := require.New(t) - testbed := NewLogPipelinesTestBed(t) + testbed := NewLogPipelinesTestBed(t, nil) // Only the latest config version can be "IN_PROGRESS", // other incomplete deployments should have status "UNKNOWN" @@ -356,7 +355,7 @@ func TestLogPipelinesValidation(t *testing.T) { for _, tc := range testCases { t.Run(tc.Name, func(t *testing.T) { - testbed := NewLogPipelinesTestBed(t) + testbed := NewLogPipelinesTestBed(t, nil) testbed.PostPipelinesToQSExpectingStatusCode( logparsingpipeline.PostablePipelines{ Pipelines: []logparsingpipeline.PostablePipeline{tc.Pipeline}, @@ -369,7 +368,7 @@ func TestLogPipelinesValidation(t *testing.T) { func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) { require := require.New(t) - testbed := NewTestbedWithoutOpamp(t) + testbed := NewTestbedWithoutOpamp(t, nil) getPipelinesResp := testbed.GetPipelinesFromQS() require.Equal(0, len(getPipelinesResp.Pipelines)) @@ -422,7 +421,6 @@ func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) { // configuring log pipelines and provides test helpers. type LogPipelinesTestBed struct { t *testing.T - testDBFilePath string testUser *model.User apiHandler *app.APIHandler agentConfMgr *agentConf.Manager @@ -430,25 +428,20 @@ type LogPipelinesTestBed struct { opampClientConn *opamp.MockOpAmpConnection } -func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed { - // Create a tmp file based sqlite db for testing. - testDBFile, err := os.CreateTemp("", "test-signoz-db-*") - if err != nil { - t.Fatalf("could not create temp file for test db: %v", err) +// testDB can be injected for sharing a DB across multiple integration testbeds. +func NewTestbedWithoutOpamp(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed { + if testDB == nil { + testDB = utils.NewQueryServiceDBForTests(t) } - testDBFilePath := testDBFile.Name() - t.Cleanup(func() { os.Remove(testDBFilePath) }) - testDBFile.Close() - // TODO(Raj): move away from singleton DB instances to avoid - // issues when running tests in parallel. - dao.InitDao("sqlite", testDBFilePath) - - testDB, err := sqlx.Open("sqlite3", testDBFilePath) + ic, err := integrations.NewController(testDB) if err != nil { - t.Fatalf("could not open test db sqlite file: %v", err) + t.Fatalf("could not create integrations controller: %v", err) } - controller, err := logparsingpipeline.NewLogParsingPipelinesController(testDB, "sqlite") + + controller, err := logparsingpipeline.NewLogParsingPipelinesController( + testDB, "sqlite", ic.GetPipelinesForInstalledIntegrations, + ) if err != nil { t.Fatalf("could not create a logparsingpipelines controller: %v", err) } @@ -467,7 +460,7 @@ func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed { } // Mock an available opamp agent - testDB, err = opampModel.InitDB(testDBFilePath) + testDB, err = opampModel.InitDB(testDB) require.Nil(t, err, "failed to init opamp model") agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ @@ -479,16 +472,15 @@ func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed { require.Nil(t, err, "failed to init agentConf") return &LogPipelinesTestBed{ - t: t, - testDBFilePath: testDBFilePath, - testUser: user, - apiHandler: apiHandler, - agentConfMgr: agentConfMgr, + t: t, + testUser: user, + apiHandler: apiHandler, + agentConfMgr: agentConfMgr, } } -func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { - testbed := NewTestbedWithoutOpamp(t) +func NewLogPipelinesTestBed(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed { + testbed := NewTestbedWithoutOpamp(t, testDB) opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr) err := opampServer.Start(opamp.GetAvailableLocalAddress()) @@ -590,8 +582,8 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline if response.StatusCode != 200 { tb.t.Fatalf( - "could not list log parsing pipelines. status: %d, body: %v", - response.StatusCode, string(responseBody), + "could not list log parsing pipelines. status: %d, body: %v\n%s", + response.StatusCode, string(responseBody), string(debug.Stack()), ) } @@ -625,7 +617,7 @@ func assertPipelinesRecommendedInRemoteConfig( pipelines []logparsingpipeline.Pipeline, ) { collectorConfigFiles := msg.RemoteConfig.Config.ConfigMap - assert.Equal( + require.Equal( t, len(collectorConfigFiles), 1, "otel config sent to client is expected to contain atleast 1 file", ) @@ -653,7 +645,7 @@ func assertPipelinesRecommendedInRemoteConfig( } _, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines) - assert.Equal( + require.Equal( t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames, "config sent to opamp client doesn't contain expected log pipelines", ) @@ -661,7 +653,7 @@ func assertPipelinesRecommendedInRemoteConfig( collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{}) for _, procName := range expectedLogProcessorNames { pipelineProcessorInConf, procExists := collectorConfProcessors[procName] - assert.True(t, procExists, fmt.Sprintf( + require.True(t, procExists, fmt.Sprintf( "%s processor not found in config sent to opamp client", procName, )) @@ -747,16 +739,16 @@ func assertPipelinesResponseMatchesPostedPipelines( postablePipelines logparsingpipeline.PostablePipelines, pipelinesResp *logparsingpipeline.PipelinesResponse, ) { - assert.Equal( + require.Equal( t, len(postablePipelines.Pipelines), len(pipelinesResp.Pipelines), "length mistmatch between posted pipelines and pipelines in response", ) for i, pipeline := range pipelinesResp.Pipelines { postable := postablePipelines.Pipelines[i] - assert.Equal(t, postable.Name, pipeline.Name, "pipeline.Name mismatch") - assert.Equal(t, postable.OrderId, pipeline.OrderId, "pipeline.OrderId mismatch") - assert.Equal(t, postable.Enabled, pipeline.Enabled, "pipeline.Enabled mismatch") - assert.Equal(t, postable.Config, pipeline.Config, "pipeline.Config mismatch") + require.Equal(t, postable.Name, pipeline.Name, "pipeline.Name mismatch") + require.Equal(t, postable.OrderId, pipeline.OrderId, "pipeline.OrderId mismatch") + require.Equal(t, postable.Enabled, pipeline.Enabled, "pipeline.Enabled mismatch") + require.Equal(t, postable.Config, pipeline.Config, "pipeline.Config mismatch") } } @@ -792,60 +784,3 @@ func newInitialAgentConfigMap() *protobufs.AgentConfigMap { }, } } - -func createTestUser() (*model.User, *model.ApiError) { - // Create a test user for auth - ctx := context.Background() - org, apiErr := dao.DB().CreateOrg(ctx, &model.Organization{ - Name: "test", - }) - if apiErr != nil { - return nil, apiErr - } - - group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup) - if apiErr != nil { - return nil, apiErr - } - - auth.InitAuthCache(ctx) - - return dao.DB().CreateUser( - ctx, - &model.User{ - Name: "test", - Email: "test@test.com", - Password: "test", - OrgId: org.Id, - GroupId: group.Id, - }, - true, - ) -} - -func NewAuthenticatedTestRequest( - user *model.User, - path string, - postData interface{}, -) (*http.Request, error) { - userJwt, err := auth.GenerateJWTForUser(user) - if err != nil { - return nil, err - } - - var req *http.Request - - if postData != nil { - var body bytes.Buffer - err = json.NewEncoder(&body).Encode(postData) - if err != nil { - return nil, err - } - req = httptest.NewRequest(http.MethodPost, path, &body) - } else { - req = httptest.NewRequest(http.MethodGet, path, nil) - } - - req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt) - return req, nil -} diff --git a/pkg/query-service/tests/integration/signoz_integrations_test.go b/pkg/query-service/tests/integration/signoz_integrations_test.go index 59fa256f7f..8c6402f3f1 100644 --- a/pkg/query-service/tests/integration/signoz_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_integrations_test.go @@ -7,23 +7,28 @@ import ( "net/http" "net/http/httptest" "runtime/debug" + "slices" "testing" + "github.com/jmoiron/sqlx" mockhouse "github.com/srikanthccv/ClickHouse-go-mock" "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/app" "go.signoz.io/signoz/pkg/query-service/app/integrations" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/dao" "go.signoz.io/signoz/pkg/query-service/featureManager" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" ) // Higher level tests for UI facing APIs func TestSignozIntegrationLifeCycle(t *testing.T) { require := require.New(t) - testbed := NewIntegrationsTestBed(t) + testbed := NewIntegrationsTestBed(t, nil) installedResp := testbed.GetInstalledIntegrationsFromQS() require.Equal( @@ -92,6 +97,184 @@ func TestSignozIntegrationLifeCycle(t *testing.T) { require.False(availableIntegrations[0].IsInstalled) } +func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) { + require := require.New(t) + + testDB := utils.NewQueryServiceDBForTests(t) + integrationsTB := NewIntegrationsTestBed(t, testDB) + pipelinesTB := NewLogPipelinesTestBed(t, testDB) + + availableIntegrationsResp := integrationsTB.GetAvailableIntegrationsFromQS() + availableIntegrations := availableIntegrationsResp.Integrations + require.Greater( + len(availableIntegrations), 0, + "some integrations should come bundled with SigNoz", + ) + + getPipelinesResp := pipelinesTB.GetPipelinesFromQS() + require.Equal( + 0, len(getPipelinesResp.Pipelines), + "There should be no pipelines at the start", + ) + + // Find an available integration that contains a log pipeline + var testAvailableIntegration *integrations.IntegrationsListItem + for _, ai := range availableIntegrations { + details := integrationsTB.GetIntegrationDetailsFromQS(ai.Id) + require.NotNil(details) + if len(details.Assets.Logs.Pipelines) > 0 { + testAvailableIntegration = &ai + break + } + } + require.NotNil(testAvailableIntegration) + + // Installing an integration should add its pipelines to pipelines list + require.False(testAvailableIntegration.IsInstalled) + integrationsTB.RequestQSToInstallIntegration( + testAvailableIntegration.Id, map[string]interface{}{}, + ) + + testIntegration := integrationsTB.GetIntegrationDetailsFromQS(testAvailableIntegration.Id) + require.NotNil(testIntegration.Installation) + testIntegrationPipelines := testIntegration.Assets.Logs.Pipelines + require.Greater( + len(testIntegrationPipelines), 0, + "test integration expected to have a pipeline", + ) + + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + require.Equal( + len(testIntegrationPipelines), len(getPipelinesResp.Pipelines), + "Pipelines for installed integrations should appear in pipelines list", + ) + lastPipeline := getPipelinesResp.Pipelines[len(getPipelinesResp.Pipelines)-1] + require.NotNil(integrations.IntegrationIdForPipeline(lastPipeline)) + require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(lastPipeline)) + + pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) + pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) + + // After saving a user created pipeline, pipelines response should include + // both user created pipelines and pipelines for installed integrations. + postablePipelines := logparsingpipeline.PostablePipelines{ + Pipelines: []logparsingpipeline.PostablePipeline{ + { + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []logparsingpipeline.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + }, + } + + pipelinesTB.PostPipelinesToQS(postablePipelines) + + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines)) + pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) + pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) + + // Reordering integration pipelines should be possible. + postable := postableFromPipelines(getPipelinesResp.Pipelines) + slices.Reverse(postable.Pipelines) + for i := range postable.Pipelines { + postable.Pipelines[i].OrderId = i + 1 + } + + pipelinesTB.PostPipelinesToQS(postable) + + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + firstPipeline := getPipelinesResp.Pipelines[0] + require.NotNil(integrations.IntegrationIdForPipeline(firstPipeline)) + require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline)) + + pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) + pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) + + // enabling/disabling integration pipelines should be possible. + require.True(firstPipeline.Enabled) + + postable.Pipelines[0].Enabled = false + pipelinesTB.PostPipelinesToQS(postable) + + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines)) + + firstPipeline = getPipelinesResp.Pipelines[0] + require.NotNil(integrations.IntegrationIdForPipeline(firstPipeline)) + require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline)) + + require.False(firstPipeline.Enabled) + + pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) + pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) + + // should not be able to edit integrations pipeline. + require.Greater(len(postable.Pipelines[0].Config), 0) + postable.Pipelines[0].Config = []logparsingpipeline.PipelineOperator{} + pipelinesTB.PostPipelinesToQS(postable) + + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines)) + + firstPipeline = getPipelinesResp.Pipelines[0] + require.NotNil(integrations.IntegrationIdForPipeline(firstPipeline)) + require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline)) + + require.False(firstPipeline.Enabled) + require.Greater(len(firstPipeline.Config), 0) + + // should not be able to delete integrations pipeline + postable.Pipelines = []logparsingpipeline.PostablePipeline{postable.Pipelines[1]} + pipelinesTB.PostPipelinesToQS(postable) + + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines)) + + lastPipeline = getPipelinesResp.Pipelines[1] + require.NotNil(integrations.IntegrationIdForPipeline(lastPipeline)) + require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(lastPipeline)) + + // Uninstalling an integration should remove its pipelines + // from pipelines list in the UI + integrationsTB.RequestQSToUninstallIntegration( + testIntegration.Id, + ) + getPipelinesResp = pipelinesTB.GetPipelinesFromQS() + require.Equal( + 1, len(getPipelinesResp.Pipelines), + "Pipelines for uninstalled integrations should get removed from pipelines list", + ) + pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines) + pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines) +} + type IntegrationsTestBed struct { t *testing.T testUser *model.User @@ -125,7 +308,7 @@ func (tb *IntegrationsTestBed) GetInstalledIntegrationsFromQS() *integrations.In var integrationsResp integrations.IntegrationsListResponse err = json.Unmarshal(dataJson, &integrationsResp) if err != nil { - tb.t.Fatalf("could not unmarshal apiResponse.Data json into PipelinesResponse") + tb.t.Fatalf(" could not unmarshal apiResponse.Data json into PipelinesResponse") } return &integrationsResp @@ -232,11 +415,11 @@ func (tb *IntegrationsTestBed) mockLogQueryResponse(logsInResponse []model.Signo addLogsQueryExpectation(tb.mockClickhouse, logsInResponse) } -func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed { - testDB, testDBFilePath := integrations.NewTestSqliteDB(t) - - // TODO(Raj): This should not require passing in the DB file path - dao.InitDao("sqlite", testDBFilePath) +// testDB can be injected for sharing a DB across multiple integration testbeds. +func NewIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *IntegrationsTestBed { + if testDB == nil { + testDB = utils.NewQueryServiceDBForTests(t) + } controller, err := integrations.NewController(testDB) if err != nil { @@ -272,3 +455,30 @@ func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed { mockClickhouse: mockClickhouse, } } + +func postableFromPipelines(pipelines []logparsingpipeline.Pipeline) logparsingpipeline.PostablePipelines { + result := logparsingpipeline.PostablePipelines{} + + for _, p := range pipelines { + postable := logparsingpipeline.PostablePipeline{ + Id: p.Id, + OrderId: p.OrderId, + Name: p.Name, + Alias: p.Alias, + Enabled: p.Enabled, + Config: p.Config, + } + + if p.Description != nil { + postable.Description = *p.Description + } + + if p.Filter != nil { + postable.Filter = p.Filter + } + + result.Pipelines = append(result.Pipelines, postable) + } + + return result +} diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index 1dadec02bc..ac6e1db7c5 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -1,7 +1,12 @@ package tests import ( + "bytes" + "context" + "encoding/json" "fmt" + "net/http" + "net/http/httptest" "testing" "time" @@ -12,6 +17,9 @@ import ( mockhouse "github.com/srikanthccv/ClickHouse-go-mock" "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" + "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/dao" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" "golang.org/x/exp/maps" @@ -131,3 +139,62 @@ func makeTestSignozLog( return testLog } + +func createTestUser() (*model.User, *model.ApiError) { + // Create a test user for auth + ctx := context.Background() + org, apiErr := dao.DB().CreateOrg(ctx, &model.Organization{ + Name: "test", + }) + if apiErr != nil { + return nil, apiErr + } + + group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup) + if apiErr != nil { + return nil, apiErr + } + + auth.InitAuthCache(ctx) + + userId := uuid.NewString() + return dao.DB().CreateUser( + ctx, + &model.User{ + Id: userId, + Name: "test", + Email: userId[:8] + "test@test.com", + Password: "test", + OrgId: org.Id, + GroupId: group.Id, + }, + true, + ) +} + +func NewAuthenticatedTestRequest( + user *model.User, + path string, + postData interface{}, +) (*http.Request, error) { + userJwt, err := auth.GenerateJWTForUser(user) + if err != nil { + return nil, err + } + + var req *http.Request + + if postData != nil { + var body bytes.Buffer + err = json.NewEncoder(&body).Encode(postData) + if err != nil { + return nil, err + } + req = httptest.NewRequest(http.MethodPost, path, &body) + } else { + req = httptest.NewRequest(http.MethodGet, path, nil) + } + + req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt) + return req, nil +} diff --git a/pkg/query-service/utils/slices.go b/pkg/query-service/utils/slices.go new file mode 100644 index 0000000000..c196529a6a --- /dev/null +++ b/pkg/query-service/utils/slices.go @@ -0,0 +1,29 @@ +package utils + +// Map as in map-reduce. +func MapSlice[Slice ~[]Elem, Elem any, Output any]( + slice Slice, mapper func(Elem) Output, +) []Output { + result := []Output{} + + for _, item := range slice { + mapped := mapper(item) + result = append(result, mapped) + } + + return result +} + +func FilterSlice[Slice ~[]Elem, Elem any]( + slice Slice, filterFn func(Elem) bool, +) Slice { + result := Slice{} + + for _, item := range slice { + if filterFn(item) { + result = append(result, item) + } + } + + return result +} diff --git a/pkg/query-service/utils/testutils.go b/pkg/query-service/utils/testutils.go new file mode 100644 index 0000000000..33fd76dafa --- /dev/null +++ b/pkg/query-service/utils/testutils.go @@ -0,0 +1,29 @@ +package utils + +import ( + "os" + "testing" + + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/dao" +) + +func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB { + testDBFile, err := os.CreateTemp("", "test-signoz-db-*") + if err != nil { + t.Fatalf("could not create temp file for test db: %v", err) + } + testDBFilePath := testDBFile.Name() + t.Cleanup(func() { os.Remove(testDBFilePath) }) + testDBFile.Close() + + testDB, err := sqlx.Open("sqlite3", testDBFilePath) + if err != nil { + t.Fatalf("could not open test db sqlite file: %v", err) + } + + // TODO(Raj): This should not require passing in the DB file path + dao.InitDao("sqlite", testDBFilePath) + + return testDB +}