mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-06-04 11:25:52 +08:00
294 lines
7.2 KiB
Go
294 lines
7.2 KiB
Go
package sqlmigration
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerserver"
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
|
"github.com/prometheus/alertmanager/config"
|
|
"github.com/tidwall/gjson"
|
|
"github.com/uptrace/bun"
|
|
"github.com/uptrace/bun/migrate"
|
|
)
|
|
|
|
type addAlertmanager struct {
|
|
store sqlstore.SQLStore
|
|
}
|
|
|
|
func NewAddAlertmanagerFactory(store sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
|
|
return factory.NewProviderFactory(factory.MustNewName("add_alertmanager"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
|
return newAddAlertmanager(ctx, ps, c, store)
|
|
})
|
|
}
|
|
|
|
func newAddAlertmanager(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
|
|
return &addAlertmanager{
|
|
store: store,
|
|
}, nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) Register(migrations *migrate.Migrations) error {
|
|
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) Up(ctx context.Context, db *bun.DB) error {
|
|
tx, err := db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
_ = tx.Rollback()
|
|
}()
|
|
|
|
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, "notification_channels", "deleted"); err != nil {
|
|
return err
|
|
} else if exists {
|
|
if _, err := tx.
|
|
NewDropColumn().
|
|
Table("notification_channels").
|
|
ColumnExpr("deleted").
|
|
Exec(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if exists, err := migration.store.Dialect().ColumnExists(ctx, tx, "notification_channels", "org_id"); err != nil {
|
|
return err
|
|
} else if !exists {
|
|
if _, err := tx.
|
|
NewAddColumn().
|
|
Table("notification_channels").
|
|
ColumnExpr("org_id TEXT REFERENCES organizations(id) ON DELETE CASCADE").
|
|
Exec(ctx); err != nil && err != ErrNoExecute {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if _, err := tx.
|
|
NewCreateTable().
|
|
Model(&struct {
|
|
bun.BaseModel `bun:"table:alertmanager_config"`
|
|
ID uint64 `bun:"id,pk,autoincrement"`
|
|
Config string `bun:"config,notnull,type:text"`
|
|
Hash string `bun:"hash,notnull,type:text"`
|
|
CreatedAt time.Time `bun:"created_at,notnull"`
|
|
UpdatedAt time.Time `bun:"updated_at,notnull"`
|
|
OrgID string `bun:"org_id,notnull,unique"`
|
|
}{}).
|
|
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
|
|
IfNotExists().
|
|
Exec(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.
|
|
NewCreateTable().
|
|
Model(&struct {
|
|
bun.BaseModel `bun:"table:alertmanager_state"`
|
|
ID uint64 `bun:"id,pk,autoincrement"`
|
|
Silences string `bun:"silences,nullzero,type:text"`
|
|
NFLog string `bun:"nflog,nullzero,type:text"`
|
|
CreatedAt time.Time `bun:"created_at,notnull"`
|
|
UpdatedAt time.Time `bun:"updated_at,notnull"`
|
|
OrgID string `bun:"org_id,notnull,unique"`
|
|
}{}).
|
|
ForeignKey(`("org_id") REFERENCES "organizations" ("id")`).
|
|
IfNotExists().
|
|
Exec(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
var orgID string
|
|
err = tx.
|
|
NewSelect().
|
|
ColumnExpr("id").
|
|
Table("organizations").
|
|
Limit(1).
|
|
Scan(ctx, &orgID)
|
|
if err != nil {
|
|
if err != sql.ErrNoRows {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err == nil {
|
|
if err := migration.populateOrgIDInChannels(ctx, tx, orgID); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := migration.populateAlertmanagerConfig(ctx, tx, orgID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) populateOrgIDInChannels(ctx context.Context, tx bun.Tx, orgID string) error {
|
|
if _, err := tx.
|
|
NewUpdate().
|
|
Table("notification_channels").
|
|
Set("org_id = ?", orgID).
|
|
Where("org_id IS NULL").
|
|
Exec(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) populateAlertmanagerConfig(ctx context.Context, tx bun.Tx, orgID string) error {
|
|
var channels []*alertmanagertypes.Channel
|
|
|
|
err := tx.
|
|
NewSelect().
|
|
Model(&channels).
|
|
Where("org_id = ?", orgID).
|
|
Scan(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var receiversFromChannels []string
|
|
for _, channel := range channels {
|
|
receiversFromChannels = append(receiversFromChannels, channel.Name)
|
|
}
|
|
|
|
type matcher struct {
|
|
bun.BaseModel `bun:"table:rules"`
|
|
ID int `bun:"id,pk"`
|
|
Data string `bun:"data"`
|
|
}
|
|
|
|
matchers := []matcher{}
|
|
|
|
err = tx.
|
|
NewSelect().
|
|
Column("id", "data").
|
|
Model(&matchers).
|
|
Scan(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
matchersMap := make(map[string][]string)
|
|
for _, matcher := range matchers {
|
|
receivers := gjson.Get(matcher.Data, "preferredChannels").Array()
|
|
for _, receiver := range receivers {
|
|
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiver.String())
|
|
}
|
|
|
|
if len(receivers) == 0 {
|
|
matchersMap[strconv.Itoa(matcher.ID)] = append(matchersMap[strconv.Itoa(matcher.ID)], receiversFromChannels...)
|
|
}
|
|
}
|
|
|
|
for _, channel := range channels {
|
|
if err := migration.msTeamsChannelToMSTeamsV2Channel(channel); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
config, err := alertmanagertypes.NewConfigFromChannels(alertmanagerserver.NewConfig().Global, alertmanagerserver.NewConfig().Route, channels, orgID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for ruleID, receivers := range matchersMap {
|
|
err = config.CreateRuleIDMatcher(ruleID, receivers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if _, err := tx.
|
|
NewInsert().
|
|
Model(config.StoreableConfig()).
|
|
On("CONFLICT (org_id) DO UPDATE").
|
|
Set("config = ?", config.StoreableConfig().Config).
|
|
Set("hash = ?", config.StoreableConfig().Hash).
|
|
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
|
|
Exec(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, channel := range channels {
|
|
if channel.Type == "msteamsv2" {
|
|
if _, err := tx.
|
|
NewUpdate().
|
|
Model(channel).
|
|
WherePK().
|
|
Exec(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) Down(ctx context.Context, db *bun.DB) error {
|
|
return nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) msTeamsChannelToMSTeamsV2Channel(c *alertmanagertypes.Channel) error {
|
|
if c.Type != "msteams" {
|
|
return nil
|
|
}
|
|
|
|
receiver, err := alertmanagertypes.NewReceiver(c.Data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
receiver = migration.msTeamsReceiverToMSTeamsV2Receiver(receiver)
|
|
data, err := json.Marshal(receiver)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Type = "msteamsv2"
|
|
c.Data = string(data)
|
|
c.UpdatedAt = time.Now()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (migration *addAlertmanager) msTeamsReceiverToMSTeamsV2Receiver(receiver alertmanagertypes.Receiver) alertmanagertypes.Receiver {
|
|
if receiver.MSTeamsConfigs == nil {
|
|
return receiver
|
|
}
|
|
|
|
var msTeamsV2Configs []*config.MSTeamsV2Config
|
|
for _, cfg := range receiver.MSTeamsConfigs {
|
|
msTeamsV2Configs = append(msTeamsV2Configs, &config.MSTeamsV2Config{
|
|
NotifierConfig: cfg.NotifierConfig,
|
|
HTTPConfig: cfg.HTTPConfig,
|
|
WebhookURL: cfg.WebhookURL,
|
|
WebhookURLFile: cfg.WebhookURLFile,
|
|
Title: cfg.Title,
|
|
Text: cfg.Text,
|
|
})
|
|
}
|
|
|
|
receiver.MSTeamsConfigs = nil
|
|
receiver.MSTeamsV2Configs = msTeamsV2Configs
|
|
|
|
return receiver
|
|
}
|