mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 04:45:57 +08:00
feat(alertmanager): add service for alertmanager (#7136)
### Summary - adds an alertmanager service
This commit is contained in:
parent
7e1301b8d2
commit
918c8942c4
25
pkg/alertmanager/alertmanager.go
Normal file
25
pkg/alertmanager/alertmanager.go
Normal file
@ -0,0 +1,25 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeAlertmanagerNotFound = errors.MustNewCode("alertmanager_not_found")
|
||||
)
|
||||
|
||||
type Alertmanager interface {
|
||||
factory.Service
|
||||
// GetAlerts gets the alerts from the alertmanager per organization.
|
||||
GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error)
|
||||
|
||||
// PutAlerts puts the alerts into the alertmanager per organization.
|
||||
PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error
|
||||
|
||||
// TestReceiver sends a test alert to a receiver.
|
||||
TestReceiver(context.Context, string, alertmanagertypes.Receiver) error
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
package sqlalertmanagerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewConfigStore(sqlstore sqlstore.SQLStore) alertmanagertypes.ConfigStore {
|
||||
return &config{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
// Get implements alertmanagertypes.ConfigStore.
|
||||
func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
|
||||
storeableConfig := new(alertmanagertypes.StoreableConfig)
|
||||
|
||||
err := store.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(storeableConfig).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerConfigNotFound, "cannot find alertmanager config for orgID %s", orgID)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg, err := alertmanagertypes.NewConfigFromStoreableConfig(storeableConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// Set implements alertmanagertypes.ConfigStore.
|
||||
func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error {
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
if _, err = tx.
|
||||
NewInsert().
|
||||
Model(config.StoreableConfig()).
|
||||
On("CONFLICT (org_id) DO UPDATE").
|
||||
Set("config = ?", string(config.StoreableConfig().Config)).
|
||||
Set("updated_at = ?", config.StoreableConfig().UpdatedAt).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
if len(channels) != 0 {
|
||||
if _, err = tx.NewInsert().
|
||||
Model(&channels).
|
||||
On("CONFLICT (name) DO UPDATE").
|
||||
Set("data = EXCLUDED.data").
|
||||
Set("updated_at = EXCLUDED.updated_at").
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *config) ListOrgs(ctx context.Context) ([]string, error) {
|
||||
var orgIDs []string
|
||||
|
||||
err := store.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Table("organizations").
|
||||
ColumnExpr("id").
|
||||
Scan(ctx, &orgIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return orgIDs, nil
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
package sqlalertmanagerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type state struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewStateStore(sqlstore sqlstore.SQLStore) alertmanagertypes.StateStore {
|
||||
return &state{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
// Get implements alertmanagertypes.StateStore.
|
||||
func (store *state) Get(ctx context.Context, orgID string) (*alertmanagertypes.StoreableState, error) {
|
||||
storeableState := new(alertmanagertypes.StoreableState)
|
||||
|
||||
err := store.
|
||||
sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(storeableState).
|
||||
Where("org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return storeableState, nil
|
||||
}
|
||||
|
||||
// Set implements alertmanagertypes.StateStore.
|
||||
func (store *state) Set(ctx context.Context, orgID string, storeableState *alertmanagertypes.StoreableState) error {
|
||||
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
_, err = tx.
|
||||
NewInsert().
|
||||
Model(storeableState).
|
||||
On("CONFLICT (org_id) DO UPDATE").
|
||||
Set("silences = EXCLUDED.silences").
|
||||
Set("nflog = EXCLUDED.nflog").
|
||||
Set("updated_at = EXCLUDED.updated_at").
|
||||
Where("org_id = ?", orgID).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
254
pkg/alertmanager/api.go
Normal file
254
pkg/alertmanager/api.go
Normal file
@ -0,0 +1,254 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/http/render"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
"go.signoz.io/signoz/pkg/types/authtypes"
|
||||
)
|
||||
|
||||
type API struct {
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
alertmanager Alertmanager
|
||||
}
|
||||
|
||||
func NewAPI(configStore alertmanagertypes.ConfigStore, alertmanager Alertmanager) *API {
|
||||
return &API{
|
||||
configStore: configStore,
|
||||
alertmanager: alertmanager,
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
params, err := alertmanagertypes.NewGettableAlertsParams(req)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
alerts, err := api.alertmanager.GetAlerts(ctx, claims.OrgID, params)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, alerts)
|
||||
}
|
||||
|
||||
func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close() //nolint:errcheck
|
||||
|
||||
receiver, err := alertmanagertypes.NewReceiver(string(body))
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.alertmanager.TestReceiver(ctx, claims.OrgID, receiver)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) GetChannels(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
|
||||
channelList := make([]*alertmanagertypes.Channel, 0, len(channels))
|
||||
for _, channel := range channels {
|
||||
channelList = append(channelList, channel)
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, channelList)
|
||||
}
|
||||
|
||||
func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(req)
|
||||
if vars == nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||
return
|
||||
}
|
||||
|
||||
idString, ok := vars["id"]
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||
return
|
||||
}
|
||||
|
||||
id, err := strconv.Atoi(idString)
|
||||
if err != nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, id)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, channel)
|
||||
}
|
||||
|
||||
func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close() //nolint:errcheck
|
||||
|
||||
receiver, err := alertmanagertypes.NewReceiver(string(body))
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.configStore.Set(ctx, config)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(ctx)
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated"))
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(req)
|
||||
if vars == nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||
return
|
||||
}
|
||||
|
||||
idString, ok := vars["id"]
|
||||
if !ok {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||
return
|
||||
}
|
||||
|
||||
id, err := strconv.Atoi(idString)
|
||||
if err != nil {
|
||||
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer"))
|
||||
return
|
||||
}
|
||||
|
||||
config, err := api.configStore.Get(ctx, claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
channels := config.Channels()
|
||||
channel, err := alertmanagertypes.GetChannelByID(channels, id)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = config.DeleteReceiver(channel.Name)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.configStore.Set(ctx, config)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
42
pkg/alertmanager/config.go
Normal file
42
pkg/alertmanager/config.go
Normal file
@ -0,0 +1,42 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/server"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// Config is the config for the alertmanager server.
|
||||
server.Config `mapstructure:",squash"`
|
||||
|
||||
// Provider is the provider for the alertmanager service.
|
||||
Provider string `mapstructure:"provider"`
|
||||
|
||||
// Internal is the internal alertmanager configuration.
|
||||
Internal Internal `mapstructure:"internal"`
|
||||
}
|
||||
|
||||
// Internal is the configuration specific to the internal alertmanager.
type Internal struct {
	// PollInterval is how often the alertmanager servers are synced.
	PollInterval time.Duration `mapstructure:"poll_interval"`
}
|
||||
|
||||
func NewConfigFactory() factory.ConfigFactory {
|
||||
return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig)
|
||||
}
|
||||
|
||||
func newConfig() factory.Config {
|
||||
return Config{
|
||||
Config: server.NewConfig(),
|
||||
Provider: "internal",
|
||||
Internal: Internal{
|
||||
PollInterval: 15 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
return nil
|
||||
}
|
70
pkg/alertmanager/internalalertmanager/provider.go
Normal file
70
pkg/alertmanager/internalalertmanager/provider.go
Normal file
@ -0,0 +1,70 @@
|
||||
package internalalertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager"
|
||||
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/sqlstore"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type provider struct {
|
||||
service *alertmanager.Service
|
||||
config alertmanager.Config
|
||||
settings factory.ScopedProviderSettings
|
||||
}
|
||||
|
||||
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("internal"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||
return New(ctx, settings, config, sqlstore)
|
||||
})
|
||||
}
|
||||
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (alertmanager.Alertmanager, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager")
|
||||
return &provider{
|
||||
service: alertmanager.New(
|
||||
ctx,
|
||||
settings,
|
||||
config,
|
||||
sqlalertmanagerstore.NewStateStore(sqlstore),
|
||||
sqlalertmanagerstore.NewConfigStore(sqlstore),
|
||||
),
|
||||
settings: settings,
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *provider) Start(ctx context.Context) error {
|
||||
ticker := time.NewTicker(provider.config.Internal.PollInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := provider.service.SyncServers(ctx); err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *provider) Stop(ctx context.Context) error {
|
||||
return provider.service.Stop(ctx)
|
||||
}
|
||||
|
||||
func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
return provider.service.GetAlerts(ctx, orgID, params)
|
||||
}
|
||||
|
||||
func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
return provider.service.PutAlerts(ctx, orgID, alerts)
|
||||
}
|
||||
|
||||
func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
return provider.service.TestReceiver(ctx, orgID, receiver)
|
||||
}
|
@ -34,7 +34,7 @@ type Server struct {
|
||||
logger *slog.Logger
|
||||
|
||||
// registry is the prometheus registry for the alertmanager
|
||||
registry *prometheus.Registry
|
||||
registry prometheus.Registerer
|
||||
|
||||
// srvConfig is the server config for the alertmanager
|
||||
srvConfig Config
|
||||
@ -64,7 +64,7 @@ type Server struct {
|
||||
stopc chan struct{}
|
||||
}
|
||||
|
||||
func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
|
||||
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
|
||||
server := &Server{
|
||||
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/server"),
|
||||
registry: registry,
|
||||
@ -77,20 +77,18 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry
|
||||
server.marker = alertmanagertypes.NewMarker(server.registry)
|
||||
|
||||
// get silences for initial state
|
||||
silencesstate, err := server.stateStore.Get(ctx, server.orgID, alertmanagertypes.SilenceStateName)
|
||||
state, err := server.stateStore.Get(ctx, server.orgID)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get nflog for initial state
|
||||
nflogstate, err := server.stateStore.Get(ctx, server.orgID, alertmanagertypes.NFLogStateName)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
silencesSnapshot := ""
|
||||
if state != nil {
|
||||
silencesSnapshot = state.Silences
|
||||
}
|
||||
|
||||
// Initialize silences
|
||||
server.silences, err = silence.New(silence.Options{
|
||||
SnapshotReader: strings.NewReader(silencesstate),
|
||||
SnapshotReader: strings.NewReader(silencesSnapshot),
|
||||
Retention: srvConfig.Silences.Retention,
|
||||
Limits: silence.Limits{
|
||||
MaxSilences: func() int { return srvConfig.Silences.Max },
|
||||
@ -103,9 +101,14 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nflogSnapshot := ""
|
||||
if state != nil {
|
||||
nflogSnapshot = state.NFLog
|
||||
}
|
||||
|
||||
// Initialize notification log
|
||||
server.nflog, err = nflog.New(nflog.Options{
|
||||
SnapshotReader: strings.NewReader(nflogstate),
|
||||
SnapshotReader: strings.NewReader(nflogSnapshot),
|
||||
Retention: server.srvConfig.NFLog.Retention,
|
||||
Metrics: server.registry,
|
||||
Logger: server.logger,
|
||||
@ -125,7 +128,21 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry
|
||||
// Don't return here - we need to snapshot our state first.
|
||||
}
|
||||
|
||||
return server.stateStore.Set(ctx, server.orgID, alertmanagertypes.SilenceStateName, server.silences)
|
||||
state, err := server.stateStore.Get(ctx, server.orgID)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if state == nil {
|
||||
state = alertmanagertypes.NewStoreableState(server.orgID)
|
||||
}
|
||||
|
||||
c, err := state.Set(alertmanagertypes.SilenceStateName, server.silences)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return c, server.stateStore.Set(ctx, server.orgID, state)
|
||||
})
|
||||
|
||||
}()
|
||||
@ -140,7 +157,21 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry
|
||||
// Don't return without saving the current state.
|
||||
}
|
||||
|
||||
return server.stateStore.Set(ctx, server.orgID, alertmanagertypes.NFLogStateName, server.nflog)
|
||||
state, err := server.stateStore.Get(ctx, server.orgID)
|
||||
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if state == nil {
|
||||
state = alertmanagertypes.NewStoreableState(server.orgID)
|
||||
}
|
||||
|
||||
c, err := state.Set(alertmanagertypes.NFLogStateName, server.nflog)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return c, server.stateStore.Set(ctx, server.orgID, state)
|
||||
})
|
||||
}()
|
||||
|
||||
|
137
pkg/alertmanager/service.go
Normal file
137
pkg/alertmanager/service.go
Normal file
@ -0,0 +1,137 @@
|
||||
package alertmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.signoz.io/signoz/pkg/alertmanager/server"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/factory"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
// config is the config for the alertmanager service
|
||||
config Config
|
||||
|
||||
// stateStore is the state store for the alertmanager service
|
||||
stateStore alertmanagertypes.StateStore
|
||||
|
||||
// configStore is the config store for the alertmanager service
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
|
||||
// settings is the settings for the alertmanager service
|
||||
settings factory.ScopedProviderSettings
|
||||
|
||||
// Map of organization id to alertmanager server
|
||||
servers map[string]*server.Server
|
||||
|
||||
// Mutex to protect the servers map
|
||||
serversMtx sync.RWMutex
|
||||
}
|
||||
|
||||
func New(ctx context.Context, settings factory.ScopedProviderSettings, config Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service {
|
||||
service := &Service{
|
||||
config: config,
|
||||
stateStore: stateStore,
|
||||
configStore: configStore,
|
||||
settings: settings,
|
||||
servers: make(map[string]*server.Server),
|
||||
serversMtx: sync.RWMutex{},
|
||||
}
|
||||
|
||||
return service
|
||||
}
|
||||
|
||||
func (service *Service) SyncServers(ctx context.Context) error {
|
||||
orgIDs, err := service.configStore.ListOrgs(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.serversMtx.Lock()
|
||||
for _, orgID := range orgIDs {
|
||||
config, err := service.getConfig(ctx, orgID)
|
||||
if err != nil {
|
||||
service.settings.Logger().Error("failed to get alertmanagerconfig for org", "orgID", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
service.servers[orgID], err = server.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), server.Config{}, orgID, service.stateStore)
|
||||
if err != nil {
|
||||
service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = service.servers[orgID].SetConfig(ctx, config)
|
||||
if err != nil {
|
||||
service.settings.Logger().Error("failed to set config for alertmanager server", "orgID", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
service.serversMtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
|
||||
server, err := service.getServer(orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return server.GetAlerts(ctx, params)
|
||||
}
|
||||
|
||||
func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error {
|
||||
server, err := service.getServer(orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return server.PutAlerts(ctx, alerts)
|
||||
}
|
||||
|
||||
func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||
server, err := service.getServer(orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return server.TestReceiver(ctx, receiver)
|
||||
}
|
||||
|
||||
func (service *Service) Stop(ctx context.Context) error {
|
||||
for _, server := range service.servers {
|
||||
server.Stop(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) {
|
||||
config, err := service.configStore.Get(ctx, orgID)
|
||||
if err != nil {
|
||||
if !errors.Ast(err, errors.TypeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config, err = alertmanagertypes.NewDefaultConfig(service.config.Global, service.config.Route, orgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (service *Service) getServer(orgID string) (*server.Server, error) {
|
||||
server, ok := service.servers[orgID]
|
||||
if !ok {
|
||||
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID)
|
||||
}
|
||||
|
||||
return server, nil
|
||||
}
|
@ -5,6 +5,16 @@ import (
|
||||
"regexp"
|
||||
)
|
||||
|
||||
var (
|
||||
CodeInvalidInput code = code{"invalid_input"}
|
||||
CodeInternal = code{"internal"}
|
||||
CodeUnsupported = code{"unsupported"}
|
||||
CodeNotFound = code{"not_found"}
|
||||
CodeMethodNotAllowed = code{"method_not_allowed"}
|
||||
CodeAlreadyExists = code{"already_exists"}
|
||||
CodeUnauthenticated = code{"unauthenticated"}
|
||||
)
|
||||
|
||||
var (
|
||||
codeRegex = regexp.MustCompile(`^[a-z_]+$`)
|
||||
)
|
||||
|
@ -2,54 +2,36 @@ package alertmanagertypestest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"sync"
|
||||
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
|
||||
)
|
||||
|
||||
var _ alertmanagertypes.StateStore = (*StateStore)(nil)
|
||||
|
||||
type StateStore struct {
|
||||
states map[string]map[string]string
|
||||
states map[string]*alertmanagertypes.StoreableState
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func NewStateStore() *StateStore {
|
||||
return &StateStore{
|
||||
states: make(map[string]map[string]string),
|
||||
states: make(map[string]*alertmanagertypes.StoreableState),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StateStore) Set(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
|
||||
if _, ok := s.states[orgID]; !ok {
|
||||
s.states[orgID] = make(map[string]string)
|
||||
}
|
||||
|
||||
bytes, err := state.MarshalBinary()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
func (s *StateStore) Set(ctx context.Context, orgID string, storeableState *alertmanagertypes.StoreableState) error {
|
||||
s.mtx.Lock()
|
||||
s.states[orgID][stateName.String()] = base64.StdEncoding.EncodeToString(bytes)
|
||||
s.states[orgID] = storeableState
|
||||
s.mtx.Unlock()
|
||||
return int64(len(bytes)), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StateStore) Get(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
|
||||
func (s *StateStore) Get(ctx context.Context, orgID string) (*alertmanagertypes.StoreableState, error) {
|
||||
if _, ok := s.states[orgID]; !ok {
|
||||
return "", errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state %q for orgID %q not found", stateName.String(), orgID)
|
||||
return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state for orgID %q not found", orgID)
|
||||
}
|
||||
|
||||
state, ok := s.states[orgID][stateName.String()]
|
||||
if !ok {
|
||||
return "", errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state %q for orgID %q not found", stateName.String(), orgID)
|
||||
}
|
||||
|
||||
bytes, err := base64.StdEncoding.DecodeString(state)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(bytes), nil
|
||||
return s.states[orgID], nil
|
||||
}
|
||||
|
@ -8,6 +8,11 @@ import (
|
||||
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/uptrace/bun"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
|
||||
)
|
||||
|
||||
var (
|
||||
@ -137,3 +142,13 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func GetChannelByID(channels Channels, id int) (*Channel, error) {
|
||||
for _, channel := range channels {
|
||||
if channel.ID == id {
|
||||
return channel, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package alertmanagertypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -19,6 +20,7 @@ const (
|
||||
|
||||
var (
|
||||
ErrCodeAlertmanagerConfigInvalid = errors.MustNewCode("alertmanager_config_invalid")
|
||||
ErrCodeAlertmanagerConfigNotFound = errors.MustNewCode("alertmanager_config_not_found")
|
||||
ErrCodeAlertmanagerConfigConflict = errors.MustNewCode("alertmanager_config_conflict")
|
||||
)
|
||||
|
||||
@ -37,13 +39,11 @@ type RouteConfig struct {
|
||||
type StoreableConfig struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_config"`
|
||||
|
||||
ID uint64 `bun:"id"`
|
||||
Config string `bun:"config"`
|
||||
SilencesState string `bun:"silences_state,nullzero"`
|
||||
NFLogState string `bun:"nflog_state,nullzero"`
|
||||
CreatedAt time.Time `bun:"created_at"`
|
||||
UpdatedAt time.Time `bun:"updated_at"`
|
||||
OrgID string `bun:"org_id"`
|
||||
ID uint64 `bun:"id,pk,autoincrement"`
|
||||
Config string `bun:"config"`
|
||||
CreatedAt time.Time `bun:"created_at"`
|
||||
UpdatedAt time.Time `bun:"updated_at"`
|
||||
OrgID string `bun:"org_id"`
|
||||
}
|
||||
|
||||
// Config is the type for the entire alertmanager configuration
|
||||
@ -66,17 +66,35 @@ func NewConfig(c *config.Config, orgID string) *Config {
|
||||
return &Config{
|
||||
alertmanagerConfig: c,
|
||||
storeableConfig: &StoreableConfig{
|
||||
Config: string(newRawFromConfig(c)),
|
||||
SilencesState: "",
|
||||
NFLogState: "",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
OrgID: orgID,
|
||||
Config: string(newRawFromConfig(c)),
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
OrgID: orgID,
|
||||
},
|
||||
channels: channels,
|
||||
}
|
||||
}
|
||||
|
||||
func NewConfigFromStoreableConfig(sc *StoreableConfig) (*Config, error) {
|
||||
alertmanagerConfig, err := newConfigFromString(sc.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := NewChannelsFromConfig(alertmanagerConfig, sc.OrgID)
|
||||
|
||||
return &Config{
|
||||
alertmanagerConfig: alertmanagerConfig,
|
||||
storeableConfig: sc,
|
||||
channels: channels,
|
||||
orgID: sc.OrgID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewRouteFromReceiver(receiver Receiver) *config.Route {
|
||||
return &config.Route{Receiver: receiver.Name, Continue: true}
|
||||
}
|
||||
|
||||
func NewDefaultConfig(globalConfig GlobalConfig, routeConfig RouteConfig, orgID string) (*Config, error) {
|
||||
err := mergo.Merge(&globalConfig, config.DefaultGlobalConfig())
|
||||
if err != nil {
|
||||
@ -96,14 +114,14 @@ func NewDefaultConfig(globalConfig GlobalConfig, routeConfig RouteConfig, orgID
|
||||
}, orgID), nil
|
||||
}
|
||||
|
||||
func NewConfigFromString(s string, orgID string) (*Config, error) {
|
||||
func newConfigFromString(s string) (*config.Config, error) {
|
||||
config := new(config.Config)
|
||||
err := json.Unmarshal([]byte(s), config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewConfig(config, orgID), nil
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func newRawFromConfig(c *config.Config) []byte {
|
||||
@ -240,6 +258,17 @@ func (c *Config) DeleteReceiver(name string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type ConfigStore interface {
|
||||
// Set creates or updates a config.
|
||||
Set(context.Context, *Config) error
|
||||
|
||||
// Get returns the config for the given orgID
|
||||
Get(context.Context, string) (*Config, error)
|
||||
|
||||
// ListOrgs returns the list of orgs
|
||||
ListOrgs(context.Context) ([]string, error)
|
||||
}
|
||||
|
||||
// MarshalSecretValue if set to true will expose Secret type
|
||||
// through the marshal interfaces. We need to store the actual value of the secret
|
||||
// in the database, so we need to set this to true.
|
||||
|
@ -2,8 +2,11 @@ package alertmanagertypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/alertmanager/cluster"
|
||||
"github.com/uptrace/bun"
|
||||
"go.signoz.io/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
@ -22,6 +25,66 @@ var (
|
||||
ErrCodeAlertmanagerStateNotFound = errors.MustNewCode("alertmanager_state_not_found")
|
||||
)
|
||||
|
||||
type StoreableState struct {
|
||||
bun.BaseModel `bun:"table:alertmanager_state"`
|
||||
|
||||
ID uint64 `bun:"id,pk,autoincrement"`
|
||||
Silences string `bun:"silences,nullzero"`
|
||||
NFLog string `bun:"nflog,nullzero"`
|
||||
CreatedAt time.Time `bun:"created_at"`
|
||||
UpdatedAt time.Time `bun:"updated_at"`
|
||||
OrgID string `bun:"org_id"`
|
||||
}
|
||||
|
||||
func NewStoreableState(orgID string) *StoreableState {
|
||||
return &StoreableState{
|
||||
OrgID: orgID,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StoreableState) Set(stateName StateName, state State) (int64, error) {
|
||||
marshalledState, err := state.MarshalBinary()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
encodedState := base64.StdEncoding.EncodeToString(marshalledState)
|
||||
|
||||
switch stateName {
|
||||
case SilenceStateName:
|
||||
s.Silences = encodedState
|
||||
case NFLogStateName:
|
||||
s.NFLog = encodedState
|
||||
}
|
||||
|
||||
s.UpdatedAt = time.Now()
|
||||
|
||||
return int64(len(marshalledState)), nil
|
||||
}
|
||||
|
||||
func (s *StoreableState) Get(stateName StateName) (string, error) {
|
||||
base64encodedState := ""
|
||||
|
||||
switch stateName {
|
||||
case SilenceStateName:
|
||||
base64encodedState = s.Silences
|
||||
case NFLogStateName:
|
||||
base64encodedState = s.NFLog
|
||||
}
|
||||
|
||||
if base64encodedState == "" {
|
||||
return "", errors.New(errors.TypeNotFound, ErrCodeAlertmanagerStateNotFound, "state not found")
|
||||
}
|
||||
|
||||
decodedState, err := base64.StdEncoding.DecodeString(base64encodedState)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(decodedState), nil
|
||||
}
|
||||
|
||||
// StateName names a kind of alertmanager state. StoreableState.Set and
// StoreableState.Get compare it against SilenceStateName and NFLogStateName to
// pick the field to write or read. The wrapped name is unexported.
type StateName struct {
	name string
}
|
||||
@ -35,9 +98,9 @@ type StateStore interface {
|
||||
// The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217
|
||||
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94
|
||||
Set(context.Context, string, StateName, State) (int64, error)
|
||||
Set(context.Context, string, *StoreableState) error
|
||||
|
||||
// Gets the silence state or the notification log state as a string from the store. This is used as a snapshot to load the
|
||||
// initial state of silences or notification log when starting the alertmanager.
|
||||
Get(context.Context, string, StateName) (string, error)
|
||||
Get(context.Context, string) (*StoreableState, error)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user