diff --git a/pkg/sqlmigrator/000_noop.go b/pkg/sqlmigrator/000_noop.go new file mode 100644 index 0000000000..950b52c4b5 --- /dev/null +++ b/pkg/sqlmigrator/000_noop.go @@ -0,0 +1,32 @@ +package sqlmigrator + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" +) + +type noopMigration struct{} + +func NoopMigrationFactory() factory.ProviderFactory[SQLMigration, Config] { + return factory.NewProviderFactory(factory.MustNewName("noop"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) { + return &noopMigration{}, nil + }) +} + +func (migration *noopMigration) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + return nil +} + +func (migration *noopMigration) Up(ctx context.Context, db *bun.DB) error { + return nil +} + +func (migration *noopMigration) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/config.go b/pkg/sqlmigrator/config.go new file mode 100644 index 0000000000..3a0abd2fc9 --- /dev/null +++ b/pkg/sqlmigrator/config.go @@ -0,0 +1,41 @@ +package sqlmigrator + +import ( + "errors" + "time" + + "go.signoz.io/signoz/pkg/factory" +) + +type Config struct { + // Lock is the lock configuration. + Lock Lock `mapstructure:"lock"` +} + +type Lock struct { + // Timeout is the time to wait for the migration lock. + Timeout time.Duration `mapstructure:"timeout"` + // Interval is the interval to try to acquire the migration lock. + Interval time.Duration `mapstructure:"interval"` +} + +func NewConfigFactory() factory.ConfigFactory { + return factory.NewConfigFactory(factory.MustNewName("sqlmigrator"), newConfig) +} + +func newConfig() factory.Config { + return Config{ + Lock: Lock{ + Timeout: 2 * time.Minute, + Interval: 10 * time.Second, + }, + } +} + +func (c Config) Validate() error { + if c.Lock.Timeout < c.Lock.Interval { + return errors.New("lock_timeout must be greater than lock_interval") + } + + return nil +} diff --git a/pkg/sqlmigrator/migration.go b/pkg/sqlmigrator/migration.go new file mode 100644 index 0000000000..eeec2d1d89 --- /dev/null +++ b/pkg/sqlmigrator/migration.go @@ -0,0 +1,77 @@ +package sqlmigrator + +import ( + "context" + "database/sql" + "errors" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" +) + +var ( + ErrNoExecute = errors.New("no execute") +) + +func NewMigrations( + ctx context.Context, + settings factory.ProviderSettings, + config Config, + factories factory.NamedMap[factory.ProviderFactory[SQLMigration, Config]], +) (*migrate.Migrations, error) { + migrations := migrate.NewMigrations() + + for _, factory := range factories.GetInOrder() { + migration, err := factory.New(ctx, settings, config) + if err != nil { + return nil, err + } + + err = migration.Register(migrations) + if err != nil { + return nil, err + } + } + + return migrations, nil +} + +func MustNewMigrations( + ctx context.Context, + settings factory.ProviderSettings, + config Config, + factories factory.NamedMap[factory.ProviderFactory[SQLMigration, Config]], +) *migrate.Migrations { + migrations, err := NewMigrations(ctx, settings, config, factories) + if err != nil { + panic(err) + } + return migrations +} + +func WrapIfNotExists(ctx context.Context, db *bun.DB, table string, column string) func(q *bun.AddColumnQuery) *bun.AddColumnQuery { + return func(q *bun.AddColumnQuery) *bun.AddColumnQuery { + if db.Dialect().Name() != dialect.SQLite { + return q.IfNotExists() + } + + var result string + err := db. + NewSelect(). + ColumnExpr("name"). + Table("pragma_table_info"). + Where("arg = ?", table). + Where("name = ?", column). + Scan(ctx, &result) + if err != nil { + if err == sql.ErrNoRows { + return q + } + return q.Err(err) + } + + return q.Err(ErrNoExecute) + } +} diff --git a/pkg/sqlmigrator/migration/000_add_data_migrations.go b/pkg/sqlmigrator/migration/000_add_data_migrations.go new file mode 100644 index 0000000000..516cd7e6dc --- /dev/null +++ b/pkg/sqlmigrator/migration/000_add_data_migrations.go @@ -0,0 +1,45 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addDataMigrations struct{} + +func NewAddDataMigrationsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_data_migrations"), newAddDataMigrations) +} + +func newAddDataMigrations(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addDataMigrations{}, nil +} + +func (migration *addDataMigrations) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + return nil +} + +func (migration *addDataMigrations) Up(ctx context.Context, db *bun.DB) error { + // table:data_migrations + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS data_migrations ( + id SERIAL PRIMARY KEY, + version VARCHAR(255) NOT NULL UNIQUE, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + succeeded BOOLEAN NOT NULL DEFAULT FALSE + );`); err != nil { + return err + } + + return nil +} + +func (migration *addDataMigrations) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/001_add_organization.go b/pkg/sqlmigrator/migration/001_add_organization.go new file mode 100644 index 0000000000..c26ea4dc60 --- /dev/null +++ b/pkg/sqlmigrator/migration/001_add_organization.go @@ -0,0 +1,125 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addOrganization struct{} + +func NewAddOrganizationFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_organization"), newAddOrganization) +} + +func newAddOrganization(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addOrganization{}, nil +} + +func (migration *addOrganization) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addOrganization) Up(ctx context.Context, db *bun.DB) error { + // table:invites + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS invites ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + email TEXT NOT NULL UNIQUE, + token TEXT NOT NULL, + created_at INTEGER NOT NULL, + role TEXT NOT NULL, + org_id TEXT NOT NULL, + FOREIGN KEY(org_id) REFERENCES organizations(id) + )`).Exec(ctx); err != nil { + return err + } + + // table:organizations + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS organizations ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + created_at INTEGER NOT NULL, + is_anonymous INTEGER NOT NULL DEFAULT 0 CHECK(is_anonymous IN (0,1)), + has_opted_updates INTEGER NOT NULL DEFAULT 1 CHECK(has_opted_updates IN (0,1)) + )`).Exec(ctx); err != nil { + return err + } + + // table:users + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + email TEXT NOT NULL UNIQUE, + password TEXT NOT NULL, + created_at INTEGER NOT NULL, + profile_picture_url TEXT, + group_id TEXT NOT NULL, + org_id TEXT NOT NULL, + FOREIGN KEY(group_id) REFERENCES groups(id), + FOREIGN KEY(org_id) REFERENCES organizations(id) + )`).Exec(ctx); err != nil { + return err + } + + // table:groups + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS groups ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL UNIQUE + )`).Exec(ctx); err != nil { + return err + } + + // table:reset_password_request + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS reset_password_request ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL, + token TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES users(id) + )`).Exec(ctx); err != nil { + return err + } + + // table:user_flags + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS user_flags ( + user_id TEXT PRIMARY KEY, + flags TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) + )`).Exec(ctx); err != nil { + return err + } + + // table:apdex_settings + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS apdex_settings ( + service_name TEXT PRIMARY KEY, + threshold FLOAT NOT NULL, + exclude_status_codes TEXT NOT NULL + )`).Exec(ctx); err != nil { + return err + } + + // table:ingestion_keys + if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS ingestion_keys ( + key_id TEXT PRIMARY KEY, + name TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + ingestion_key TEXT NOT NULL, + ingestion_url TEXT NOT NULL, + data_region TEXT NOT NULL + )`).Exec(ctx); err != nil { + return err + } + + return nil +} + +func (migration *addOrganization) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/002_add_preferences.go b/pkg/sqlmigrator/migration/002_add_preferences.go new file mode 100644 index 0000000000..23f7773991 --- /dev/null +++ b/pkg/sqlmigrator/migration/002_add_preferences.go @@ -0,0 +1,58 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addPreferences struct{} + +func NewAddPreferencesFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_preferences"), newAddPreferences) +} + +func newAddPreferences(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addPreferences{}, nil +} + +func (migration *addPreferences) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addPreferences) Up(ctx context.Context, db *bun.DB) error { + // table:user_preference + if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS user_preference ( + preference_id TEXT NOT NULL, + preference_value TEXT, + user_id TEXT NOT NULL, + PRIMARY KEY (preference_id,user_id), + FOREIGN KEY (user_id) REFERENCES users(id) ON UPDATE CASCADE ON DELETE CASCADE + )`); err != nil { + return err + } + + // table:org_preference + if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS org_preference ( + preference_id TEXT NOT NULL, + preference_value TEXT, + org_id TEXT NOT NULL, + PRIMARY KEY (preference_id,org_id), + FOREIGN KEY (org_id) REFERENCES organizations(id) ON UPDATE CASCADE ON DELETE CASCADE + );`); err != nil { + return err + } + + return nil +} + +func (migration *addPreferences) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/003_add_dashboards.go b/pkg/sqlmigrator/migration/003_add_dashboards.go new file mode 100644 index 0000000000..65f45ae1e6 --- /dev/null +++ b/pkg/sqlmigrator/migration/003_add_dashboards.go @@ -0,0 +1,159 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addDashboards struct{} + +func NewAddDashboardsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_dashboards"), newAddDashboards) +} + +func newAddDashboards(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addDashboards{}, nil +} + +func (migration *addDashboards) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addDashboards) Up(ctx context.Context, db *bun.DB) error { + // table:dashboards + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS dashboards ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + uuid TEXT NOT NULL UNIQUE, + created_at datetime NOT NULL, + updated_at datetime NOT NULL, + data TEXT NOT NULL + );`); err != nil { + return err + } + + // table:rules + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS rules ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + updated_at datetime NOT NULL, + deleted INTEGER DEFAULT 0, + data TEXT NOT NULL + );`); err != nil { + return err + } + + // table:notification_channels + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS notification_channels ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at datetime NOT NULL, + updated_at datetime NOT NULL, + name TEXT NOT NULL UNIQUE, + type TEXT NOT NULL, + deleted INTEGER DEFAULT 0, + data TEXT NOT NULL + );`); err != nil { + return err + } + + // table:planned_maintenance + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS planned_maintenance ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + description TEXT, + alert_ids TEXT, + schedule TEXT NOT NULL, + created_at datetime NOT NULL, + created_by TEXT NOT NULL, + updated_at datetime NOT NULL, + updated_by TEXT NOT NULL + );`); err != nil { + return err + } + + // table:ttl_status + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS ttl_status ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transaction_id TEXT NOT NULL, + created_at datetime NOT NULL, + updated_at datetime NOT NULL, + table_name TEXT NOT NULL, + ttl INTEGER DEFAULT 0, + cold_storage_ttl INTEGER DEFAULT 0, + status TEXT NOT NULL + );`); err != nil { + return err + } + + // table:rules op:add column created_at + if _, err := db. + NewAddColumn(). + Table("rules"). + ColumnExpr("created_at datetime"). + Apply(sqlmigrator.WrapIfNotExists(ctx, db, "rules", "created_at")). + Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute { + return err + } + + // table:rules op:add column created_by + if _, err := db. + NewAddColumn(). + Table("rules"). + ColumnExpr("created_by TEXT"). + Apply(sqlmigrator.WrapIfNotExists(ctx, db, "rules", "created_by")). + Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute { + return err + } + + // table:rules op:add column updated_by + if _, err := db. + NewAddColumn(). + Table("rules"). + ColumnExpr("updated_by TEXT"). + Apply(sqlmigrator.WrapIfNotExists(ctx, db, "rules", "updated_by")). + Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute { + return err + } + + // table:dashboards op:add column created_by + if _, err := db. + NewAddColumn(). + Table("dashboards"). + ColumnExpr("created_by TEXT"). + Apply(sqlmigrator.WrapIfNotExists(ctx, db, "dashboards", "created_by")). + Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute { + return err + } + + // table:dashboards op:add column updated_by + if _, err := db. + NewAddColumn(). + Table("dashboards"). + ColumnExpr("updated_by TEXT"). + Apply(sqlmigrator.WrapIfNotExists(ctx, db, "dashboards", "updated_by")). + Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute { + return err + } + + // table:dashboards op:add column locked + if _, err := db. + NewAddColumn(). + Table("dashboards"). + ColumnExpr("locked INTEGER DEFAULT 0"). + Apply(sqlmigrator.WrapIfNotExists(ctx, db, "dashboards", "locked")). + Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute { + return err + } + + return nil +} + +func (migration *addDashboards) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/004_add_saved_views.go b/pkg/sqlmigrator/migration/004_add_saved_views.go new file mode 100644 index 0000000000..0f97be3f29 --- /dev/null +++ b/pkg/sqlmigrator/migration/004_add_saved_views.go @@ -0,0 +1,53 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addSavedViews struct{} + +func NewAddSavedViewsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_saved_views"), newAddSavedViews) +} + +func newAddSavedViews(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addSavedViews{}, nil +} + +func (migration *addSavedViews) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addSavedViews) Up(ctx context.Context, db *bun.DB) error { + // table:saved_views op:create + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS saved_views ( + uuid TEXT PRIMARY KEY, + name TEXT NOT NULL, + category TEXT NOT NULL, + created_at datetime NOT NULL, + created_by TEXT, + updated_at datetime NOT NULL, + updated_by TEXT, + source_page TEXT NOT NULL, + tags TEXT, + data TEXT NOT NULL, + extra_data TEXT + );`); err != nil { + return err + } + + return nil +} + +func (migration *addSavedViews) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/005_add_agents.go b/pkg/sqlmigrator/migration/005_add_agents.go new file mode 100644 index 0000000000..ede646f63c --- /dev/null +++ b/pkg/sqlmigrator/migration/005_add_agents.go @@ -0,0 +1,92 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addAgents struct{} + +func NewAddAgentsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_agents"), newAddAgents) +} + +func newAddAgents(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addAgents{}, nil +} + +func (migration *addAgents) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addAgents) Up(ctx context.Context, db *bun.DB) error { + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS agents ( + agent_id TEXT PRIMARY KEY UNIQUE, + started_at datetime NOT NULL, + terminated_at datetime, + current_status TEXT NOT NULL, + effective_config TEXT NOT NULL + );`); err != nil { + return err + } + + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS agent_config_versions( + id TEXT PRIMARY KEY, + created_by TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_by TEXT, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + version INTEGER DEFAULT 1, + active int, + is_valid int, + disabled int, + element_type VARCHAR(120) NOT NULL, + deploy_status VARCHAR(80) NOT NULL DEFAULT 'DIRTY', + deploy_sequence INTEGER, + deploy_result TEXT, + last_hash TEXT, + last_config TEXT, + UNIQUE(element_type, version) + );`); err != nil { + return err + } + + if _, err := db.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS agent_config_versions_u1 ON agent_config_versions(element_type, version);`); err != nil { + return err + } + + if _, err := db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS agent_config_versions_nu1 ON agent_config_versions(last_hash);`); err != nil { + return err + } + + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS agent_config_elements( + id TEXT PRIMARY KEY, + created_by TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_by TEXT, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + element_id TEXT NOT NULL, + element_type VARCHAR(120) NOT NULL, + version_id TEXT NOT NULL + );`); err != nil { + return err + } + + if _, err := db.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS agent_config_elements_u1 ON agent_config_elements(version_id, element_id, element_type);`); err != nil { + return err + } + + return nil +} + +func (migration *addAgents) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/006_add_pipelines.go b/pkg/sqlmigrator/migration/006_add_pipelines.go new file mode 100644 index 0000000000..82fa19ac3d --- /dev/null +++ b/pkg/sqlmigrator/migration/006_add_pipelines.go @@ -0,0 +1,51 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addPipelines struct{} + +func NewAddPipelinesFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_pipelines"), newAddPipelines) +} + +func newAddPipelines(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addPipelines{}, nil +} + +func (migration *addPipelines) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addPipelines) Up(ctx context.Context, db *bun.DB) error { + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS pipelines( + id TEXT PRIMARY KEY, + order_id INTEGER, + enabled BOOLEAN, + created_by TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + name VARCHAR(400) NOT NULL, + alias VARCHAR(20) NOT NULL, + description TEXT, + filter TEXT NOT NULL, + config_json TEXT + );`); err != nil { + return err + } + + return nil +} + +func (migration *addPipelines) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migration/007_add_integrations.go b/pkg/sqlmigrator/migration/007_add_integrations.go new file mode 100644 index 0000000000..571a2d90fe --- /dev/null +++ b/pkg/sqlmigrator/migration/007_add_integrations.go @@ -0,0 +1,57 @@ +package migration + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlmigrator" +) + +type addIntegrations struct{} + +func NewAddIntegrationsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] { + return factory.NewProviderFactory(factory.MustNewName("add_integrations"), newAddIntegrations) +} + +func newAddIntegrations(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) { + return &addIntegrations{}, nil +} + +func (migration *addIntegrations) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addIntegrations) Up(ctx context.Context, db *bun.DB) error { + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS integrations_installed( + integration_id TEXT PRIMARY KEY, + config_json TEXT, + installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + );`); err != nil { + return err + } + + if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS cloud_integrations_accounts( + cloud_provider TEXT NOT NULL, + id TEXT NOT NULL, + config_json TEXT, + cloud_account_id TEXT, + last_agent_report_json TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + removed_at TIMESTAMP, + UNIQUE(cloud_provider, id) + )`); err != nil { + return err + } + + return nil +} + +func (migration *addIntegrations) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/sqlmigrator/migrator.go b/pkg/sqlmigrator/migrator.go new file mode 100644 index 0000000000..e98e2f8313 --- /dev/null +++ b/pkg/sqlmigrator/migrator.go @@ -0,0 +1,108 @@ +package sqlmigrator + +import ( + "context" + "errors" + "time" + + "github.com/uptrace/bun/migrate" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlstore" + "go.uber.org/zap" +) + +var ( + migrationTableName string = "migration" + migrationLockTableName string = "migration_lock" +) + +type migrator struct { + settings factory.ScopedProviderSettings + config Config + migrator *migrate.Migrator + dialect string +} + +func New(ctx context.Context, providerSettings factory.ProviderSettings, sqlstore sqlstore.SQLStore, migrations *migrate.Migrations, config Config) SQLMigrator { + return &migrator{ + migrator: migrate.NewMigrator(sqlstore.BunDB(), migrations, migrate.WithTableName(migrationTableName), migrate.WithLocksTableName(migrationLockTableName)), + settings: factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/sqlmigrator"), + config: config, + dialect: sqlstore.BunDB().Dialect().Name().String(), + } +} + +func (migrator *migrator) Migrate(ctx context.Context) error { + migrator.settings.ZapLogger().Info("starting sqlstore migrations", zap.String("dialect", migrator.dialect)) + if err := migrator.migrator.Init(ctx); err != nil { + return err + } + + if err := migrator.Lock(ctx); err != nil { + return err + } + + defer migrator.migrator.Unlock(ctx) //nolint:errcheck + + group, err := migrator.migrator.Migrate(ctx) + if err != nil { + return err + } + + if group.IsZero() { + migrator.settings.ZapLogger().Info("no new migrations to run (database is up to date)", zap.String("dialect", migrator.dialect)) + return nil + } + + migrator.settings.ZapLogger().Info("migrated to", zap.String("group", group.String()), zap.String("dialect", migrator.dialect)) + return nil +} + +func (migrator *migrator) Rollback(ctx context.Context) error { + if err := migrator.Lock(ctx); err != nil { + return err + } + defer migrator.migrator.Unlock(ctx) //nolint:errcheck + + group, err := migrator.migrator.Rollback(ctx) + if err != nil { + return err + } + + if group.IsZero() { + migrator.settings.ZapLogger().Info("no groups to roll back", zap.String("dialect", migrator.dialect)) + return nil + } + + migrator.settings.ZapLogger().Info("rolled back", zap.String("group", group.String()), zap.String("dialect", migrator.dialect)) + return nil +} + +func (migrator *migrator) Lock(ctx context.Context) error { + if err := migrator.migrator.Lock(ctx); err == nil { + migrator.settings.ZapLogger().Info("acquired migration lock", zap.String("dialect", migrator.dialect)) + return nil + } + + timer := time.NewTimer(migrator.config.Lock.Timeout) + defer timer.Stop() + + ticker := time.NewTicker(migrator.config.Lock.Interval) + defer ticker.Stop() + + for { + select { + case <-timer.C: + err := errors.New("timed out waiting for lock") + migrator.settings.ZapLogger().Error("cannot acquire lock", zap.Error(err), zap.Duration("lock_timeout", migrator.config.Lock.Timeout), zap.String("dialect", migrator.dialect)) + return err + case <-ticker.C: + if err := migrator.migrator.Lock(ctx); err == nil { + migrator.settings.ZapLogger().Info("acquired migration lock", zap.String("dialect", migrator.dialect)) + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/pkg/sqlmigrator/migrator_test.go b/pkg/sqlmigrator/migrator_test.go new file mode 100644 index 0000000000..7449e3b00f --- /dev/null +++ b/pkg/sqlmigrator/migrator_test.go @@ -0,0 +1,48 @@ +package sqlmigrator + +import ( + "context" + "database/sql/driver" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/instrumentation/instrumentationtest" + "go.signoz.io/signoz/pkg/sqlstore" + "go.signoz.io/signoz/pkg/sqlstore/sqlstoretest" +) + +func TestMigratorWithSqliteAndNoopMigration(t *testing.T) { + ctx := context.Background() + sqlstoreConfig := sqlstore.Config{ + Provider: "sqlite", + } + + migrationConfig := Config{ + Lock: Lock{ + Timeout: 10 * time.Second, + Interval: 1 * time.Second, + }, + } + + providerSettings := instrumentationtest.New().ToProviderSettings() + sqlstore := sqlstoretest.New(sqlstoreConfig, sqlmock.QueryMatcherRegexp) + migrator := New( + ctx, + providerSettings, + sqlstore, + MustNewMigrations(ctx, providerSettings, migrationConfig, factory.MustNewNamedMap(NoopMigrationFactory())), + migrationConfig, + ) + + sqlstore.Mock().ExpectExec("CREATE TABLE IF NOT EXISTS migration (.+)").WillReturnResult(driver.ResultNoRows) + sqlstore.Mock().ExpectExec("CREATE TABLE IF NOT EXISTS migration_lock (.+)").WillReturnResult(driver.ResultNoRows) + sqlstore.Mock().ExpectQuery("INSERT INTO migration_lock (.+)").WillReturnRows(sqlstore.Mock().NewRows([]string{"id"}).AddRow(1)) + sqlstore.Mock().ExpectQuery("(.+) FROM migration").WillReturnRows(sqlstore.Mock().NewRows([]string{"id"}).AddRow(1)) + sqlstore.Mock().ExpectQuery("INSERT INTO migration (.+)").WillReturnRows(sqlstore.Mock().NewRows([]string{"id", "migrated_at"}).AddRow(1, time.Now())) + + err := migrator.Migrate(ctx) + require.NoError(t, err) +} diff --git a/pkg/sqlmigrator/sqlmigrator.go b/pkg/sqlmigrator/sqlmigrator.go new file mode 100644 index 0000000000..ce3f5e43c1 --- /dev/null +++ b/pkg/sqlmigrator/sqlmigrator.go @@ -0,0 +1,27 @@ +package sqlmigrator + +import ( + "context" + + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" +) + +// SQLMigrator is the interface for the SQLMigrator. +type SQLMigrator interface { + // Migrate migrates the database. Migrate acquires a lock on the database and runs the migrations. + Migrate(context.Context) error + // Rollback rolls back the database. Rollback acquires a lock on the database and rolls back the migrations. + Rollback(context.Context) error +} + +// SQLMigration is the interface for a single migration. +type SQLMigration interface { + // Register registers the migration with the given migrations. Each migration needs to be registered + //in a dedicated `*.go` file so that the correct migration semantics can be detected. + Register(*migrate.Migrations) error + // Up runs the migration. + Up(context.Context, *bun.DB) error + // Down rolls back the migration. + Down(context.Context, *bun.DB) error +}