mirror of
				https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
				synced 2025-10-23 06:01:07 +08:00 
			
		
		
		
	 5bceffbeaa
			
		
	
	
		5bceffbeaa
		
			
		
	
	
	
	
		
			
			* fix: fix modules and handler * fix: fix sqlmigration package * fix: fix other fmt issues * fix: fix tests * fix: fix tests
		
			
				
	
	
		
			447 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			447 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package sqlmigration
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"database/sql"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/SigNoz/signoz/pkg/factory"
 | |
| 	"github.com/SigNoz/signoz/pkg/sqlstore"
 | |
| 	"github.com/SigNoz/signoz/pkg/types"
 | |
| 	"github.com/SigNoz/signoz/pkg/valuer"
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/uptrace/bun"
 | |
| 	"github.com/uptrace/bun/migrate"
 | |
| 	"go.uber.org/zap"
 | |
| )
 | |
| 
 | |
| type updateIntegrations struct {
 | |
| 	store sqlstore.SQLStore
 | |
| }
 | |
| 
 | |
| func NewUpdateIntegrationsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
 | |
| 	return factory.NewProviderFactory(factory.MustNewName("update_integrations"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
 | |
| 		return newUpdateIntegrations(ctx, ps, c, sqlstore)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func newUpdateIntegrations(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
 | |
| 	return &updateIntegrations{store: store}, nil
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) Register(migrations *migrate.Migrations) error {
 | |
| 	if err := migrations.Register(migration.Up, migration.Down); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type existingInstalledIntegration struct {
 | |
| 	bun.BaseModel `bun:"table:integrations_installed"`
 | |
| 
 | |
| 	IntegrationID string    `bun:"integration_id,pk,type:text"`
 | |
| 	ConfigJSON    string    `bun:"config_json,type:text"`
 | |
| 	InstalledAt   time.Time `bun:"installed_at,default:current_timestamp"`
 | |
| }
 | |
| 
 | |
| type newInstalledIntegration struct {
 | |
| 	bun.BaseModel `bun:"table:installed_integration"`
 | |
| 
 | |
| 	types.Identifiable
 | |
| 	Type        string    `json:"type" bun:"type,type:text,unique:org_id_type"`
 | |
| 	Config      string    `json:"config" bun:"config,type:text"`
 | |
| 	InstalledAt time.Time `json:"installed_at" bun:"installed_at,default:current_timestamp"`
 | |
| 	OrgID       string    `json:"org_id" bun:"org_id,type:text,unique:org_id_type"`
 | |
| }
 | |
| 
 | |
| type existingCloudIntegration struct {
 | |
| 	bun.BaseModel `bun:"table:cloud_integrations_accounts"`
 | |
| 
 | |
| 	CloudProvider       string     `bun:"cloud_provider,type:text,unique:cloud_provider_id"`
 | |
| 	ID                  string     `bun:"id,type:text,notnull,unique:cloud_provider_id"`
 | |
| 	ConfigJSON          string     `bun:"config_json,type:text"`
 | |
| 	CloudAccountID      string     `bun:"cloud_account_id,type:text"`
 | |
| 	LastAgentReportJSON string     `bun:"last_agent_report_json,type:text"`
 | |
| 	CreatedAt           time.Time  `bun:"created_at,notnull,default:current_timestamp"`
 | |
| 	RemovedAt           *time.Time `bun:"removed_at,type:timestamp"`
 | |
| }
 | |
| 
 | |
| type newCloudIntegration struct {
 | |
| 	bun.BaseModel `bun:"table:cloud_integration"`
 | |
| 
 | |
| 	types.Identifiable
 | |
| 	types.TimeAuditable
 | |
| 	Provider        string     `json:"provider" bun:"provider,type:text"`
 | |
| 	Config          string     `json:"config" bun:"config,type:text"`
 | |
| 	AccountID       string     `json:"account_id" bun:"account_id,type:text"`
 | |
| 	LastAgentReport string     `json:"last_agent_report" bun:"last_agent_report,type:text"`
 | |
| 	RemovedAt       *time.Time `json:"removed_at" bun:"removed_at,type:timestamp"`
 | |
| 	OrgID           string     `json:"org_id" bun:"org_id,type:text"`
 | |
| }
 | |
| 
 | |
| type existingCloudIntegrationService struct {
 | |
| 	bun.BaseModel `bun:"table:cloud_integrations_service_configs,alias:c1"`
 | |
| 
 | |
| 	CloudProvider  string    `bun:"cloud_provider,type:text,notnull,unique:service_cloud_provider_account"`
 | |
| 	CloudAccountID string    `bun:"cloud_account_id,type:text,notnull,unique:service_cloud_provider_account"`
 | |
| 	ServiceID      string    `bun:"service_id,type:text,notnull,unique:service_cloud_provider_account"`
 | |
| 	ConfigJSON     string    `bun:"config_json,type:text"`
 | |
| 	CreatedAt      time.Time `bun:"created_at,default:current_timestamp"`
 | |
| }
 | |
| 
 | |
| type newCloudIntegrationService struct {
 | |
| 	bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
 | |
| 
 | |
| 	types.Identifiable
 | |
| 	types.TimeAuditable
 | |
| 	Type               string `bun:"type,type:text,notnull,unique:cloud_integration_id_type"`
 | |
| 	Config             string `bun:"config,type:text"`
 | |
| 	CloudIntegrationID string `bun:"cloud_integration_id,type:text,notnull,unique:cloud_integration_id_type"`
 | |
| }
 | |
| 
 | |
| type StorablePersonalAccessToken struct {
 | |
| 	bun.BaseModel `bun:"table:personal_access_token"`
 | |
| 	types.Identifiable
 | |
| 	types.TimeAuditable
 | |
| 	OrgID           string `json:"orgId" bun:"org_id,type:text,notnull"`
 | |
| 	Role            string `json:"role" bun:"role,type:text,notnull,default:'ADMIN'"`
 | |
| 	UserID          string `json:"userId" bun:"user_id,type:text,notnull"`
 | |
| 	Token           string `json:"token" bun:"token,type:text,notnull,unique"`
 | |
| 	Name            string `json:"name" bun:"name,type:text,notnull"`
 | |
| 	ExpiresAt       int64  `json:"expiresAt" bun:"expires_at,notnull,default:0"`
 | |
| 	LastUsed        int64  `json:"lastUsed" bun:"last_used,notnull,default:0"`
 | |
| 	Revoked         bool   `json:"revoked" bun:"revoked,notnull,default:false"`
 | |
| 	UpdatedByUserID string `json:"updatedByUserId" bun:"updated_by_user_id,type:text,notnull,default:''"`
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) Up(ctx context.Context, db *bun.DB) error {
 | |
| 
 | |
| 	// begin transaction
 | |
| 	tx, err := db.BeginTx(ctx, nil)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer tx.Rollback()
 | |
| 
 | |
| 	// don't run the migration if there are multiple org ids
 | |
| 	orgIDs := make([]string, 0)
 | |
| 	err = migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if len(orgIDs) > 1 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// installed integrations
 | |
| 	err = migration.
 | |
| 		store.
 | |
| 		Dialect().
 | |
| 		RenameTableAndModifyModel(ctx, tx, new(existingInstalledIntegration), new(newInstalledIntegration), []string{OrgReference}, func(ctx context.Context) error {
 | |
| 			existingIntegrations := make([]*existingInstalledIntegration, 0)
 | |
| 			err = tx.
 | |
| 				NewSelect().
 | |
| 				Model(&existingIntegrations).
 | |
| 				Scan(ctx)
 | |
| 			if err != nil {
 | |
| 				if err != sql.ErrNoRows {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			if err == nil && len(existingIntegrations) > 0 {
 | |
| 				newIntegrations := migration.
 | |
| 					CopyOldIntegrationsToNewIntegrations(tx, orgIDs[0], existingIntegrations)
 | |
| 				_, err = tx.
 | |
| 					NewInsert().
 | |
| 					Model(&newIntegrations).
 | |
| 					Exec(ctx)
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// cloud integrations
 | |
| 	err = migration.
 | |
| 		store.
 | |
| 		Dialect().
 | |
| 		RenameTableAndModifyModel(ctx, tx, new(existingCloudIntegration), new(newCloudIntegration), []string{OrgReference}, func(ctx context.Context) error {
 | |
| 			existingIntegrations := make([]*existingCloudIntegration, 0)
 | |
| 			err = tx.
 | |
| 				NewSelect().
 | |
| 				Model(&existingIntegrations).
 | |
| 				Where("removed_at IS NULL"). // we will only copy the accounts that are not removed
 | |
| 				Scan(ctx)
 | |
| 			if err != nil {
 | |
| 				if err != sql.ErrNoRows {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			if err == nil && len(existingIntegrations) > 0 {
 | |
| 				newIntegrations := migration.
 | |
| 					CopyOldCloudIntegrationsToNewCloudIntegrations(tx, orgIDs[0], existingIntegrations)
 | |
| 				_, err = tx.
 | |
| 					NewInsert().
 | |
| 					Model(&newIntegrations).
 | |
| 					Exec(ctx)
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// add unique constraint to cloud_integration table
 | |
| 	_, err = tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS unique_cloud_integration ON cloud_integration (id, provider, org_id)`)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// cloud integration service
 | |
| 	err = migration.
 | |
| 		store.
 | |
| 		Dialect().
 | |
| 		RenameTableAndModifyModel(ctx, tx, new(existingCloudIntegrationService), new(newCloudIntegrationService), []string{CloudIntegrationReference}, func(ctx context.Context) error {
 | |
| 			existingServices := make([]*existingCloudIntegrationService, 0)
 | |
| 
 | |
| 			// only one service per provider,account id and type
 | |
| 			// so there won't be any duplicates.
 | |
| 			// just that these will be enabled as soon as the integration for the account is enabled
 | |
| 			err = tx.
 | |
| 				NewSelect().
 | |
| 				Model(&existingServices).
 | |
| 				Scan(ctx)
 | |
| 			if err != nil {
 | |
| 				if err != sql.ErrNoRows {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			if err == nil && len(existingServices) > 0 {
 | |
| 				newServices := migration.
 | |
| 					CopyOldCloudIntegrationServicesToNewCloudIntegrationServices(tx, orgIDs[0], existingServices)
 | |
| 				if len(newServices) > 0 {
 | |
| 					_, err = tx.
 | |
| 						NewInsert().
 | |
| 						Model(&newServices).
 | |
| 						Exec(ctx)
 | |
| 					if err != nil {
 | |
| 						return err
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if len(orgIDs) == 0 {
 | |
| 		err = tx.Commit()
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// copy the old aws integration user to the new user
 | |
| 	err = migration.copyOldAwsIntegrationUser(tx, orgIDs[0])
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	err = tx.Commit()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) Down(ctx context.Context, db *bun.DB) error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) CopyOldIntegrationsToNewIntegrations(tx bun.IDB, orgID string, existingIntegrations []*existingInstalledIntegration) []*newInstalledIntegration {
 | |
| 	newIntegrations := make([]*newInstalledIntegration, 0)
 | |
| 
 | |
| 	for _, integration := range existingIntegrations {
 | |
| 		newIntegrations = append(newIntegrations, &newInstalledIntegration{
 | |
| 			Identifiable: types.Identifiable{
 | |
| 				ID: valuer.GenerateUUID(),
 | |
| 			},
 | |
| 			Type:        integration.IntegrationID,
 | |
| 			Config:      integration.ConfigJSON,
 | |
| 			InstalledAt: integration.InstalledAt,
 | |
| 			OrgID:       orgID,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return newIntegrations
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) CopyOldCloudIntegrationsToNewCloudIntegrations(tx bun.IDB, orgID string, existingIntegrations []*existingCloudIntegration) []*newCloudIntegration {
 | |
| 	newIntegrations := make([]*newCloudIntegration, 0)
 | |
| 
 | |
| 	for _, integration := range existingIntegrations {
 | |
| 		newIntegrations = append(newIntegrations, &newCloudIntegration{
 | |
| 			Identifiable: types.Identifiable{
 | |
| 				ID: valuer.GenerateUUID(),
 | |
| 			},
 | |
| 			TimeAuditable: types.TimeAuditable{
 | |
| 				CreatedAt: integration.CreatedAt,
 | |
| 				UpdatedAt: integration.CreatedAt,
 | |
| 			},
 | |
| 			Provider:        integration.CloudProvider,
 | |
| 			AccountID:       integration.CloudAccountID,
 | |
| 			Config:          integration.ConfigJSON,
 | |
| 			LastAgentReport: integration.LastAgentReportJSON,
 | |
| 			RemovedAt:       integration.RemovedAt,
 | |
| 			OrgID:           orgID,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return newIntegrations
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) CopyOldCloudIntegrationServicesToNewCloudIntegrationServices(tx bun.IDB, orgID string, existingServices []*existingCloudIntegrationService) []*newCloudIntegrationService {
 | |
| 	newServices := make([]*newCloudIntegrationService, 0)
 | |
| 
 | |
| 	for _, service := range existingServices {
 | |
| 		var cloudIntegrationID string
 | |
| 		err := tx.NewSelect().
 | |
| 			Model((*newCloudIntegration)(nil)).
 | |
| 			Column("id").
 | |
| 			Where("account_id = ?", service.CloudAccountID).
 | |
| 			Where("provider = ?", service.CloudProvider).
 | |
| 			Where("org_id = ?", orgID).
 | |
| 			Scan(context.Background(), &cloudIntegrationID)
 | |
| 		if err != nil {
 | |
| 			if err == sql.ErrNoRows {
 | |
| 				continue
 | |
| 			}
 | |
| 			zap.L().Error("failed to get cloud integration id", zap.Error(err))
 | |
| 			return nil
 | |
| 		}
 | |
| 		newServices = append(newServices, &newCloudIntegrationService{
 | |
| 			Identifiable: types.Identifiable{
 | |
| 				ID: valuer.GenerateUUID(),
 | |
| 			},
 | |
| 			TimeAuditable: types.TimeAuditable{
 | |
| 				CreatedAt: service.CreatedAt,
 | |
| 				UpdatedAt: service.CreatedAt,
 | |
| 			},
 | |
| 			Type:               service.ServiceID,
 | |
| 			Config:             service.ConfigJSON,
 | |
| 			CloudIntegrationID: cloudIntegrationID,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return newServices
 | |
| }
 | |
| 
 | |
| func (migration *updateIntegrations) copyOldAwsIntegrationUser(tx bun.IDB, orgID string) error {
 | |
| 	type oldUser struct {
 | |
| 		bun.BaseModel `bun:"table:users"`
 | |
| 
 | |
| 		types.TimeAuditable
 | |
| 		ID                string `bun:"id,pk,type:text" json:"id"`
 | |
| 		Name              string `bun:"name,type:text,notnull" json:"name"`
 | |
| 		Email             string `bun:"email,type:text,notnull,unique" json:"email"`
 | |
| 		Password          string `bun:"password,type:text,notnull" json:"-"`
 | |
| 		ProfilePictureURL string `bun:"profile_picture_url,type:text" json:"profilePictureURL"`
 | |
| 		GroupID           string `bun:"group_id,type:text,notnull" json:"groupId"`
 | |
| 		OrgID             string `bun:"org_id,type:text,notnull" json:"orgId"`
 | |
| 	}
 | |
| 
 | |
| 	user := &oldUser{}
 | |
| 	err := tx.NewSelect().Model(user).Where("email = ?", "aws-integration@signoz.io").Scan(context.Background())
 | |
| 	if err != nil {
 | |
| 		if err == sql.ErrNoRows {
 | |
| 			return nil
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// check if the id is already an uuid
 | |
| 	if _, err := uuid.Parse(user.ID); err == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// new user
 | |
| 	newUser := &oldUser{
 | |
| 		ID: uuid.New().String(),
 | |
| 		TimeAuditable: types.TimeAuditable{
 | |
| 			CreatedAt: time.Now(),
 | |
| 			UpdatedAt: time.Now(),
 | |
| 		},
 | |
| 		OrgID:    orgID,
 | |
| 		Name:     user.Name,
 | |
| 		Email:    user.Email,
 | |
| 		GroupID:  user.GroupID,
 | |
| 		Password: user.Password,
 | |
| 	}
 | |
| 
 | |
| 	// get the pat for old user
 | |
| 	pat := &StorablePersonalAccessToken{}
 | |
| 	err = tx.NewSelect().Model(pat).Where("user_id = ? and revoked = false", "aws-integration").Scan(context.Background())
 | |
| 	if err != nil {
 | |
| 		if err == sql.ErrNoRows {
 | |
| 			// delete the old user
 | |
| 			_, err = tx.ExecContext(context.Background(), `DELETE FROM users WHERE id = ?`, user.ID)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// new pat
 | |
| 	newPAT := &StorablePersonalAccessToken{
 | |
| 		Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
 | |
| 		TimeAuditable: types.TimeAuditable{
 | |
| 			CreatedAt: time.Now(),
 | |
| 			UpdatedAt: time.Now(),
 | |
| 		},
 | |
| 		OrgID:     orgID,
 | |
| 		UserID:    newUser.ID,
 | |
| 		Token:     pat.Token,
 | |
| 		Name:      pat.Name,
 | |
| 		ExpiresAt: pat.ExpiresAt,
 | |
| 		LastUsed:  pat.LastUsed,
 | |
| 		Revoked:   pat.Revoked,
 | |
| 		Role:      pat.Role,
 | |
| 	}
 | |
| 
 | |
| 	// delete old user
 | |
| 	_, err = tx.ExecContext(context.Background(), `DELETE FROM users WHERE id = ?`, user.ID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// insert the new user
 | |
| 	_, err = tx.NewInsert().Model(newUser).Exec(context.Background())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// insert the new pat
 | |
| 	_, err = tx.NewInsert().Model(newPAT).Exec(context.Background())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |