feat(alertmanager): add server implementation for alertmanager (#7103)

This commit is contained in:
Vibhu Pandey 2025-02-13 14:38:58 +05:30 committed by GitHub
parent 7359c0a825
commit 2d6131c291
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 608 additions and 1 deletions

View File

@ -0,0 +1,103 @@
package server
import (
"net/url"
"time"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/common/model"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// Config groups every setting of the alertmanager server: the externally
// reachable URL, the global and root-route settings, and the tuning knobs for
// alerts, silences and the notification log. Every field carries a
// mapstructure tag naming its configuration key.
type Config struct {
	// ExternalUrl is the URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a
	// reverse proxy). Used for generating relative and absolute links back to Alertmanager itself.
	// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L155C54-L155C249
	ExternalUrl *url.URL `mapstructure:"external_url"`

	// Global is the global configuration for the alertmanager.
	Global alertmanagertypes.GlobalConfig `mapstructure:"global"`

	// Route is the config of the root node of the routing tree.
	Route alertmanagertypes.RouteConfig `mapstructure:"route"`

	// Alerts is the configuration for alerts.
	Alerts AlertsConfig `mapstructure:"alerts"`

	// Silences is the configuration for silences.
	Silences SilencesConfig `mapstructure:"silences"`

	// NFLog is the configuration for the notification log.
	NFLog NFLogConfig `mapstructure:"nflog"`
}
// AlertsConfig holds the settings for the alerts kept by the server.
type AlertsConfig struct {
	// GCInterval is the interval between garbage collection of alerts.
	// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152
	GCInterval time.Duration `mapstructure:"gc_interval"`
}
// SilencesConfig holds the limits, maintenance cadence and retention of silences.
type SilencesConfig struct {
	// Max is the maximum number of silences, including expired silences. If negative or zero, no limit is set.
	// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
	Max int `mapstructure:"max"`

	// MaxSizeBytes is the maximum size of the silences in bytes. If negative or zero, no limit is set.
	// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L150C64-L150C157
	MaxSizeBytes int `mapstructure:"max_size_bytes"`

	// MaintenanceInterval is the interval between garbage collection and snapshotting of the silences. The snapshot will be
	// stored in the state store.
	// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
	// been split between silences and nflog.
	MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`

	// Retention is how long silences are retained.
	Retention time.Duration `mapstructure:"retention"`
}
// NFLogConfig holds the maintenance cadence and retention of the notification log.
type NFLogConfig struct {
	// MaintenanceInterval is the interval between garbage collection and snapshotting of the notification logs. The snapshot
	// will be stored in the state store.
	// The upstream alertmanager config (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149) has
	// been split between silences and nflog.
	MaintenanceInterval time.Duration `mapstructure:"maintenance_interval"`

	// Retention is how long notification log entries are retained.
	Retention time.Duration `mapstructure:"retention"`
}
// NewConfig returns the default server configuration: an external URL of
// http://localhost:8080, SMTP defaults pointing at localhost:25, grouping by
// "alertname", and GC, maintenance and retention values that follow the
// upstream alertmanager defaults referenced below.
func NewConfig() Config {
	return Config{
		// The scheme is set explicitly: a URL with only a host renders as
		// "//localhost:8080", which is not an absolute link.
		ExternalUrl: &url.URL{
			Scheme: "http",
			Host:   "localhost:8080",
		},
		Global: alertmanagertypes.GlobalConfig{
			// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/config/config.go#L727)
			ResolveTimeout: model.Duration(5 * time.Minute),
			SMTPHello:      "localhost",
			SMTPFrom:       "alertmanager@signoz.io",
			SMTPSmarthost:  config.HostPort{Host: "localhost", Port: "25"},
			SMTPRequireTLS: true,
		},
		Route: alertmanagertypes.RouteConfig{
			GroupByStr:     []string{"alertname"},
			GroupInterval:  5 * time.Minute,
			GroupWait:      30 * time.Second,
			RepeatInterval: 4 * time.Hour,
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L152)
		Alerts: AlertsConfig{
			GCInterval: 30 * time.Minute,
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149-L151)
		Silences: SilencesConfig{
			// Zero means "no limit" for both Max and MaxSizeBytes.
			Max:                 0,
			MaxSizeBytes:        0,
			MaintenanceInterval: 15 * time.Minute,
			Retention:           120 * time.Hour,
		},
		// Corresponds to the default in upstream (https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/cmd/alertmanager/main.go#L149)
		NFLog: NFLogConfig{
			MaintenanceInterval: 15 * time.Minute,
			Retention:           120 * time.Hour,
		},
	}
}

View File

@ -0,0 +1,302 @@
package server
import (
"context"
"log/slog"
"strings"
"sync"
"time"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider/mem"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/timeinterval"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// snapfnoop is a placeholder snapshot file name handed to the silence and
// notification-log maintenance loops. It does not name a real file and is never
// used as one; it exists only so that maintenance still runs on shutdown. See
// https://github.com/prometheus/alertmanager/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
var snapfnoop = "snapfnoop"
// Server is an alertmanager instance for a single organisation. It wires the
// upstream alertmanager primitives (alert provider, notification log, silences,
// inhibitor, dispatcher) together and persists silence and notification-log
// state through a StateStore.
type Server struct {
	// logger is the logger for the alertmanager
	logger *slog.Logger

	// registry is the prometheus registry for the alertmanager
	registry *prometheus.Registry

	// srvConfig is the server config for the alertmanager
	srvConfig Config

	// alertmanagerConfig is the config of the alertmanager; it is set by SetConfig.
	alertmanagerConfig *alertmanagertypes.Config

	// orgID is the orgID for the alertmanager
	orgID string

	// stateStore is the backing store for the silence and notification-log state.
	stateStore alertmanagertypes.StateStore

	// alertmanager primitives from upstream alertmanager
	alerts            *mem.Alerts
	nflog             *nflog.Log
	dispatcher        *dispatch.Dispatcher
	dispatcherMetrics *dispatch.DispatcherMetrics
	inhibitor         *inhibit.Inhibitor
	silencer          *silence.Silencer
	silences          *silence.Silences
	timeIntervals     map[string][]timeinterval.TimeInterval
	pipelineBuilder   *notify.PipelineBuilder
	marker            *alertmanagertypes.MemMarker
	tmpl              *template.Template

	// wg tracks the silence and notification-log maintenance goroutines.
	wg sync.WaitGroup
	// stopc is closed by Stop to signal the maintenance goroutines to exit.
	stopc chan struct{}
}
// New creates an alertmanager server for the organisation orgID.
//
// It loads any previously stored silence and notification-log state from
// stateStore (a not-found error means "start empty"), builds the upstream
// primitives, and starts one maintenance goroutine each for silences and the
// notification log. Those goroutines garbage collect and snapshot state back
// into stateStore and are stopped by Stop.
//
// The maintenance goroutines keep using ctx for logging and for writes to
// stateStore after New has returned.
//
// No routing configuration is applied here; call SetConfig before serving alerts.
// An error is returned if state cannot be read or any primitive fails to build.
func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
	server := &Server{
		logger:     logger.With("pkg", "go.signoz.io/pkg/alertmanager/server"),
		registry:   registry,
		srvConfig:  srvConfig,
		orgID:      orgID,
		stateStore: stateStore,
		stopc:      make(chan struct{}),
	}

	// initialize marker
	server.marker = alertmanagertypes.NewMarker(server.registry)

	// get silences for initial state
	silencesstate, err := server.stateStore.Get(ctx, server.orgID, alertmanagertypes.SilenceStateName)
	if err != nil && !errors.Ast(err, errors.TypeNotFound) {
		return nil, err
	}

	// get nflog for initial state
	nflogstate, err := server.stateStore.Get(ctx, server.orgID, alertmanagertypes.NFLogStateName)
	if err != nil && !errors.Ast(err, errors.TypeNotFound) {
		return nil, err
	}

	// Initialize silences
	server.silences, err = silence.New(silence.Options{
		SnapshotReader: strings.NewReader(silencesstate),
		Retention:      server.srvConfig.Silences.Retention,
		Limits: silence.Limits{
			MaxSilences:         func() int { return server.srvConfig.Silences.Max },
			MaxSilenceSizeBytes: func() int { return server.srvConfig.Silences.MaxSizeBytes },
		},
		Metrics: server.registry,
		Logger:  server.logger,
	})
	if err != nil {
		return nil, err
	}

	// Initialize notification log
	server.nflog, err = nflog.New(nflog.Options{
		SnapshotReader: strings.NewReader(nflogstate),
		Retention:      server.srvConfig.NFLog.Retention,
		Metrics:        server.registry,
		Logger:         server.logger,
	})
	if err != nil {
		return nil, err
	}

	// Build the alert provider before any maintenance goroutine is started. If
	// this fails we return without having spawned goroutines that nobody would
	// ever stop (stopc is only closed by Stop, which the caller cannot reach
	// when New returns an error).
	server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.logger, server.registry)
	if err != nil {
		return nil, err
	}

	server.pipelineBuilder = notify.NewPipelineBuilder(server.registry, featurecontrol.NoopFlags{})
	server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.registry)

	// Start maintenance for silences
	server.wg.Add(1)
	go func() {
		defer server.wg.Done()
		server.silences.Maintenance(server.srvConfig.Silences.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
			// Delete silences older than the retention period.
			if _, err := server.silences.GC(); err != nil {
				server.logger.ErrorContext(ctx, "silence garbage collection", "error", err)
				// Don't return here - we need to snapshot our state first.
			}

			return server.stateStore.Set(ctx, server.orgID, alertmanagertypes.SilenceStateName, server.silences)
		})
	}()

	// Start maintenance for notification logs
	server.wg.Add(1)
	go func() {
		defer server.wg.Done()
		server.nflog.Maintenance(server.srvConfig.NFLog.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
			if _, err := server.nflog.GC(); err != nil {
				server.logger.ErrorContext(ctx, "notification log garbage collection", "error", err)
				// Don't return without saving the current state.
			}

			return server.stateStore.Set(ctx, server.orgID, alertmanagertypes.NFLogStateName, server.nflog)
		})
	}()

	return server, nil
}
// GetAlerts returns the alerts held by the alert provider that match params.
// For every label set handed to the callback, both the inhibitor and the
// silencer are asked whether they mute it; their return values are discarded.
// It relies on the inhibitor and silencer that SetConfig creates.
func (server *Server) GetAlerts(ctx context.Context, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
	applyMutes := func(labels model.LabelSet) {
		server.inhibitor.Mutes(labels)
		server.silencer.Mutes(labels)
	}

	return alertmanagertypes.NewGettableAlertsFromAlertProvider(
		server.alerts,
		server.alertmanagerConfig,
		server.marker.Status,
		applyMutes,
		params,
	)
}
// PutAlerts converts postableAlerts into alerts, using the configured global
// resolve timeout and the current time, and hands them to the alert provider.
// A failure to store the alerts is returned first; otherwise any errors from
// the conversion are joined and returned.
func (server *Server) PutAlerts(ctx context.Context, postableAlerts alertmanagertypes.PostableAlerts) error {
	resolveTimeout := time.Duration(server.srvConfig.Global.ResolveTimeout)
	alerts, validationErrs := alertmanagertypes.NewAlertsFromPostableAlerts(postableAlerts, resolveTimeout, time.Now())

	// Notification sending alert takes precedence over validation errors.
	if putErr := server.alerts.Put(alerts...); putErr != nil {
		return putErr
	}

	if validationErrs == nil {
		return nil
	}

	return errors.Join(validationErrs...)
}
// SetConfig applies alertmanagerConfig to the server.
//
// It loads the notification templates, builds the routing tree, creates the
// integrations of every receiver referenced by a route, and collects the time
// intervals. Only after all of those steps have succeeded does it touch server
// state: any running inhibitor and dispatcher are stopped, new ones are built
// around a fresh notification pipeline, and both are started in their own
// goroutines.
//
// If an error is returned the server keeps its previous template, inhibitor,
// dispatcher and configuration.
func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertmanagertypes.Config) error {
	config := alertmanagerConfig.AlertmanagerConfig()

	// Build the template into a local first so that a failure further down does
	// not leave server.tmpl nil or out of step with the running pipeline.
	tmpl, err := template.FromGlobs(config.Templates)
	if err != nil {
		return err
	}
	tmpl.ExternalURL = server.srvConfig.ExternalUrl

	// Build the routing tree and record which receivers are used.
	routes := dispatch.NewRoute(config.Route, nil)
	activeReceivers := make(map[string]struct{})
	routes.Walk(func(r *dispatch.Route) {
		activeReceivers[r.RouteOpts.Receiver] = struct{}{}
	})

	// Build the map of receiver to integrations.
	receivers := make(map[string][]notify.Integration, len(activeReceivers))
	for _, rcv := range config.Receivers {
		if _, found := activeReceivers[rcv.Name]; !found {
			// No need to build a receiver if no route is using it.
			server.logger.InfoContext(ctx, "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
			continue
		}

		integrations, err := alertmanagertypes.NewReceiverIntegrations(rcv, tmpl, server.logger)
		if err != nil {
			return err
		}

		// rcv.Name is guaranteed to be unique across all receivers.
		receivers[rcv.Name] = integrations
	}

	// Build the map of time interval names to time interval definitions.
	timeIntervals := make(map[string][]timeinterval.TimeInterval, len(config.MuteTimeIntervals)+len(config.TimeIntervals))
	for _, ti := range config.MuteTimeIntervals {
		timeIntervals[ti.Name] = ti.TimeIntervals
	}
	for _, ti := range config.TimeIntervals {
		timeIntervals[ti.Name] = ti.TimeIntervals
	}
	intervener := timeinterval.NewIntervener(timeIntervals)

	// Nothing below returns an error, so it is now safe to replace server state.
	server.tmpl = tmpl

	if server.inhibitor != nil {
		server.inhibitor.Stop()
	}
	if server.dispatcher != nil {
		server.dispatcher.Stop()
	}

	server.inhibitor = inhibit.NewInhibitor(server.alerts, config.InhibitRules, server.marker, server.logger)
	server.timeIntervals = timeIntervals
	server.silencer = silence.NewSilencer(server.silences, server.marker, server.logger)

	// The pipeline is built with a nil peer and a zero wait duration.
	var pipelinePeer notify.Peer
	pipeline := server.pipelineBuilder.New(
		receivers,
		func() time.Duration { return 0 },
		server.inhibitor,
		server.silencer,
		intervener,
		server.marker,
		server.nflog,
		pipelinePeer,
	)

	// Never hand the dispatcher a timeout below notify.MinTimeout.
	timeoutFunc := func(d time.Duration) time.Duration {
		if d < notify.MinTimeout {
			d = notify.MinTimeout
		}
		return d
	}

	server.dispatcher = dispatch.NewDispatcher(
		server.alerts,
		routes,
		pipeline,
		server.marker,
		timeoutFunc,
		nil,
		server.logger,
		server.dispatcherMetrics,
	)

	// Do not try to add these to server.wg as there seems to be a race condition if
	// we call Start() and Stop() in quick succession.
	// Both goroutines keep running until a later SetConfig or Stop stops them.
	go server.dispatcher.Run()
	go server.inhibitor.Run()

	server.alertmanagerConfig = alertmanagerConfig
	return nil
}
// TestReceiver delegates to alertmanagertypes.TestReceiver, passing the given
// receiver together with the server's current template and logger, and returns
// its error.
func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error {
	tmpl, logger := server.tmpl, server.logger
	return alertmanagertypes.TestReceiver(ctx, receiver, tmpl, logger)
}
// Stop shuts the server down: it stops the dispatcher and inhibitor if they
// exist, closes the alert provider, signals the maintenance goroutines through
// stopc, and waits for them to finish. It always returns nil.
func (server *Server) Stop(ctx context.Context) error {
	// The dispatcher and inhibitor only exist once SetConfig has run.
	if d := server.dispatcher; d != nil {
		d.Stop()
	}
	if i := server.inhibitor; i != nil {
		i.Stop()
	}

	// Shut down the alert provider.
	server.alerts.Close()

	// Tell the state maintenance goroutines to stop, then block until they have
	// all returned.
	close(server.stopc)
	server.wg.Wait()

	return nil
}

View File

@ -0,0 +1,126 @@
package server
import (
"bytes"
"context"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"testing"
"time"
"github.com/go-openapi/strfmt"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/client_golang/prometheus"
commoncfg "github.com/prometheus/common/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
"go.signoz.io/signoz/pkg/types/alertmanagertypes/alertmanagertypestest"
)
// TestServerSetConfigAndStop checks that a freshly created server accepts the
// default alertmanager configuration and can then be stopped without error.
func TestServerSetConfigAndStop(t *testing.T) {
	ctx := context.Background()
	logger := slog.New(slog.NewTextHandler(io.Discard, nil))

	server, err := New(ctx, logger, prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore())
	require.NoError(t, err)

	amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{}, "1")
	require.NoError(t, err)

	assert.NoError(t, server.SetConfig(ctx, amConfig))
	assert.NoError(t, server.Stop(ctx))
}
// TestServerTestReceiverTypeWebhook starts a local HTTP server, points a webhook
// receiver at it, and checks that Server.TestReceiver delivers a payload that
// mentions the receiver name and a firing alert.
func TestServerTestReceiverTypeWebhook(t *testing.T) {
	server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore())
	require.NoError(t, err)

	amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{}, "1")
	require.NoError(t, err)

	webhookListener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	requestBody := new(bytes.Buffer)
	webhookServer := &http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Use assert, not require: the handler runs outside the test
			// goroutine, where FailNow must not be called.
			if _, err := requestBody.ReadFrom(r.Body); !assert.NoError(t, err) {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
		}),
	}

	// Serve always returns a non-nil error; hand it back to the test goroutine
	// so it can be checked there once the server has been closed.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- webhookServer.Serve(webhookListener)
	}()
	defer func() {
		assert.NoError(t, webhookServer.Close())
		assert.ErrorIs(t, <-serveErr, http.ErrServerClosed)
	}()

	require.NoError(t, server.SetConfig(context.Background(), amConfig))
	// Wrap Stop in a closure: arguments of a deferred call are evaluated at the
	// defer statement, so deferring require.NoError(t, server.Stop(...)) directly
	// would stop the server right here.
	defer func() {
		assert.NoError(t, server.Stop(context.Background()))
	}()

	webhookURL, err := url.Parse("http://" + webhookListener.Addr().String() + "/webhook")
	require.NoError(t, err)

	err = server.TestReceiver(context.Background(), alertmanagertypes.Receiver{
		Name: "test-receiver",
		WebhookConfigs: []*config.WebhookConfig{
			{
				HTTPConfig: &commoncfg.HTTPClientConfig{},
				URL:        &config.SecretURL{URL: webhookURL},
			},
		},
	})

	assert.NoError(t, err)
	assert.Contains(t, requestBody.String(), "test-receiver")
	assert.Contains(t, requestBody.String(), "firing")
}
// TestServerPutAlerts configures a server with one webhook receiver, posts a
// single alert, and checks that the same alert is returned by GetAlerts.
func TestServerPutAlerts(t *testing.T) {
	stateStore := alertmanagertypestest.NewStateStore()
	srvCfg := NewConfig()
	srvCfg.Route.GroupInterval = 1 * time.Second

	server, err := New(context.Background(), slog.New(slog.NewTextHandler(io.Discard, nil)), prometheus.NewRegistry(), srvCfg, "1", stateStore)
	require.NoError(t, err)
	// Stop the server even when a later require aborts the test.
	defer func() {
		assert.NoError(t, server.Stop(context.Background()))
	}()

	amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
	require.NoError(t, err)

	require.NoError(t, amConfig.CreateReceiver(&config.Route{Receiver: "test-receiver", Continue: true}, alertmanagertypes.Receiver{
		Name: "test-receiver",
		WebhookConfigs: []*config.WebhookConfig{
			{
				HTTPConfig: &commoncfg.HTTPClientConfig{},
				URL:        &config.SecretURL{URL: &url.URL{Host: "localhost", Path: "/test-receiver"}},
			},
		},
	}))

	require.NoError(t, server.SetConfig(context.Background(), amConfig))

	require.NoError(t, server.PutAlerts(context.Background(), alertmanagertypes.PostableAlerts{
		{
			Annotations: models.LabelSet{"alertname": "test-alert"},
			StartsAt:    strfmt.DateTime(time.Now().Add(-time.Hour)),
			EndsAt:      strfmt.DateTime(time.Now().Add(time.Hour)),
			Alert: models.Alert{
				GeneratorURL: "http://localhost:8080/test-alert",
				Labels:       models.LabelSet{"alertname": "test-alert"},
			},
		},
	}))
	require.NotEmpty(t, server.alerts)

	dummyRequest, err := http.NewRequest(http.MethodGet, "/alerts", nil)
	require.NoError(t, err)

	params, err := alertmanagertypes.NewGettableAlertsParams(dummyRequest)
	require.NoError(t, err)

	gettableAlerts, err := server.GetAlerts(context.Background(), params)
	require.NoError(t, err)

	// require, not assert: indexing element 0 below would panic on an empty result.
	require.Len(t, gettableAlerts, 1)
	assert.Equal(t, "test-alert", gettableAlerts[0].Alert.Labels["alertname"])
}

View File

@ -0,0 +1,55 @@
package alertmanagertypestest
import (
"context"
"encoding/base64"
"sync"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
)
// StateStore is an in-memory state store. It keeps base64-encoded state
// strings in a map keyed first by orgID and then by state name.
type StateStore struct {
	// states maps orgID -> state name -> base64-encoded state.
	states map[string]map[string]string
	// mtx is the lock associated with states.
	mtx sync.RWMutex
}
// NewStateStore returns a StateStore with an empty, initialised states map.
func NewStateStore() *StateStore {
	store := new(StateStore)
	store.states = map[string]map[string]string{}
	return store
}
// Set marshals state, stores it base64-encoded under (orgID, stateName), and
// returns the number of marshalled bytes. If marshalling fails the store is left
// untouched and the error is returned.
func (s *StateStore) Set(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) {
	// Marshal before taking the lock so the critical section only covers the
	// map accesses.
	bytes, err := state.MarshalBinary()
	if err != nil {
		return 0, err
	}

	// Every access to s.states, including creation of the per-org map, happens
	// under the lock; Set may be called from several goroutines at once.
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if _, ok := s.states[orgID]; !ok {
		s.states[orgID] = make(map[string]string)
	}
	s.states[orgID][stateName.String()] = base64.StdEncoding.EncodeToString(bytes)

	return int64(len(bytes)), nil
}
// Get returns the decoded state stored under (orgID, stateName). It returns a
// TypeNotFound error with code ErrCodeAlertmanagerStateNotFound when nothing is
// stored for that organisation or state name, and the decode error when the
// stored value is not valid base64.
func (s *StateStore) Get(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) {
	// Read under the read lock so Get does not race with a concurrent Set.
	// Indexing a missing (nil) per-org map is safe and simply reports !ok.
	s.mtx.RLock()
	state, ok := s.states[orgID][stateName.String()]
	s.mtx.RUnlock()

	if !ok {
		return "", errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state %q for orgID %q not found", stateName.String(), orgID)
	}

	bytes, err := base64.StdEncoding.DecodeString(state)
	if err != nil {
		return "", err
	}

	return string(bytes), nil
}

View File

@ -1,6 +1,11 @@
package alertmanagertypes
import "github.com/prometheus/alertmanager/cluster"
import (
"context"
"github.com/prometheus/alertmanager/cluster"
"go.signoz.io/signoz/pkg/errors"
)
// State is a type alias for cluster.State from the upstream alertmanager
// cluster package.
type State = cluster.State
@ -13,6 +18,10 @@ var (
NFLogStateName = StateName{name: "nflog"}
)
// ErrCodeAlertmanagerStateNotFound is the error code "alertmanager_state_not_found".
var ErrCodeAlertmanagerStateNotFound = errors.MustNewCode("alertmanager_state_not_found")
// StateName identifies a kind of alertmanager state. The name is unexported, so
// other packages cannot set it directly.
type StateName struct {
	// name is the string returned by String.
	name string
}
@ -20,3 +29,15 @@ type StateName struct {
// String returns the name of the state.
func (stateName StateName) String() string {
	return stateName.name
}
// StateStore persists the silence and notification-log state of an alertmanager.
type StateStore interface {
	// Set creates the silence or the notification log state and returns the number of bytes in the state.
	// The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`.
	// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217
	// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94
	Set(ctx context.Context, orgID string, stateName StateName, state State) (int64, error)

	// Get returns the silence state or the notification log state as a string from the store. This is used as a
	// snapshot to load the initial state of silences or notification log when starting the alertmanager.
	Get(ctx context.Context, orgID string, stateName StateName) (string, error)
}