feat(sqlmigrator): add sqlmigrator package (#6836)

### Summary

- add sqlmigrator package
This commit is contained in:
Vibhu Pandey 2025-01-17 16:52:55 +05:30 committed by GitHub
parent c574adc634
commit 268f283785
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 973 additions and 0 deletions

View File

@ -0,0 +1,32 @@
package sqlmigrator
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
)
type noopMigration struct{}
func NoopMigrationFactory() factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("noop"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &noopMigration{}, nil
})
}
func (migration *noopMigration) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *noopMigration) Up(ctx context.Context, db *bun.DB) error {
return nil
}
func (migration *noopMigration) Down(ctx context.Context, db *bun.DB) error {
return nil
}

41
pkg/sqlmigrator/config.go Normal file
View File

@ -0,0 +1,41 @@
package sqlmigrator
import (
"errors"
"time"
"go.signoz.io/signoz/pkg/factory"
)
type Config struct {
// Lock is the lock configuration.
Lock Lock `mapstructure:"lock"`
}
type Lock struct {
// Timeout is the time to wait for the migration lock.
Timeout time.Duration `mapstructure:"timeout"`
// Interval is the interval to try to acquire the migration lock.
Interval time.Duration `mapstructure:"interval"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("sqlmigrator"), newConfig)
}
func newConfig() factory.Config {
return Config{
Lock: Lock{
Timeout: 2 * time.Minute,
Interval: 10 * time.Second,
},
}
}
func (c Config) Validate() error {
if c.Lock.Timeout < c.Lock.Interval {
return errors.New("lock_timeout must be greater than lock_interval")
}
return nil
}

View File

@ -0,0 +1,77 @@
package sqlmigrator
import (
"context"
"database/sql"
"errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
)
var (
ErrNoExecute = errors.New("no execute")
)
func NewMigrations(
ctx context.Context,
settings factory.ProviderSettings,
config Config,
factories factory.NamedMap[factory.ProviderFactory[SQLMigration, Config]],
) (*migrate.Migrations, error) {
migrations := migrate.NewMigrations()
for _, factory := range factories.GetInOrder() {
migration, err := factory.New(ctx, settings, config)
if err != nil {
return nil, err
}
err = migration.Register(migrations)
if err != nil {
return nil, err
}
}
return migrations, nil
}
func MustNewMigrations(
ctx context.Context,
settings factory.ProviderSettings,
config Config,
factories factory.NamedMap[factory.ProviderFactory[SQLMigration, Config]],
) *migrate.Migrations {
migrations, err := NewMigrations(ctx, settings, config, factories)
if err != nil {
panic(err)
}
return migrations
}
func WrapIfNotExists(ctx context.Context, db *bun.DB, table string, column string) func(q *bun.AddColumnQuery) *bun.AddColumnQuery {
return func(q *bun.AddColumnQuery) *bun.AddColumnQuery {
if db.Dialect().Name() != dialect.SQLite {
return q.IfNotExists()
}
var result string
err := db.
NewSelect().
ColumnExpr("name").
Table("pragma_table_info").
Where("arg = ?", table).
Where("name = ?", column).
Scan(ctx, &result)
if err != nil {
if err == sql.ErrNoRows {
return q
}
return q.Err(err)
}
return q.Err(ErrNoExecute)
}
}

View File

@ -0,0 +1,45 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addDataMigrations struct{}
func NewAddDataMigrationsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_data_migrations"), newAddDataMigrations)
}
func newAddDataMigrations(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addDataMigrations{}, nil
}
func (migration *addDataMigrations) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addDataMigrations) Up(ctx context.Context, db *bun.DB) error {
// table:data_migrations
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS data_migrations (
id SERIAL PRIMARY KEY,
version VARCHAR(255) NOT NULL UNIQUE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
succeeded BOOLEAN NOT NULL DEFAULT FALSE
);`); err != nil {
return err
}
return nil
}
func (migration *addDataMigrations) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,125 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addOrganization struct{}
func NewAddOrganizationFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_organization"), newAddOrganization)
}
func newAddOrganization(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addOrganization{}, nil
}
func (migration *addOrganization) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addOrganization) Up(ctx context.Context, db *bun.DB) error {
// table:invites
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS invites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
token TEXT NOT NULL,
created_at INTEGER NOT NULL,
role TEXT NOT NULL,
org_id TEXT NOT NULL,
FOREIGN KEY(org_id) REFERENCES organizations(id)
)`).Exec(ctx); err != nil {
return err
}
// table:organizations
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS organizations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
created_at INTEGER NOT NULL,
is_anonymous INTEGER NOT NULL DEFAULT 0 CHECK(is_anonymous IN (0,1)),
has_opted_updates INTEGER NOT NULL DEFAULT 1 CHECK(has_opted_updates IN (0,1))
)`).Exec(ctx); err != nil {
return err
}
// table:users
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
created_at INTEGER NOT NULL,
profile_picture_url TEXT,
group_id TEXT NOT NULL,
org_id TEXT NOT NULL,
FOREIGN KEY(group_id) REFERENCES groups(id),
FOREIGN KEY(org_id) REFERENCES organizations(id)
)`).Exec(ctx); err != nil {
return err
}
// table:groups
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS groups (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE
)`).Exec(ctx); err != nil {
return err
}
// table:reset_password_request
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS reset_password_request (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
token TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
)`).Exec(ctx); err != nil {
return err
}
// table:user_flags
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS user_flags (
user_id TEXT PRIMARY KEY,
flags TEXT,
FOREIGN KEY(user_id) REFERENCES users(id)
)`).Exec(ctx); err != nil {
return err
}
// table:apdex_settings
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS apdex_settings (
service_name TEXT PRIMARY KEY,
threshold FLOAT NOT NULL,
exclude_status_codes TEXT NOT NULL
)`).Exec(ctx); err != nil {
return err
}
// table:ingestion_keys
if _, err := db.NewRaw(`CREATE TABLE IF NOT EXISTS ingestion_keys (
key_id TEXT PRIMARY KEY,
name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ingestion_key TEXT NOT NULL,
ingestion_url TEXT NOT NULL,
data_region TEXT NOT NULL
)`).Exec(ctx); err != nil {
return err
}
return nil
}
func (migration *addOrganization) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,58 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addPreferences struct{}
func NewAddPreferencesFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_preferences"), newAddPreferences)
}
func newAddPreferences(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addPreferences{}, nil
}
func (migration *addPreferences) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addPreferences) Up(ctx context.Context, db *bun.DB) error {
// table:user_preference
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS user_preference (
preference_id TEXT NOT NULL,
preference_value TEXT,
user_id TEXT NOT NULL,
PRIMARY KEY (preference_id,user_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON UPDATE CASCADE ON DELETE CASCADE
)`); err != nil {
return err
}
// table:org_preference
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS org_preference (
preference_id TEXT NOT NULL,
preference_value TEXT,
org_id TEXT NOT NULL,
PRIMARY KEY (preference_id,org_id),
FOREIGN KEY (org_id) REFERENCES organizations(id) ON UPDATE CASCADE ON DELETE CASCADE
);`); err != nil {
return err
}
return nil
}
func (migration *addPreferences) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,159 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addDashboards struct{}
func NewAddDashboardsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_dashboards"), newAddDashboards)
}
func newAddDashboards(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addDashboards{}, nil
}
func (migration *addDashboards) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addDashboards) Up(ctx context.Context, db *bun.DB) error {
// table:dashboards
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS dashboards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
data TEXT NOT NULL
);`); err != nil {
return err
}
// table:rules
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
updated_at datetime NOT NULL,
deleted INTEGER DEFAULT 0,
data TEXT NOT NULL
);`); err != nil {
return err
}
// table:notification_channels
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS notification_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
name TEXT NOT NULL UNIQUE,
type TEXT NOT NULL,
deleted INTEGER DEFAULT 0,
data TEXT NOT NULL
);`); err != nil {
return err
}
// table:planned_maintenance
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS planned_maintenance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
alert_ids TEXT,
schedule TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT NOT NULL,
updated_at datetime NOT NULL,
updated_by TEXT NOT NULL
);`); err != nil {
return err
}
// table:ttl_status
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS ttl_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT NOT NULL,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
table_name TEXT NOT NULL,
ttl INTEGER DEFAULT 0,
cold_storage_ttl INTEGER DEFAULT 0,
status TEXT NOT NULL
);`); err != nil {
return err
}
// table:rules op:add column created_at
if _, err := db.
NewAddColumn().
Table("rules").
ColumnExpr("created_at datetime").
Apply(sqlmigrator.WrapIfNotExists(ctx, db, "rules", "created_at")).
Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute {
return err
}
// table:rules op:add column created_by
if _, err := db.
NewAddColumn().
Table("rules").
ColumnExpr("created_by TEXT").
Apply(sqlmigrator.WrapIfNotExists(ctx, db, "rules", "created_by")).
Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute {
return err
}
// table:rules op:add column updated_by
if _, err := db.
NewAddColumn().
Table("rules").
ColumnExpr("updated_by TEXT").
Apply(sqlmigrator.WrapIfNotExists(ctx, db, "rules", "updated_by")).
Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute {
return err
}
// table:dashboards op:add column created_by
if _, err := db.
NewAddColumn().
Table("dashboards").
ColumnExpr("created_by TEXT").
Apply(sqlmigrator.WrapIfNotExists(ctx, db, "dashboards", "created_by")).
Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute {
return err
}
// table:dashboards op:add column updated_by
if _, err := db.
NewAddColumn().
Table("dashboards").
ColumnExpr("updated_by TEXT").
Apply(sqlmigrator.WrapIfNotExists(ctx, db, "dashboards", "updated_by")).
Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute {
return err
}
// table:dashboards op:add column locked
if _, err := db.
NewAddColumn().
Table("dashboards").
ColumnExpr("locked INTEGER DEFAULT 0").
Apply(sqlmigrator.WrapIfNotExists(ctx, db, "dashboards", "locked")).
Exec(ctx); err != nil && err != sqlmigrator.ErrNoExecute {
return err
}
return nil
}
func (migration *addDashboards) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,53 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addSavedViews struct{}
func NewAddSavedViewsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_saved_views"), newAddSavedViews)
}
func newAddSavedViews(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addSavedViews{}, nil
}
func (migration *addSavedViews) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addSavedViews) Up(ctx context.Context, db *bun.DB) error {
// table:saved_views op:create
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS saved_views (
uuid TEXT PRIMARY KEY,
name TEXT NOT NULL,
category TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT,
updated_at datetime NOT NULL,
updated_by TEXT,
source_page TEXT NOT NULL,
tags TEXT,
data TEXT NOT NULL,
extra_data TEXT
);`); err != nil {
return err
}
return nil
}
func (migration *addSavedViews) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,92 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addAgents struct{}
func NewAddAgentsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_agents"), newAddAgents)
}
func newAddAgents(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addAgents{}, nil
}
func (migration *addAgents) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addAgents) Up(ctx context.Context, db *bun.DB) error {
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT PRIMARY KEY UNIQUE,
started_at datetime NOT NULL,
terminated_at datetime,
current_status TEXT NOT NULL,
effective_config TEXT NOT NULL
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS agent_config_versions(
id TEXT PRIMARY KEY,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
version INTEGER DEFAULT 1,
active int,
is_valid int,
disabled int,
element_type VARCHAR(120) NOT NULL,
deploy_status VARCHAR(80) NOT NULL DEFAULT 'DIRTY',
deploy_sequence INTEGER,
deploy_result TEXT,
last_hash TEXT,
last_config TEXT,
UNIQUE(element_type, version)
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS agent_config_versions_u1 ON agent_config_versions(element_type, version);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS agent_config_versions_nu1 ON agent_config_versions(last_hash);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS agent_config_elements(
id TEXT PRIMARY KEY,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
element_id TEXT NOT NULL,
element_type VARCHAR(120) NOT NULL,
version_id TEXT NOT NULL
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS agent_config_elements_u1 ON agent_config_elements(version_id, element_id, element_type);`); err != nil {
return err
}
return nil
}
func (migration *addAgents) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,51 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addPipelines struct{}
func NewAddPipelinesFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_pipelines"), newAddPipelines)
}
func newAddPipelines(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addPipelines{}, nil
}
func (migration *addPipelines) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addPipelines) Up(ctx context.Context, db *bun.DB) error {
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS pipelines(
id TEXT PRIMARY KEY,
order_id INTEGER,
enabled BOOLEAN,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
name VARCHAR(400) NOT NULL,
alias VARCHAR(20) NOT NULL,
description TEXT,
filter TEXT NOT NULL,
config_json TEXT
);`); err != nil {
return err
}
return nil
}
func (migration *addPipelines) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,57 @@
package migration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlmigrator"
)
type addIntegrations struct{}
func NewAddIntegrationsFactory() factory.ProviderFactory[sqlmigrator.SQLMigration, sqlmigrator.Config] {
return factory.NewProviderFactory(factory.MustNewName("add_integrations"), newAddIntegrations)
}
func newAddIntegrations(_ context.Context, _ factory.ProviderSettings, _ sqlmigrator.Config) (sqlmigrator.SQLMigration, error) {
return &addIntegrations{}, nil
}
func (migration *addIntegrations) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addIntegrations) Up(ctx context.Context, db *bun.DB) error {
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS integrations_installed(
integration_id TEXT PRIMARY KEY,
config_json TEXT,
installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS cloud_integrations_accounts(
cloud_provider TEXT NOT NULL,
id TEXT NOT NULL,
config_json TEXT,
cloud_account_id TEXT,
last_agent_report_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
removed_at TIMESTAMP,
UNIQUE(cloud_provider, id)
)`); err != nil {
return err
}
return nil
}
func (migration *addIntegrations) Down(ctx context.Context, db *bun.DB) error {
return nil
}

