package agentConf

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/jmoiron/sqlx"
	"go.signoz.io/signoz/pkg/query-service/app/opamp"
	filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
	tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
	"go.uber.org/zap"
	yaml "gopkg.in/yaml.v3"
)

var m *Manager

func init() {
	m = &Manager{}
}

type Manager struct {
	Repo
	// lock to make sure only one update is sent to remote agents at a time
	lock uint32
}

// Ready indicates if Manager can accept new config update requests
func (mgr *Manager) Ready() bool {
	if atomic.LoadUint32(&mgr.lock) != 0 {
		return false
	}
	return opamp.Ready()
}

func Initiate(db *sqlx.DB, engine string) error {
	m.Repo = Repo{db}
	return m.initDB(engine)
}

// Ready indicates if Manager can accept new config update requests
func Ready() bool {
	return m.Ready()
}

func GetLatestVersion(ctx context.Context, elementType ElementTypeDef) (*ConfigVersion, error) {
	return m.GetLatestVersion(ctx, elementType)
}

func GetConfigVersion(ctx context.Context, elementType ElementTypeDef, version int) (*ConfigVersion, error) {
	return m.GetConfigVersion(ctx, elementType, version)
}

func GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit int) ([]ConfigVersion, error) {
	return m.GetConfigHistory(ctx, typ, limit)
}

// StartNewVersion launches a new config version for a given set of elements
func StartNewVersion(ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string) (*ConfigVersion, error) {

	if !m.Ready() {
		// agent is already being updated, ask caller to wait and re-try after some time
		return nil, fmt.Errorf("agent updater is busy")
	}

	// create a new version
	cfg := NewConfigversion(eleType)

	// insert the new config and its elements into the database
	err := m.insertConfig(ctx, userId, cfg, elementIds)
	if err != nil {
		return nil, err
	}

	return cfg, nil
}
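
// A minimal sketch of the typical caller flow (the handler name and the
// version.Version field access are illustrative assumptions, not part of this
// package's documented API): start a new version for an element type, then
// push the rendered processor config to the agents.
//
//	func saveDropRules(ctx context.Context, userId string, ids []string, cfg *filterprocessor.Config) error {
//		version, err := agentConf.StartNewVersion(ctx, userId, agentConf.ElementTypeDropRules, ids)
//		if err != nil {
//			return err // includes the "agent updater is busy" case
//		}
//		return agentConf.UpsertFilterProcessor(ctx, version.Version, cfg)
//	}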

func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error {

	configVersion, err := GetConfigVersion(ctx, typ, version)
	if err != nil {
		zap.S().Debug("failed to fetch config version during redeploy", err)
		return fmt.Errorf("failed to fetch details of the config version")
	}

	if configVersion == nil || configVersion.LastConf == "" {
		zap.S().Debug("config version has no conf yaml", configVersion)
		return fmt.Errorf("the config version can not be redeployed")
	}

	switch typ {
	case ElementTypeSamplingRules:
		var config *tsp.Config
		if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil {
			zap.S().Error("failed to read last conf correctly", err)
			return fmt.Errorf("failed to read the stored config correctly")
		}

		// wrap the stored config under the tail sampling processor key
		processorConf := map[string]interface{}{
			"signoz_tail_sampling": config,
		}

		opamp.AddToTracePipelineSpec("signoz_tail_sampling")
		configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
		if err != nil {
			zap.S().Error("failed to call agent config update for trace processor:", err)
			return fmt.Errorf("failed to deploy the config")
		}

		m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)

	case ElementTypeDropRules:
		var filterConfig *filterprocessor.Config
		if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil {
			zap.S().Error("failed to read last conf correctly", err)
			return fmt.Errorf("failed to read the stored config correctly")
		}
		processorConf := map[string]interface{}{
			"filter": filterConfig,
		}

		opamp.AddToMetricsPipelineSpec("filter")
		configHash, err := opamp.UpsertControlProcessors(ctx, "metrics", processorConf, m.OnConfigUpdate)
		if err != nil {
			zap.S().Error("failed to call agent config update for filter processor:", err)
			return err
		}

		m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
	}

	return nil
}

// UpsertFilterProcessor updates the agent config with new filter processor params
func UpsertFilterProcessor(ctx context.Context, version int, config *filterprocessor.Config) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
	defer atomic.StoreUint32(&m.lock, 0)

	// merge current config with new filter params
	processorConf := map[string]interface{}{
		"filter": config,
	}

	opamp.AddToMetricsPipelineSpec("filter")
	configHash, err := opamp.UpsertControlProcessors(ctx, "metrics", processorConf, m.OnConfigUpdate)
	if err != nil {
		zap.S().Error("failed to call agent config update for filter processor:", err)
		return err
	}

	processorConfYaml, err := yaml.Marshal(config)
	if err != nil {
		zap.S().Warn("unexpected error while transforming processor config to yaml", err)
	}

	m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
	return nil
}

// OnConfigUpdate is a callback function passed to the opamp server.
// It receives a config hash with error status. We assume
// successful deployment if no error is received.
// This method is currently expected to be called only once in the lifecycle
// but can be improved in future to accept continuous request status updates from opamp.
func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {

	status := string(Deployed)
	message := "Deployment was successful"

	defer func() {
		zap.S().Infow(status, "agentId", agentId, "agentResponse", message)
	}()

	if err != nil {
		status = string(DeployFailed)
		message = fmt.Sprintf("%s: %s", agentId, err.Error())
	}

	m.updateDeployStatusByHash(context.Background(), hash, status, message)
}
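
// For illustration (the calls below are hypothetical; in practice the opamp
// server invokes OnConfigUpdate with the hash returned by UpsertControlProcessors
// or UpsertLogsParsingProcessor):
//
//	m.OnConfigUpdate("agent-1", configHash, nil)                      // recorded as Deployed
//	m.OnConfigUpdate("agent-1", configHash, errors.New("bad config")) // recorded as DeployFailed with the agent error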

// UpsertSamplingProcessor updates the agent config with new sampling processor params
func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Config) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
	defer atomic.StoreUint32(&m.lock, 0)

	// merge current config with new sampling params
	processorConf := map[string]interface{}{
		"signoz_tail_sampling": config,
	}

	opamp.AddToTracePipelineSpec("signoz_tail_sampling")
	configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
	if err != nil {
		zap.S().Error("failed to call agent config update for trace processor:", err)
		return err
	}

	processorConfYaml, err := yaml.Marshal(config)
	if err != nil {
		zap.S().Warn("unexpected error while transforming processor config to yaml", err)
	}

	m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
	return nil
}

// UpsertLogParsingProcessor updates the agent with log parsing processors
func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
	defer atomic.StoreUint32(&m.lock, 0)

	// send the changes to opamp.
	configHash, err := opamp.UpsertLogsParsingProcessor(context.Background(), config, names, m.OnConfigUpdate)
	if err != nil {
		zap.S().Error("failed to call agent config update for log parsing processor:", err)
		return err
	}

	m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment has started", configHash, string(rawPipelineData))
	return nil
}
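
// A minimal usage sketch for the log pipelines flow; the processor name and
// the parsedOperatorsConf, rawJSON, and version variables are illustrative
// assumptions, not values produced by this package:
//
//	names := []string{"logstransform/pipeline_parse_nginx"}
//	processors := map[string]interface{}{
//		"logstransform/pipeline_parse_nginx": parsedOperatorsConf, // rendered by the pipeline builder
//	}
//	if err := agentConf.UpsertLogParsingProcessor(ctx, version, rawJSON, processors, names); err != nil {
//		// the collectors were not updated; surface the error to the API caller
//		return err
//	}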