mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-13 06:59:03 +08:00
feat(legacyalertmanager): add legacyalertmanager (#7156)
This commit is contained in:
parent
972a7a9dac
commit
639b9a5a8a
@ -22,4 +22,19 @@ type Alertmanager interface {
|
||||
|
||||
// TestReceiver sends a test alert to a receiver.
|
||||
TestReceiver(context.Context, string, alertmanagertypes.Receiver) error
|
||||
|
||||
// ListChannels lists all channels for the organization.
|
||||
ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error)
|
||||
|
||||
// GetChannelByID gets a channel for the organization.
|
||||
GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error)
|
||||
|
||||
// UpdateChannelByReceiver updates a channel for the organization.
|
||||
UpdateChannelByReceiver(context.Context, string, alertmanagertypes.Receiver) error
|
||||
|
||||
// CreateChannel creates a channel for the organization.
|
||||
CreateChannel(context.Context, string, alertmanagertypes.Receiver) error
|
||||
|
||||
// DeleteChannelByID deletes a channel for the organization.
|
||||
DeleteChannelByID(context.Context, string, int) error
|
||||
}
|
||||
|
132
pkg/alertmanager/alertmanagerbatcher/batcher.go
Normal file
132
pkg/alertmanager/alertmanagerbatcher/batcher.go
Normal file
@ -0,0 +1,132 @@
|
||||
package alertmanagerbatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"sync"
|
||||
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
// Batcher queues alert notifications and hands them out in batches on C for dispatch to an alertmanager.
|
||||
type Batcher struct {
|
||||
C chan alertmanagertypes.PostableAlerts
|
||||
// logger
|
||||
logger *slog.Logger
|
||||
|
||||
// queue of alerts to be sent to alertmanager
|
||||
queue alertmanagertypes.PostableAlerts
|
||||
|
||||
// config
|
||||
config Config
|
||||
|
||||
// more channel to signal the sender goroutine to send alerts
|
||||
moreC chan struct{}
|
||||
stopC chan struct{}
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func New(logger *slog.Logger, config Config) *Batcher {
|
||||
batcher := &Batcher{
|
||||
logger: logger,
|
||||
queue: make(alertmanagertypes.PostableAlerts, config.Capacity),
|
||||
config: config,
|
||||
moreC: make(chan struct{}, 1),
|
||||
stopC: make(chan struct{}),
|
||||
C: make(chan alertmanagertypes.PostableAlerts, config.Size),
|
||||
}
|
||||
|
||||
return batcher
|
||||
}
|
||||
|
||||
// Start dispatches notifications continuously.
|
||||
func (n *Batcher) Start(ctx context.Context) error {
|
||||
go func() {
|
||||
n.logger.InfoContext(ctx, "starting alertmanager batcher")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-n.stopC:
|
||||
return
|
||||
case <-n.moreC:
|
||||
}
|
||||
alerts := n.nextBatch()
|
||||
n.C <- alerts
|
||||
// If the queue still has items left, kick off the next iteration.
|
||||
if n.queueLen() > 0 {
|
||||
n.setMore()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Batcher) queueLen() int {
|
||||
n.mtx.RLock()
|
||||
defer n.mtx.RUnlock()
|
||||
|
||||
return len(n.queue)
|
||||
}
|
||||
|
||||
func (n *Batcher) nextBatch() alertmanagertypes.PostableAlerts {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
var alerts alertmanagertypes.PostableAlerts
|
||||
|
||||
if len(n.queue) > n.config.Size {
|
||||
alerts = append(make(alertmanagertypes.PostableAlerts, 0, n.config.Size), n.queue[:n.config.Size]...)
|
||||
n.queue = n.queue[n.config.Size:]
|
||||
} else {
|
||||
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(n.queue)), n.queue...)
|
||||
n.queue = n.queue[:0]
|
||||
}
|
||||
|
||||
return alerts
|
||||
}
|
||||
|
||||
// Send queues the given notification requests for processing.
|
||||
// Panics if called on a handler that is not running.
|
||||
func (n *Batcher) Send(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
// Queue capacity should be significantly larger than a single alert
|
||||
// batch could be.
|
||||
if d := len(alerts) - n.config.Capacity; d > 0 {
|
||||
alerts = alerts[d:]
|
||||
n.logger.WarnContext(ctx, "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
|
||||
}
|
||||
|
||||
// If the queue is full, remove the oldest alerts in favor
|
||||
// of newer ones.
|
||||
if d := (len(n.queue) + len(alerts)) - n.config.Capacity; d > 0 {
|
||||
n.queue = n.queue[d:]
|
||||
|
||||
n.logger.WarnContext(ctx, "Alert notification queue full, dropping alerts", "num_dropped", d)
|
||||
}
|
||||
n.queue = append(n.queue, alerts...)
|
||||
|
||||
// Notify sending goroutine that there are alerts to be processed.
|
||||
n.setMore()
|
||||
}
|
||||
|
||||
// setMore signals that the alert queue has items.
|
||||
func (n *Batcher) setMore() {
|
||||
// If we cannot send on the channel, it means the signal already exists
|
||||
// and has not been consumed yet.
|
||||
select {
|
||||
case n.moreC <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Stop shuts down the notification handler.
|
||||
func (n *Batcher) Stop(ctx context.Context) {
|
||||
n.logger.InfoContext(ctx, "Stopping alertmanager batcher")
|
||||
close(n.moreC)
|
||||
close(n.stopC)
|
||||
}
|
13
pkg/alertmanager/alertmanagerbatcher/config.go
Normal file
13
pkg/alertmanager/alertmanagerbatcher/config.go
Normal file
@ -0,0 +1,13 @@
|
||||
package alertmanagerbatcher
|
||||
|
||||
// Config holds the sizing parameters of a Batcher.
type Config struct {
	// Capacity is the maximum number of alerts kept in the Batcher queue.
	Capacity int

	// Size is the maximum number of alerts in a single batch; it is also used
	// as the buffer size of the Batcher output channel.
	Size int
}

// NewConfig returns the default configuration: a queue capacity of 1000
// alerts and a batch size of 64.
func NewConfig() Config {
	var cfg Config
	cfg.Capacity = 1000
	cfg.Size = 64

	return cfg
}
|
@ -1,4 +1,4 @@
|
||||
package server
|
||||
package alertmanagerserver
|
||||
|
||||
import (
|
||||
"net/url"
|
@ -1,4 +1,4 @@
|
||||
package server
|
||||
package alertmanagerserver
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package server
|
||||
package alertmanagerserver
|
||||
|
||||
import (
|
||||
"bytes"
|
@ -45,7 +45,7 @@ func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes.
|
||||
}
|
||||
|
||||
// Set implements alertmanagertypes.ConfigStore.
|
||||
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error {
|
||||
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config, cb func(context.Context) error) error {
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -75,6 +75,12 @@ func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config)
|
||||
}
|
||||
}
|
||||
|
||||
if cb != nil {
|
||||
if err = cb(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -15,13 +15,11 @@ import (
|
||||
)
|
||||
|
||||
type API struct {
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
alertmanager Alertmanager
|
||||
}
|
||||
|
||||
func NewAPI(configStore alertmanagertypes.ConfigStore, alertmanager Alertmanager) *API {
|
||||
func NewAPI(alertmanager Alertmanager) *API {
|
||||
return &API{
|
||||
configStore: configStore,
|
||||
alertmanager: alertmanager,
|
||||
}
|
||||
}
|
||||
@ -83,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) GetChannels(req *http.Request, rw http.ResponseWriter) {
|
||||
func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@ -93,20 +91,13 @@ func (api *API) GetChannels(req *http.Request, rw http.ResponseWriter) {
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
channels, err := api.alertmanager.ListChannels(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
|
||||
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
|
||||
for _, channel := range channels {
|
||||
channelList = append(channelList, channel)
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, channelList)
|
||||
render.Success(rw, http.StatusOK, channels)
|
||||
}
|
||||
|
||||
func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
@ -137,14 +128,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, id)
|
||||
channel, err := api.alertmanager.GetChannelByID(ctx, claims.OrgID, id)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
@ -176,19 +160,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.configStore.Set(ctx, config)
|
||||
err = api.alertmanager.UpdateChannelByReceiver(ctx, claims.OrgID, receiver)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
@ -225,26 +197,39 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, id)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = config.DeleteReceiver(channel.Name)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.configStore.Set(ctx, config)
|
||||
err = api.alertmanager.DeleteChannelByID(ctx, claims.OrgID, id)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close() //nolint:errcheck
|
||||
|
||||
receiver, err := alertmanagertypes.NewReceiver(string(body))
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.alertmanager.CreateChannel(ctx, claims.OrgID, receiver)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
|
@ -1,37 +1,46 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/server"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// Config is the config for the alertmanager server.
|
||||
server.Config `mapstructure:",squash"`
|
||||
alertmanagerserver.Config `mapstructure:",squash"`
|
||||
|
||||
// Provider is the provider for the alertmanager service.
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
// Internal is the internal alertmanager configuration.
|
||||
Internal Internal `mapstructure:"internal"`
|
||||
Signoz Signoz `mapstructure:"signoz"`
|
||||
|
||||
// Legacy is the legacy alertmanager configuration.
|
||||
Legacy Legacy `mapstructure:"legacy"`
|
||||
}
|
||||
|
||||
type Internal struct {
|
||||
type Signoz struct {
|
||||
// PollInterval is the interval at which the alertmanager is synced.
|
||||
PollInterval time.Duration `mapstructure:"poll_interval"`
|
||||
}
|
||||
|
||||
type Legacy struct {
|
||||
// URL is the URL of the legacy alertmanager.
|
||||
URL *url.URL `mapstructure:"url"`
|
||||
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig)
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Config: server.NewConfig(),
|
||||
Provider: "internal",
|
||||
Internal: Internal{
|
||||
Config: alertmanagerserver.NewConfig(),
|
||||
Provider: "signoz",
|
||||
Signoz: Signoz{
|
||||
PollInterval: 15 * time.Second,
|
||||
},
|
||||
}
|
||||
|
@ -1,70 +0,0 @@
|
||||
package internalalertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
service *alertmanager.Service
|
||||
config alertmanager.Config
|
||||
settings factory.ScopedProviderSettings
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("internal"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||
return New(ctx, settings, config, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (alertmanager.Alertmanager, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager")
|
||||
return &provider{
|
||||
service: alertmanager.New(
|
||||
ctx,
|
||||
settings,
|
||||
config,
|
||||
sqlalertmanagerstore.NewStateStore(sqlstore),
|
||||
sqlalertmanagerstore.NewConfigStore(sqlstore),
|
||||
),
|
||||
settings: settings,
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(ctx context.Context) error {
|
||||
ticker := time.NewTicker(provider.config.Internal.PollInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := provider.service.SyncServers(ctx); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
return provider.service.Stop(ctx)
|
||||
}
|
||||
|
||||
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
return provider.service.GetAlerts(ctx, orgID, params)
|
||||
}
|
||||
|
||||
func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
return provider.service.PutAlerts(ctx, orgID, alerts)
|
||||
}
|
||||
|
||||
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
return provider.service.TestReceiver(ctx, orgID, receiver)
|
||||
}
|
342
pkg/alertmanager/legacyalertmanager/provider.go
Normal file
342
pkg/alertmanager/legacyalertmanager/provider.go
Normal file
@ -0,0 +1,342 @@
|
||||
package legacyalertmanager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
const (
|
||||
alertsPath string = "/v1/alerts"
|
||||
routesPath string = "/v1/routes"
|
||||
testReceiverPath string = "/v1/testReceiver"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
config alertmanager.Config
|
||||
settings factory.ScopedProviderSettings
|
||||
client *http.Client
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
batcher *alertmanagerbatcher.Batcher
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("legacy"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||
return New(ctx, settings, config, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
|
||||
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
||||
|
||||
return &provider{
|
||||
config: config,
|
||||
settings: settings,
|
||||
client: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
configStore: configStore,
|
||||
batcher: alertmanagerbatcher.New(settings.Logger(), alertmanagerbatcher.NewConfig()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(ctx context.Context) error {
|
||||
err := provider.batcher.Start(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer provider.batcher.Stop(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case alerts := <-provider.batcher.C:
|
||||
if err := provider.putAlerts(ctx, "", alerts); err != nil {
|
||||
provider.settings.Logger().Error("failed to send alerts to alertmanager", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
url := provider.config.Legacy.URL.JoinPath(alertsPath)
|
||||
url.RawQuery = params.RawQuery
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var alerts alertmanagertypes.GettableAlerts
|
||||
if err := json.Unmarshal(body, &alerts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return alerts, nil
|
||||
}
|
||||
|
||||
func (provider *provider) PutAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
provider.batcher.Send(ctx, alerts...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) putAlerts(ctx context.Context, _ string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(alertsPath)
|
||||
|
||||
body, err := json.Marshal(alerts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
// Any HTTP status 2xx is OK.
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(testReceiverPath)
|
||||
|
||||
body, err := json.Marshal(receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
// Any HTTP status 2xx is OK.
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
|
||||
for _, channel := range channels {
|
||||
channelList = append(channelList, channel)
|
||||
}
|
||||
|
||||
return channelList, nil
|
||||
}
|
||||
|
||||
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(routesPath)
|
||||
|
||||
body, err := json.Marshal(receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url.String(), bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
// Any HTTP status 2xx is OK.
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(routesPath)
|
||||
|
||||
body, err := json.Marshal(receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
// Any HTTP status 2xx is OK.
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.DeleteReceiver(channel.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, func(ctx context.Context) error {
|
||||
url := provider.config.Legacy.URL.JoinPath(routesPath)
|
||||
|
||||
body, err := json.Marshal(map[string]string{"name": channel.Name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url.String(), bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
// Any HTTP status 2xx is OK.
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %v", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop performs no work and always returns nil. The batcher is stopped by the
// deferred call in Start when that method returns.
func (provider *provider) Stop(ctx context.Context) error {
	return nil
}
|
@ -4,7 +4,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/server"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerserver"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
@ -24,7 +24,7 @@ type Service struct {
|
||||
settings factory.ScopedProviderSettings
|
||||
|
||||
// Map of organization id to alertmanager server
|
||||
servers map[string]*server.Server
|
||||
servers map[string]*alertmanagerserver.Server
|
||||
|
||||
// Mutex to protect the servers map
|
||||
serversMtx sync.RWMutex
|
||||
@ -36,7 +36,7 @@ func New(ctx context.Context, settings factory.ScopedProviderSettings, config Co
|
||||
stateStore: stateStore,
|
||||
configStore: configStore,
|
||||
settings: settings,
|
||||
servers: make(map[string]*server.Server),
|
||||
servers: make(map[string]*alertmanagerserver.Server),
|
||||
serversMtx: sync.RWMutex{},
|
||||
}
|
||||
|
||||
@ -57,7 +57,7 @@ func (service *Service) SyncServers(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
service.servers[orgID], err = server.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), server.Config{}, orgID, service.stateStore)
|
||||
service.servers[orgID], err = alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config.Config, orgID, service.stateStore)
|
||||
if err != nil {
|
||||
service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err)
|
||||
continue
|
||||
@ -127,7 +127,7 @@ func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmana
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (service *Service) getServer(orgID string) (*server.Server, error) {
|
||||
func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) {
|
||||
server, ok := service.servers[orgID]
|
||||
if !ok {
|
||||
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID)
|
||||
|
170
pkg/alertmanager/signozalertmanager/provider.go
Normal file
170
pkg/alertmanager/signozalertmanager/provider.go
Normal file
@ -0,0 +1,170 @@
|
||||
package signozalertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
service *alertmanager.Service
|
||||
config alertmanager.Config
|
||||
settings factory.ScopedProviderSettings
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
stateStore alertmanagertypes.StateStore
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||
return New(ctx, settings, config, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (*provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager")
|
||||
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
||||
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
|
||||
|
||||
return &provider{
|
||||
service: alertmanager.New(
|
||||
ctx,
|
||||
settings,
|
||||
config,
|
||||
stateStore,
|
||||
configStore,
|
||||
),
|
||||
settings: settings,
|
||||
config: config,
|
||||
configStore: configStore,
|
||||
stateStore: stateStore,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(ctx context.Context) error {
|
||||
ticker := time.NewTicker(provider.config.Signoz.PollInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := provider.service.SyncServers(ctx); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
return provider.service.Stop(ctx)
|
||||
}
|
||||
|
||||
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
return provider.service.GetAlerts(ctx, orgID, params)
|
||||
}
|
||||
|
||||
func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
return provider.service.PutAlerts(ctx, orgID, alerts)
|
||||
}
|
||||
|
||||
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
return provider.service.TestReceiver(ctx, orgID, receiver)
|
||||
}
|
||||
|
||||
func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
|
||||
for _, channel := range channels {
|
||||
channelList = append(channelList, channel)
|
||||
}
|
||||
|
||||
return channelList, nil
|
||||
}
|
||||
|
||||
func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
func (provider *provider) UpdateChannelByReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) DeleteChannelByID(ctx context.Context, orgID string, channelID int) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, channelID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.DeleteReceiver(channel.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) CreateChannel(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
config, err := provider.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.CreateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = provider.configStore.Set(ctx, config, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/runtime/middleware"
|
||||
"github.com/go-openapi/strfmt"
|
||||
v2 "github.com/prometheus/alertmanager/api/v2"
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/api/v2/restapi/operations/alert"
|
||||
@ -35,11 +36,34 @@ type (
|
||||
|
||||
// A slice of GettableAlert.
|
||||
GettableAlerts = models.GettableAlerts
|
||||
|
||||
// An alias for the GettableAlertsParams type from the alertmanager package.
|
||||
GettableAlertsParams = alert.GetAlertsParams
|
||||
)
|
||||
|
||||
// GettableAlertsParams embeds the alertmanager GetAlertsParams and carries the
// raw query string next to it. It is a distinct struct type, not an alias.
|
||||
type GettableAlertsParams struct {
|
||||
alert.GetAlertsParams
|
||||
// RawQuery is a raw URL query string; NewGettableAlertsParams fills it from req.URL.RawQuery.
RawQuery string
|
||||
}
|
||||
|
||||
// Converts a slice of Alert to a slice of PostableAlert.
|
||||
func NewPostableAlertsFromAlerts(alerts []*types.Alert) PostableAlerts {
|
||||
postableAlerts := make(PostableAlerts, 0, len(alerts))
|
||||
for _, alert := range alerts {
|
||||
start := strfmt.DateTime(alert.StartsAt)
|
||||
end := strfmt.DateTime(alert.EndsAt)
|
||||
postableAlerts = append(postableAlerts, &models.PostableAlert{
|
||||
Annotations: v2.ModelLabelSetToAPILabelSet(alert.Annotations),
|
||||
EndsAt: end,
|
||||
StartsAt: start,
|
||||
Alert: models.Alert{
|
||||
GeneratorURL: strfmt.URI(alert.GeneratorURL),
|
||||
Labels: v2.ModelLabelSetToAPILabelSet(alert.Labels),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return postableAlerts
|
||||
}
|
||||
|
||||
// Converts a slice of PostableAlert to a slice of Alert.
|
||||
func NewAlertsFromPostableAlerts(postableAlerts PostableAlerts, resolveTimeout time.Duration, now time.Time) ([]*types.Alert, []error) {
|
||||
alerts := v2.OpenAPIAlertsToAlerts(postableAlerts)
|
||||
@ -109,7 +133,10 @@ func NewGettableAlertsParams(req *http.Request) (GettableAlertsParams, error) {
|
||||
return GettableAlertsParams{}, err
|
||||
}
|
||||
|
||||
return params, nil
|
||||
return GettableAlertsParams{
|
||||
GetAlertsParams: params,
|
||||
RawQuery: req.URL.RawQuery,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewGettableAlertsFromAlertProvider(
|
||||
|
@ -260,7 +260,7 @@ func (c *Config) DeleteReceiver(name string) error {
|
||||
|
||||
type ConfigStore interface {
|
||||
// Set creates or updates a config.
|
||||
Set(context.Context, *Config) error
|
||||
Set(context.Context, *Config, func(context.Context) error) error
|
||||
|
||||
// Get returns the config for the given orgID
|
||||
Get(context.Context, string) (*Config, error)
|
||||
|
Loading…
x
Reference in New Issue
Block a user