108
pkg/sqlmigrator/migrator.go Normal file
View File

@ -0,0 +1,108 @@
package sqlmigrator
import (
"context"
"errors"
"time"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.uber.org/zap"
)
var (
migrationTableName string = "migration"
migrationLockTableName string = "migration_lock"
)
type migrator struct {
settings factory.ScopedProviderSettings
config Config
migrator *migrate.Migrator
dialect string
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, sqlstore sqlstore.SQLStore, migrations *migrate.Migrations, config Config) SQLMigrator {
return &migrator{
migrator: migrate.NewMigrator(sqlstore.BunDB(), migrations, migrate.WithTableName(migrationTableName), migrate.WithLocksTableName(migrationLockTableName)),
settings: factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/sqlmigrator"),
config: config,
dialect: sqlstore.BunDB().Dialect().Name().String(),
}
}
func (migrator *migrator) Migrate(ctx context.Context) error {
migrator.settings.ZapLogger().Info("starting sqlstore migrations", zap.String("dialect", migrator.dialect))
if err := migrator.migrator.Init(ctx); err != nil {
return err
}
if err := migrator.Lock(ctx); err != nil {
return err
}
defer migrator.migrator.Unlock(ctx) //nolint:errcheck
group, err := migrator.migrator.Migrate(ctx)
if err != nil {
return err
}
if group.IsZero() {
migrator.settings.ZapLogger().Info("no new migrations to run (database is up to date)", zap.String("dialect", migrator.dialect))
return nil
}
migrator.settings.ZapLogger().Info("migrated to", zap.String("group", group.String()), zap.String("dialect", migrator.dialect))
return nil
}
func (migrator *migrator) Rollback(ctx context.Context) error {
if err := migrator.Lock(ctx); err != nil {
return err
}
defer migrator.migrator.Unlock(ctx) //nolint:errcheck
group, err := migrator.migrator.Rollback(ctx)
if err != nil {
return err
}
if group.IsZero() {
migrator.settings.ZapLogger().Info("no groups to roll back", zap.String("dialect", migrator.dialect))
return nil
}
migrator.settings.ZapLogger().Info("rolled back", zap.String("group", group.String()), zap.String("dialect", migrator.dialect))
return nil
}
func (migrator *migrator) Lock(ctx context.Context) error {
if err := migrator.migrator.Lock(ctx); err == nil {
migrator.settings.ZapLogger().Info("acquired migration lock", zap.String("dialect", migrator.dialect))
return nil
}
timer := time.NewTimer(migrator.config.Lock.Timeout)
defer timer.Stop()
ticker := time.NewTicker(migrator.config.Lock.Interval)
defer ticker.Stop()
for {
select {
case <-timer.C:
err := errors.New("timed out waiting for lock")
migrator.settings.ZapLogger().Error("cannot acquire lock", zap.Error(err), zap.Duration("lock_timeout", migrator.config.Lock.Timeout), zap.String("dialect", migrator.dialect))
return err
case <-ticker.C:
if err := migrator.migrator.Lock(ctx); err == nil {
migrator.settings.ZapLogger().Info("acquired migration lock", zap.String("dialect", migrator.dialect))
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

View File

@ -0,0 +1,48 @@
package sqlmigrator
import (
"context"
"database/sql/driver"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation/instrumentationtest"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlstoretest"
)
func TestMigratorWithSqliteAndNoopMigration(t *testing.T) {
ctx := context.Background()
sqlstoreConfig := sqlstore.Config{
Provider: "sqlite",
}
migrationConfig := Config{
Lock: Lock{
Timeout: 10 * time.Second,
Interval: 1 * time.Second,
},
}
providerSettings := instrumentationtest.New().ToProviderSettings()
sqlstore := sqlstoretest.New(sqlstoreConfig, sqlmock.QueryMatcherRegexp)
migrator := New(
ctx,
providerSettings,
sqlstore,
MustNewMigrations(ctx, providerSettings, migrationConfig, factory.MustNewNamedMap(NoopMigrationFactory())),
migrationConfig,
)
sqlstore.Mock().ExpectExec("CREATE TABLE IF NOT EXISTS migration (.+)").WillReturnResult(driver.ResultNoRows)
sqlstore.Mock().ExpectExec("CREATE TABLE IF NOT EXISTS migration_lock (.+)").WillReturnResult(driver.ResultNoRows)
sqlstore.Mock().ExpectQuery("INSERT INTO migration_lock (.+)").WillReturnRows(sqlstore.Mock().NewRows([]string{"id"}).AddRow(1))
sqlstore.Mock().ExpectQuery("(.+) FROM migration").WillReturnRows(sqlstore.Mock().NewRows([]string{"id"}).AddRow(1))
sqlstore.Mock().ExpectQuery("INSERT INTO migration (.+)").WillReturnRows(sqlstore.Mock().NewRows([]string{"id", "migrated_at"}).AddRow(1, time.Now()))
err := migrator.Migrate(ctx)
require.NoError(t, err)
}

View File

@ -0,0 +1,27 @@
package sqlmigrator
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
// SQLMigrator is the interface for the SQLMigrator.
type SQLMigrator interface {
// Migrate migrates the database. Migrate acquires a lock on the database and runs the migrations.
Migrate(context.Context) error
// Rollback rolls back the database. Rollback acquires a lock on the database and rolls back the migrations.
Rollback(context.Context) error
}
// SQLMigration is the interface for a single migration.
type SQLMigration interface {
// Register registers the migration with the given migrations. Each migration needs to be registered
//in a dedicated `*.go` file so that the correct migration semantics can be detected.
Register(*migrate.Migrations) error
// Up runs the migration.
Up(context.Context, *bun.DB) error
// Down rolls back the migration.
Down(context.Context, *bun.DB) error
}