diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index b83f392caa..c228386ff7 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -22,4 +22,19 @@ type Alertmanager interface { // TestReceiver sends a test alert to a receiver. TestReceiver(context.Context, string, alertmanagertypes.Receiver) error + + // ListChannels lists all channels for the organization. + ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error) + + // GetChannelByID gets a channel for the organization. + GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error) + + // UpdateChannelByReceiver updates a channel for the organization. + UpdateChannelByReceiver(context.Context, string, alertmanagertypes.Receiver) error + + // CreateChannel creates a channel for the organization. + CreateChannel(context.Context, string, alertmanagertypes.Receiver) error + + // DeleteChannelByID deletes a channel for the organization. + DeleteChannelByID(context.Context, string, int) error } diff --git a/pkg/alertmanager/alertmanagerbatcher/batcher.go b/pkg/alertmanager/alertmanagerbatcher/batcher.go new file mode 100644 index 0000000000..1264f1dce5 --- /dev/null +++ b/pkg/alertmanager/alertmanagerbatcher/batcher.go @@ -0,0 +1,132 @@ +package alertmanagerbatcher + +import ( + "context" + "log/slog" + + "sync" + + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +// Batcher queues alerts and emits them in batches on its channel C. 
+type Batcher struct {
+	// C delivers batches of queued alerts to the consumer. It is buffered
+	// with config.Size slots.
+	C chan alertmanagertypes.PostableAlerts
+
+	// logger receives lifecycle and drop notifications.
+	logger *slog.Logger
+
+	// queue of alerts waiting to be emitted on C; guarded by mtx.
+	queue alertmanagertypes.PostableAlerts
+
+	// config carries the queue capacity and the batch size.
+	config Config
+
+	// moreC wakes the dispatching goroutine when the queue has items. It
+	// has a buffer of one, so pending signals coalesce.
+	moreC chan struct{}
+	// stopC is closed to make the dispatching goroutine exit.
+	stopC chan struct{}
+	// mtx guards queue.
+	mtx sync.RWMutex
+}
+
+// New returns a Batcher with an empty queue that can hold up to
+// config.Capacity alerts and that emits batches of at most config.Size
+// alerts on C. Call Start to begin dispatching.
+func New(logger *slog.Logger, config Config) *Batcher {
+	batcher := &Batcher{
+		logger: logger,
+		// Length 0, capacity config.Capacity. A two-argument make would
+		// create config.Capacity nil alerts, so the queue would start full
+		// and the first batches would consist of nil entries.
+		queue:  make(alertmanagertypes.PostableAlerts, 0, config.Capacity),
+		config: config,
+		moreC:  make(chan struct{}, 1),
+		stopC:  make(chan struct{}),
+		C:      make(chan alertmanagertypes.PostableAlerts, config.Size),
+	}
+
+	return batcher
+}
+
+// Start launches the goroutine that moves queued alerts onto C in batches.
+// The goroutine exits when ctx is cancelled or the batcher is stopped.
+// Start itself returns immediately and always returns nil.
+func (n *Batcher) Start(ctx context.Context) error {
+	go func() {
+		n.logger.InfoContext(ctx, "starting alertmanager batcher")
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-n.stopC:
+				return
+			case _, ok := <-n.moreC:
+				// A closed signal channel means the batcher was shut down.
+				if !ok {
+					return
+				}
+			}
+
+			alerts := n.nextBatch()
+			// Signals coalesce, so a wake-up can find the queue already
+			// drained by the previous iteration. Do not emit empty batches.
+			if len(alerts) == 0 {
+				continue
+			}
+
+			// Hand the batch over, but do not block forever when nobody is
+			// receiving anymore.
+			select {
+			case n.C <- alerts:
+			case <-ctx.Done():
+				return
+			case <-n.stopC:
+				return
+			}
+
+			// If the queue still has items left, kick off the next iteration.
+			if n.queueLen() > 0 {
+				n.setMore()
+			}
+		}
+	}()
+
+	return nil
+}
+
+// queueLen returns the number of alerts currently queued.
+func (n *Batcher) queueLen() int {
+	n.mtx.RLock()
+	defer n.mtx.RUnlock()
+
+	return len(n.queue)
+}
+
+// nextBatch removes up to config.Size alerts from the front of the queue and
+// returns them as a freshly allocated slice, so the result does not share
+// its backing array with the queue.
+func (n *Batcher) nextBatch() alertmanagertypes.PostableAlerts {
+	n.mtx.Lock()
+	defer n.mtx.Unlock()
+
+	var alerts alertmanagertypes.PostableAlerts
+
+	if len(n.queue) > n.config.Size {
+		alerts = append(make(alertmanagertypes.PostableAlerts, 0, n.config.Size), n.queue[:n.config.Size]...)
+		n.queue = n.queue[n.config.Size:]
+	} else {
+		alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(n.queue)), n.queue...)
+		n.queue = n.queue[:0]
+	}
+
+	return alerts
+}
+
+// Send queues the given notification requests for processing.
+// Panics if called on a handler that is not running.
+func (n *Batcher) Send(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
+	n.mtx.Lock()
+	defer n.mtx.Unlock()
+
+	capacity := n.config.Capacity
+
+	// A single call carrying more alerts than the queue can hold keeps only
+	// its newest entries.
+	if oversize := len(alerts) - capacity; oversize > 0 {
+		alerts = alerts[oversize:]
+		n.logger.WarnContext(ctx, "Alert batch larger than queue capacity, dropping alerts", "num_dropped", oversize)
+	}
+
+	// Make room for the incoming alerts by evicting the oldest queued ones.
+	if overflow := len(n.queue) + len(alerts) - capacity; overflow > 0 {
+		n.queue = n.queue[overflow:]
+		n.logger.WarnContext(ctx, "Alert notification queue full, dropping alerts", "num_dropped", overflow)
+	}
+
+	n.queue = append(n.queue, alerts...)
+
+	// Wake the dispatching goroutine.
+	n.setMore()
+}
+
+// setMore flags that the queue holds alerts waiting to be batched.
+func (n *Batcher) setMore() {
+	select {
+	case n.moreC <- struct{}{}:
+		// Signal delivered.
+	default:
+		// The channel cannot take another value: a signal is already
+		// pending and has not been consumed yet.
+	}
+}
+
+// Stop shuts down the notification handler.
+func (n *Batcher) Stop(ctx context.Context) {
+	n.logger.InfoContext(ctx, "Stopping alertmanager batcher")
+
+	// Only stopC is closed. moreC has to stay open because setMore sends on
+	// it, and a send on a closed channel panics even inside a select with a
+	// default case.
+	n.mtx.Lock()
+	defer n.mtx.Unlock()
+
+	select {
+	case <-n.stopC:
+		// Already stopped; closing again would panic.
+	default:
+		close(n.stopC)
+	}
+}
diff --git a/pkg/alertmanager/alertmanagerbatcher/config.go b/pkg/alertmanager/alertmanagerbatcher/config.go new file mode 100644 index 0000000000..ca6446a58a --- /dev/null +++ b/pkg/alertmanager/alertmanagerbatcher/config.go @@ -0,0 +1,13 @@
+package alertmanagerbatcher
+
+// Config holds the sizing parameters of a Batcher.
+type Config struct {
+	// Capacity is the maximum number of alerts kept in the queue.
+	Capacity int
+	// Size is the maximum number of alerts emitted in a single batch.
+	Size int
+}
+
+// NewConfig returns the default configuration: a queue capacity of 1000
+// alerts and a batch size of 64.
+func NewConfig() Config {
+	return Config{
+		Capacity: 1000,
+		Size:     64,
+	}
+}
diff --git a/pkg/alertmanager/server/config.go b/pkg/alertmanager/alertmanagerserver/config.go similarity index 99% rename from pkg/alertmanager/server/config.go rename to pkg/alertmanager/alertmanagerserver/config.go index 394801fd16..fbc27ec517 100644 --- a/pkg/alertmanager/server/config.go +++ b/pkg/alertmanager/alertmanagerserver/config.go @@ -1,4 +1,4 @@ -package server +package alertmanagerserver import ( "net/url" diff --git a/pkg/alertmanager/server/server.go b/pkg/alertmanager/alertmanagerserver/server.go similarity index 99% rename from pkg/alertmanager/server/server.go rename to pkg/alertmanager/alertmanagerserver/server.go index 06252beec8..3ab34d0df8 100644 --- a/pkg/alertmanager/server/server.go +++ b/pkg/alertmanager/alertmanagerserver/server.go @@ -1,4 +1,4 @@ -package server +package alertmanagerserver import ( "context" diff --git a/pkg/alertmanager/server/server_test.go b/pkg/alertmanager/alertmanagerserver/server_test.go similarity index 99% rename from pkg/alertmanager/server/server_test.go rename to pkg/alertmanager/alertmanagerserver/server_test.go index 0e6d34bee2..bb2d6a6c57 100644 --- a/pkg/alertmanager/server/server_test.go +++ b/pkg/alertmanager/alertmanagerserver/server_test.go @@ -1,4 +1,4 @@ -package server +package alertmanagerserver import ( "bytes" diff --git a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go 
b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go index a50d3d8b51..2dfa5ef424 100644 --- a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go +++ b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go @@ -45,7 +45,7 @@ func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes. } // Set implements alertmanagertypes.ConfigStore. -func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error { +func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config, cb func(context.Context) error) error { tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) if err != nil { return err @@ -75,6 +75,12 @@ func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) } } + if cb != nil { + if err = cb(ctx); err != nil { + return err + } + } + if err = tx.Commit(); err != nil { return err } diff --git a/pkg/alertmanager/api.go b/pkg/alertmanager/api.go index beb51d4fad..c34fd3d636 100644 --- a/pkg/alertmanager/api.go +++ b/pkg/alertmanager/api.go @@ -15,13 +15,11 @@ import ( ) type API struct { - configStore alertmanagertypes.ConfigStore alertmanager Alertmanager } -func NewAPI(configStore alertmanagertypes.ConfigStore, alertmanager Alertmanager) *API { +func NewAPI(alertmanager Alertmanager) *API { return &API{ - configStore: configStore, alertmanager: alertmanager, } } @@ -83,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) GetChannels(req *http.Request, rw http.ResponseWriter) { +func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -93,20 +91,13 @@ func (api *API) GetChannels(req *http.Request, rw http.ResponseWriter) { return } - config, err := api.configStore.Get(ctx, claims.OrgID) + channels, err := api.alertmanager.ListChannels(ctx, claims.OrgID) if 
err != nil { render.Error(rw, err) return } - channels := config.Channels() - - channelList := make([]*alertmanagertypes.Channel, 0, len(channels)) - for _, channel := range channels { - channelList = append(channelList, channel) - } - - render.Success(rw, http.StatusOK, channelList) + render.Success(rw, http.StatusOK, channels) } func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { @@ -137,14 +128,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { return } - config, err := api.configStore.Get(ctx, claims.OrgID) - if err != nil { - render.Error(rw, err) - return - } - - channels := config.Channels() - channel, err := alertmanagertypes.GetChannelByID(channels, id) + channel, err := api.alertmanager.GetChannelByID(ctx, claims.OrgID, id) if err != nil { render.Error(rw, err) return @@ -176,19 +160,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { return } - config, err := api.configStore.Get(ctx, claims.OrgID) - if err != nil { - render.Error(rw, err) - return - } - - err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver) - if err != nil { - render.Error(rw, err) - return - } - - err = api.configStore.Set(ctx, config) + err = api.alertmanager.UpdateChannelByReceiver(ctx, claims.OrgID, receiver) if err != nil { render.Error(rw, err) return @@ -225,26 +197,39 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) { return } - config, err := api.configStore.Get(ctx, claims.OrgID) - if err != nil { - render.Error(rw, err) - return - } - - channels := config.Channels() - channel, err := alertmanagertypes.GetChannelByID(channels, id) - if err != nil { - render.Error(rw, err) - return - } - - err = config.DeleteReceiver(channel.Name) - if err != nil { - render.Error(rw, err) - return - } - - err = api.configStore.Set(ctx, config) + err = api.alertmanager.DeleteChannelByID(ctx, claims.OrgID, id) + if err != nil { + render.Error(rw, 
err) + return + } + + render.Success(rw, http.StatusNoContent, nil) +} + +func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + body, err := io.ReadAll(req.Body) + if err != nil { + render.Error(rw, err) + return + } + defer req.Body.Close() //nolint:errcheck + + receiver, err := alertmanagertypes.NewReceiver(string(body)) + if err != nil { + render.Error(rw, err) + return + } + + err = api.alertmanager.CreateChannel(ctx, claims.OrgID, receiver) if err != nil { render.Error(rw, err) return diff --git a/pkg/alertmanager/config.go b/pkg/alertmanager/config.go index e5fd4fe148..261d2b2dbe 100644 --- a/pkg/alertmanager/config.go +++ b/pkg/alertmanager/config.go @@ -1,37 +1,46 @@ package alertmanager import ( + "net/url" "time" - "go.signoz.io/signoz/pkg/alertmanager/server" + "go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver" "go.signoz.io/signoz/pkg/factory" ) type Config struct { // Config is the config for the alertmanager server. - server.Config `mapstructure:",squash"` + alertmanagerserver.Config `mapstructure:",squash"` // Provider is the provider for the alertmanager service. Provider string `mapstructure:"provider"` // Internal is the internal alertmanager configuration. - Internal Internal `mapstructure:"internal"` + Signoz Signoz `mapstructure:"signoz"` + + // Legacy is the legacy alertmanager configuration. + Legacy Legacy `mapstructure:"legacy"` } -type Internal struct { +type Signoz struct { // PollInterval is the interval at which the alertmanager is synced. PollInterval time.Duration `mapstructure:"poll_interval"` } +type Legacy struct { + // URL is the URL of the legacy alertmanager. 
+ URL *url.URL `mapstructure:"url"` +} + func NewConfigFactory() factory.ConfigFactory { return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig) } func newConfig() factory.Config { return Config{ - Config: server.NewConfig(), - Provider: "internal", - Internal: Internal{ + Config: alertmanagerserver.NewConfig(), + Provider: "signoz", + Signoz: Signoz{ PollInterval: 15 * time.Second, }, } diff --git a/pkg/alertmanager/internalalertmanager/provider.go b/pkg/alertmanager/internalalertmanager/provider.go deleted file mode 100644 index 022d458a59..0000000000 --- a/pkg/alertmanager/internalalertmanager/provider.go +++ /dev/null @@ -1,70 +0,0 @@ -package internalalertmanager - -import ( - "context" - "time" - - "go.signoz.io/signoz/pkg/alertmanager" - "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore" - "go.signoz.io/signoz/pkg/factory" - "go.signoz.io/signoz/pkg/sqlstore" - "go.signoz.io/signoz/pkg/types/alertmanagertypes" -) - -type provider struct { - service *alertmanager.Service - config alertmanager.Config - settings factory.ScopedProviderSettings -} - -func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] { - return factory.NewProviderFactory(factory.MustNewName("internal"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) { - return New(ctx, settings, config, sqlstore) - }) -} - -func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (alertmanager.Alertmanager, error) { - settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager") - return &provider{ - service: alertmanager.New( - ctx, - settings, - config, - sqlalertmanagerstore.NewStateStore(sqlstore), - sqlalertmanagerstore.NewConfigStore(sqlstore), - ), - settings: settings, - config: 
config, - }, nil -} - -func (provider *provider) Start(ctx context.Context) error { - ticker := time.NewTicker(provider.config.Internal.PollInterval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - if err := provider.service.SyncServers(ctx); err != nil { - provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err) - } - } - } -} - -func (provider *provider) Stop(ctx context.Context) error { - return provider.service.Stop(ctx) -} - -func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) { - return provider.service.GetAlerts(ctx, orgID, params) -} - -func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error { - return provider.service.PutAlerts(ctx, orgID, alerts) -} - -func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { - return provider.service.TestReceiver(ctx, orgID, receiver) -} diff --git a/pkg/alertmanager/legacyalertmanager/provider.go b/pkg/alertmanager/legacyalertmanager/provider.go new file mode 100644 index 0000000000..93f43a7440 --- /dev/null +++ b/pkg/alertmanager/legacyalertmanager/provider.go @@ -0,0 +1,342 @@
+package legacyalertmanager
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	"go.signoz.io/signoz/pkg/alertmanager"
+	"go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher"
+	"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
+	"go.signoz.io/signoz/pkg/factory"
+	"go.signoz.io/signoz/pkg/sqlstore"
+	"go.signoz.io/signoz/pkg/types/alertmanagertypes"
+)
+
+// Paths of the legacy alertmanager endpoints, joined onto the configured
+// legacy URL.
+const (
+	alertsPath       string = "/v1/alerts"
+	routesPath       string = "/v1/routes"
+	testReceiverPath string = "/v1/testReceiver"
+)
+
+// provider talks to an external (legacy) alertmanager over HTTP and keeps the
+// channel configuration in the config store.
+type provider struct {
+	config      alertmanager.Config
+	settings    factory.ScopedProviderSettings
+	client      *http.Client
+	configStore alertmanagertypes.ConfigStore
+	batcher     *alertmanagerbatcher.Batcher
+}
+
+// NewFactory returns the provider factory registered under the name "legacy".
+func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
+	return factory.NewProviderFactory(factory.MustNewName("legacy"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
+		return New(ctx, settings, config, sqlstore)
+	})
+}
+
+// New builds a legacy provider with an HTTP client limited to 30 seconds per
+// request, a SQL-backed config store and a batcher using its default
+// configuration.
+func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
+	settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
+	configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
+
+	return &provider{
+		config:   config,
+		settings: settings,
+		client: &http.Client{
+			Timeout: 30 * time.Second,
+		},
+		configStore: configStore,
+		batcher:     alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()),
+	}, nil
+}
+
+// Start runs the batcher and forwards every batch it emits to the legacy
+// alertmanager until ctx is cancelled. A failed delivery is logged and does
+// not end the loop.
+func (provider *provider) Start(ctx context.Context) error {
+	if err := provider.batcher.Start(ctx); err != nil {
+		return err
+	}
+	defer provider.batcher.Stop(ctx)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case alerts := <-provider.batcher.C:
+			if err := provider.putAlerts(ctx, "", alerts); err != nil {
+				provider.settings.Logger().ErrorContext(ctx, "failed to send alerts to alertmanager", "error", err)
+			}
+		}
+	}
+}
+
+// GetAlerts fetches alerts from the legacy alertmanager, passing the raw
+// query string of params through unchanged. It returns an error when the
+// legacy URL is not configured, when the request fails, when the response
+// status is not 2xx, or when the body cannot be decoded.
+func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
+	base := provider.config.Legacy.URL
+	if base == nil {
+		return nil, fmt.Errorf("legacy alertmanager url is not configured")
+	}
+
+	url := base.JoinPath(alertsPath)
+	url.RawQuery = params.RawQuery
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Add("Content-Type", "application/json")
+
+	resp, err := provider.client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close() //nolint:errcheck
+
+	// Reject error responses before trying to decode them as alerts.
+	if err := checkStatus(resp); err != nil {
+		return nil, err
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	var alerts alertmanagertypes.GettableAlerts
+	if err := json.Unmarshal(body, &alerts); err != nil {
+		return nil, err
+	}
+
+	return alerts, nil
+}
+
+// PutAlerts queues the alerts on the batcher; delivery happens asynchronously
+// in Start. It always returns nil.
+func (provider *provider) PutAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
+	provider.batcher.Send(ctx, alerts...)
+	return nil
+}
+
+// putAlerts posts one batch of alerts to the legacy alertmanager.
+func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
+	return provider.send(ctx, http.MethodPost, alertsPath, alerts)
+}
+
+// TestReceiver posts the receiver to the legacy alertmanager's test endpoint.
+func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
+	return provider.send(ctx, http.MethodPost, testReceiverPath, receiver)
+}
+
+// ListChannels returns all channels found in the stored config of the
+// organization.
+func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return nil, err
+	}
+
+	channels := config.Channels()
+	channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
+	for _, channel := range channels {
+		channelList = append(channelList, channel)
+	}
+
+	return channelList, nil
+}
+
+// GetChannelByID looks up a single channel in the stored config of the
+// organization.
+func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return nil, err
+	}
+
+	return alertmanagertypes.GetChannelByID(config.Channels(), channelID)
+}
+
+// UpdateChannelByReceiver updates the receiver in the stored config and sends
+// it to the legacy alertmanager with a PUT on the routes endpoint. The HTTP
+// call is passed to the config store as the callback of Set.
+func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return err
+	}
+
+	err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
+	if err != nil {
+		return err
+	}
+
+	return provider.configStore.Set(ctx, config, func(ctx context.Context) error {
+		return provider.send(ctx, http.MethodPut, routesPath, receiver)
+	})
+}
+
+// CreateChannel adds the receiver to the stored config and sends it to the
+// legacy alertmanager with a POST on the routes endpoint. The HTTP call is
+// passed to the config store as the callback of Set.
+func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return err
+	}
+
+	err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
+	if err != nil {
+		return err
+	}
+
+	return provider.configStore.Set(ctx, config, func(ctx context.Context) error {
+		return provider.send(ctx, http.MethodPost, routesPath, receiver)
+	})
+}
+
+// DeleteChannelByID removes the channel's receiver from the stored config and
+// asks the legacy alertmanager to delete it by name with a DELETE on the
+// routes endpoint. The HTTP call is passed to the config store as the
+// callback of Set.
+func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return err
+	}
+
+	channel, err := alertmanagertypes.GetChannelByID(config.Channels(), channelID)
+	if err != nil {
+		return err
+	}
+
+	err = config.DeleteReceiver(channel.Name)
+	if err != nil {
+		return err
+	}
+
+	return provider.configStore.Set(ctx, config, func(ctx context.Context) error {
+		return provider.send(ctx, http.MethodDelete, routesPath, map[string]string{"name": channel.Name})
+	})
+}
+
+// Stop is a no-op; the batcher is stopped when Start returns.
+func (provider *provider) Stop(ctx context.Context) error {
+	return nil
+}
+
+// send marshals payload as JSON and issues it to the legacy alertmanager at
+// path using method. It returns an error when the legacy URL is not
+// configured, when the request cannot be built or sent, or when the response
+// status is not 2xx.
+func (provider *provider) send(ctx context.Context, method string, path string, payload any) error {
+	base := provider.config.Legacy.URL
+	if base == nil {
+		// The default config leaves the legacy URL unset; calling JoinPath
+		// on it would panic.
+		return fmt.Errorf("legacy alertmanager url is not configured")
+	}
+
+	body, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+
+	req, err := http.NewRequestWithContext(ctx, method, base.JoinPath(path).String(), bytes.NewBuffer(body))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("Content-Type", "application/json")
+
+	resp, err := provider.client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close() //nolint:errcheck
+
+	return checkStatus(resp)
+}
+
+// checkStatus returns an error unless resp carries a 2xx status code.
+func checkStatus(resp *http.Response) error {
+	// Any HTTP status 2xx is OK.
+	if resp.StatusCode/100 != 2 {
+		return fmt.Errorf("bad response status %v", resp.Status)
+	}
+
+	return nil
+}
diff --git a/pkg/alertmanager/service.go b/pkg/alertmanager/service.go index 6ec9c03081..447c8e862a 100644 --- a/pkg/alertmanager/service.go +++ b/pkg/alertmanager/service.go @@ -4,7 +4,7 @@ import ( "context" "sync" - "go.signoz.io/signoz/pkg/alertmanager/server" + "go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver" "go.signoz.io/signoz/pkg/errors" "go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/types/alertmanagertypes" @@ -24,7 +24,7 @@ type Service struct { settings factory.ScopedProviderSettings // Map of organization id to alertmanager server - servers map[string]*server.Server + servers map[string]*alertmanagerserver.Server // Mutex to protect the servers map serversMtx sync.RWMutex @@ -36,7 +36,7 @@ func New(ctx context.Context, settings factory.ScopedProviderSettings, config Co stateStore: stateStore, configStore: configStore, settings: settings, - servers: make(map[string]*server.Server), + servers: make(map[string]*alertmanagerserver.Server), serversMtx: sync.RWMutex{}, } @@ -57,7 +57,7 @@ func (service *Service) SyncServers(ctx context.Context) error { continue } - service.servers[orgID], err = server.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), server.Config{}, orgID, service.stateStore) + service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config.Config, orgID, service.stateStore) if err != nil { service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err) continue @@ -127,7 +127,7 @@ func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmana return config, nil } -func (service *Service) getServer(orgID string) 
(*server.Server, error) { +func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) { server, ok := service.servers[orgID] if !ok { return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID) diff --git a/pkg/alertmanager/signozalertmanager/provider.go b/pkg/alertmanager/signozalertmanager/provider.go new file mode 100644 index 0000000000..22631ef367 --- /dev/null +++ b/pkg/alertmanager/signozalertmanager/provider.go @@ -0,0 +1,170 @@
+package signozalertmanager
+
+import (
+	"context"
+	"time"
+
+	"go.signoz.io/signoz/pkg/alertmanager"
+	"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
+	"go.signoz.io/signoz/pkg/factory"
+	"go.signoz.io/signoz/pkg/sqlstore"
+	"go.signoz.io/signoz/pkg/types/alertmanagertypes"
+)
+
+// provider serves alerts through the in-process alertmanager service and
+// manages channels directly in the config store.
+type provider struct {
+	service     *alertmanager.Service
+	config      alertmanager.Config
+	settings    factory.ScopedProviderSettings
+	configStore alertmanagertypes.ConfigStore
+	stateStore  alertmanagertypes.StateStore
+}
+
+// NewFactory returns the provider factory registered under the name "signoz".
+func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
+	return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
+		return New(ctx, settings, config, sqlstore)
+	})
+}
+
+// New builds a signoz provider whose service shares the SQL-backed config and
+// state stores kept on the provider.
+func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
+	// Scope the settings to this package; the internalalertmanager package
+	// this provider replaces no longer exists.
+	settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/signozalertmanager")
+	configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
+	stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
+
+	return &provider{
+		service: alertmanager.New(
+			ctx,
+			settings,
+			config,
+			stateStore,
+			configStore,
+		),
+		settings:    settings,
+		config:      config,
+		configStore: configStore,
+		stateStore:  stateStore,
+	}, nil
+}
+
+// Start syncs the alertmanager servers every Signoz.PollInterval until ctx is
+// cancelled. A failed sync is logged and retried on the next tick.
+func (provider *provider) Start(ctx context.Context) error {
+	ticker := time.NewTicker(provider.config.Signoz.PollInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-ticker.C:
+			if err := provider.service.SyncServers(ctx); err != nil {
+				provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
+			}
+		}
+	}
+}
+
+// Stop stops the underlying service.
+func (provider *provider) Stop(ctx context.Context) error {
+	return provider.service.Stop(ctx)
+}
+
+// GetAlerts delegates to the underlying service.
+func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
+	return provider.service.GetAlerts(ctx, orgID, params)
+}
+
+// PutAlerts delegates to the underlying service.
+func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
+	return provider.service.PutAlerts(ctx, orgID, alerts)
+}
+
+// TestReceiver delegates to the underlying service.
+func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
+	return provider.service.TestReceiver(ctx, orgID, receiver)
+}
+
+// ListChannels returns all channels found in the stored config of the
+// organization.
+func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return nil, err
+	}
+
+	channels := config.Channels()
+	channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
+	for _, channel := range channels {
+		channelList = append(channelList, channel)
+	}
+
+	return channelList, nil
+}
+
+// GetChannelByID looks up a single channel in the stored config of the
+// organization.
+func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return nil, err
+	}
+
+	return alertmanagertypes.GetChannelByID(config.Channels(), channelID)
+}
+
+// UpdateChannelByReceiver updates the receiver in the stored config of the
+// organization and saves the config without a callback.
+func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return err
+	}
+
+	err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
+	if err != nil {
+		return err
+	}
+
+	return provider.configStore.Set(ctx, config, nil)
+}
+
+// DeleteChannelByID removes the channel's receiver from the stored config of
+// the organization and saves the config without a callback.
+func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return err
+	}
+
+	channel, err := alertmanagertypes.GetChannelByID(config.Channels(), channelID)
+	if err != nil {
+		return err
+	}
+
+	err = config.DeleteReceiver(channel.Name)
+	if err != nil {
+		return err
+	}
+
+	return provider.configStore.Set(ctx, config, nil)
+}
+
+// CreateChannel adds the receiver to the stored config of the organization
+// and saves the config without a callback.
+func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
+	config, err := provider.configStore.Get(ctx, orgID)
+	if err != nil {
+		return err
+	}
+
+	err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
+	if err != nil {
+		return err
+	}
+
+	return provider.configStore.Set(ctx, config, nil)
+}
diff --git a/pkg/types/alertmanagertypes/alert.go b/pkg/types/alertmanagertypes/alert.go index 99fa2a939f..6008a2a1d3 100644 --- a/pkg/types/alertmanagertypes/alert.go +++ b/pkg/types/alertmanagertypes/alert.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" v2 "github.com/prometheus/alertmanager/api/v2" "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert" @@ -35,11 +36,34 @@ type ( // A slice of GettableAlert. 
GettableAlerts = models.GettableAlerts - - // An alias for the GettableAlertsParams type from the alertmanager package. - GettableAlertsParams = alert.GetAlertsParams ) 
+// GettableAlertsParams bundles the alertmanager GetAlertsParams with the raw
+// query string they were parsed from.
+type GettableAlertsParams struct {
+	alert.GetAlertsParams
+	RawQuery string
+}
+
+// NewPostableAlertsFromAlerts builds one PostableAlert per input Alert,
+// carrying over its labels, annotations, generator URL and start/end times.
+func NewPostableAlertsFromAlerts(alerts []*types.Alert) PostableAlerts {
+	converted := make(PostableAlerts, 0, len(alerts))
+	for _, src := range alerts {
+		postable := &models.PostableAlert{
+			Annotations: v2.ModelLabelSetToAPILabelSet(src.Annotations),
+			EndsAt:      strfmt.DateTime(src.EndsAt),
+			StartsAt:    strfmt.DateTime(src.StartsAt),
+			Alert: models.Alert{
+				GeneratorURL: strfmt.URI(src.GeneratorURL),
+				Labels:       v2.ModelLabelSetToAPILabelSet(src.Labels),
+			},
+		}
+		converted = append(converted, postable)
+	}
+
+	return converted
+}
+
// Converts a slice of PostableAlert to a slice of Alert. func NewAlertsFromPostableAlerts(postableAlerts PostableAlerts, resolveTimeout time.Duration, now time.Time) ([]*types.Alert, []error) { alerts := v2.OpenAPIAlertsToAlerts(postableAlerts) @@ -109,7 +133,10 @@ func NewGettableAlertsParams(req *http.Request) (GettableAlertsParams, error) { return GettableAlertsParams{}, err } - return params, nil + return GettableAlertsParams{ + GetAlertsParams: params, + RawQuery: req.URL.RawQuery, + }, nil } func NewGettableAlertsFromAlertProvider( diff --git a/pkg/types/alertmanagertypes/config.go b/pkg/types/alertmanagertypes/config.go index 58c5750a7a..a1643845f9 100644 --- a/pkg/types/alertmanagertypes/config.go +++ b/pkg/types/alertmanagertypes/config.go @@ -260,7 +260,7 @@ func (c *Config) DeleteReceiver(name string) error { type ConfigStore interface { // Set creates or updates a config. 
- Set(context.Context, *Config) error + Set(context.Context, *Config, func(context.Context) error) error // Get returns the config for the given orgID Get(context.Context, string) (*Config, error)