mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-28 08:12:00 +08:00
145 lines
3.7 KiB
Go
145 lines
3.7 KiB
Go
package alertmanagerbatcher
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
|
|
"sync"
|
|
|
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
|
)
|
|
|
|
// Batcher is responsible for batching alerts and broadcasting them on a channel.
|
|
type Batcher struct {
|
|
// C is the channel on which alerts are sent to alertmanager
|
|
C chan alertmanagertypes.PostableAlerts
|
|
|
|
// logger
|
|
logger *slog.Logger
|
|
|
|
// queue of alerts to be sent to alertmanager
|
|
queue alertmanagertypes.PostableAlerts
|
|
|
|
// config
|
|
config Config
|
|
|
|
// more channel to signal the sender goroutine to send alerts
|
|
moreC chan struct{}
|
|
|
|
// stop channel to signal the sender goroutine to stop
|
|
stopC chan struct{}
|
|
|
|
// mutex to synchronize access to the queue
|
|
queueMtx sync.RWMutex
|
|
|
|
// wait group to wait for all goroutines to finish
|
|
goroutinesWg sync.WaitGroup
|
|
}
|
|
|
|
func New(logger *slog.Logger, config Config) *Batcher {
|
|
batcher := &Batcher{
|
|
logger: logger,
|
|
queue: make(alertmanagertypes.PostableAlerts, 0, config.Capacity),
|
|
config: config,
|
|
moreC: make(chan struct{}, 1),
|
|
stopC: make(chan struct{}),
|
|
C: make(chan alertmanagertypes.PostableAlerts, config.Size),
|
|
}
|
|
|
|
return batcher
|
|
}
|
|
|
|
// Start dispatches notifications continuously.
|
|
func (batcher *Batcher) Start(ctx context.Context) error {
|
|
batcher.goroutinesWg.Add(1)
|
|
go func() {
|
|
defer batcher.goroutinesWg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-batcher.stopC:
|
|
for batcher.queueLen() > 0 {
|
|
alerts := batcher.next()
|
|
batcher.C <- alerts
|
|
}
|
|
close(batcher.C)
|
|
return
|
|
case <-batcher.moreC:
|
|
}
|
|
alerts := batcher.next()
|
|
batcher.C <- alerts
|
|
// If the queue still has items left, kick off the next iteration.
|
|
if batcher.queueLen() > 0 {
|
|
batcher.setMore()
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Add queues the given alerts for processing.
|
|
func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.PostableAlert) {
|
|
batcher.queueMtx.Lock()
|
|
defer batcher.queueMtx.Unlock()
|
|
|
|
// Queue capacity should be significantly larger than a single alert
|
|
// batch could be.
|
|
if d := len(alerts) - batcher.config.Capacity; d > 0 {
|
|
alerts = alerts[d:]
|
|
batcher.logger.WarnContext(ctx, "alert batch larger than queue capacity, dropping alerts", "num_dropped", d, "capacity", batcher.config.Capacity)
|
|
}
|
|
|
|
// If the queue is full, remove the oldest alerts in favor
|
|
// of newer ones.
|
|
if d := (len(batcher.queue) + len(alerts)) - batcher.config.Capacity; d > 0 {
|
|
batcher.queue = batcher.queue[d:]
|
|
batcher.logger.WarnContext(ctx, "alert batch queue full, dropping alerts", "num_dropped", d)
|
|
}
|
|
|
|
batcher.queue = append(batcher.queue, alerts...)
|
|
|
|
// Notify sending goroutine that there are alerts to be processed.
|
|
batcher.setMore()
|
|
}
|
|
|
|
// Stop shuts down the batcher.
|
|
func (batcher *Batcher) Stop(ctx context.Context) {
|
|
close(batcher.stopC)
|
|
batcher.goroutinesWg.Wait()
|
|
}
|
|
|
|
func (batcher *Batcher) queueLen() int {
|
|
batcher.queueMtx.RLock()
|
|
defer batcher.queueMtx.RUnlock()
|
|
|
|
return len(batcher.queue)
|
|
}
|
|
|
|
func (batcher *Batcher) next() alertmanagertypes.PostableAlerts {
|
|
batcher.queueMtx.Lock()
|
|
defer batcher.queueMtx.Unlock()
|
|
|
|
var alerts alertmanagertypes.PostableAlerts
|
|
|
|
if len(batcher.queue) > batcher.config.Size {
|
|
alerts = append(make(alertmanagertypes.PostableAlerts, 0, batcher.config.Size), batcher.queue[:batcher.config.Size]...)
|
|
batcher.queue = batcher.queue[batcher.config.Size:]
|
|
} else {
|
|
alerts = append(make(alertmanagertypes.PostableAlerts, 0, len(batcher.queue)), batcher.queue...)
|
|
batcher.queue = batcher.queue[:0]
|
|
}
|
|
|
|
return alerts
|
|
}
|
|
|
|
// setMore signals that the alert queue has items.
|
|
func (batcher *Batcher) setMore() {
|
|
// If we cannot send on the channel, it means the signal already exists
|
|
// and has not been consumed yet.
|
|
select {
|
|
case batcher.moreC <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|