diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 5dda1e7237..5a1dcd9bd5 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -171,13 +171,18 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } - // initiate agent config handler - if err := agentConf.Initiate(localDB, AppDbEngine); err != nil { + // ingestion pipelines manager + logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") + if err != nil { return nil, err } - // ingestion pipelines manager - logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") + // initiate agent config handler + agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ + DB: localDB, + DBEngine: AppDbEngine, + AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController}, + }) if err != nil { return nil, err } @@ -256,10 +261,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { s.privateHTTP = privateServer - // TODO(Raj): Replace this with actual provider in a follow up PR - agentConfigProvider := opamp.NewMockAgentConfigProvider() s.opampServer = opamp.InitializeServer( - &opAmpModel.AllAgents, agentConfigProvider, + &opAmpModel.AllAgents, agentConfMgr, ) return s, nil diff --git a/pkg/query-service/agentConf/Readme.md b/pkg/query-service/agentConf/Readme.md new file mode 100644 index 0000000000..14cdd63375 --- /dev/null +++ b/pkg/query-service/agentConf/Readme.md @@ -0,0 +1,5 @@ +# Versioned config for agent based features. + +Responsibilities +- Maintain versioned config for registered agent based features like log pipelines etc. +- Provide a combined `AgentConfigProvider` for the opamp server to consume when managing agents diff --git a/pkg/query-service/agentConf/agent_features.go b/pkg/query-service/agentConf/agent_features.go new file mode 100644 index 0000000000..f10d4fbf66 --- /dev/null +++ b/pkg/query-service/agentConf/agent_features.go @@ -0,0 +1,25 @@ +package agentConf + +import "go.signoz.io/signoz/pkg/query-service/model" + +// Interface for features implemented via agent config. +// Eg: ingestion side signal pre-processing features like log processing pipelines etc +type AgentFeature interface { + // Must be unique across `AgentFeature`s + AgentFeatureType() AgentFeatureType + + // Recommend config for an agent based on its `currentConfYaml` and + // `configVersion` for the feature's settings + RecommendAgentConfig( + currentConfYaml []byte, + configVersion *ConfigVersion, + ) ( + recommendedConfYaml []byte, + + // stored as agent_config_versions.last_config in current agentConf model + // TODO(Raj): maybe refactor agentConf further and clean this up + serializedSettingsUsed string, + + apiErr *model.ApiError, + ) +} diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index e2a5c2239c..a64c93d3fb 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -3,9 +3,13 @@ package agentConf import ( "context" "fmt" + "strings" + "sync" "sync/atomic" + "github.com/google/uuid" "github.com/jmoiron/sqlx" + "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/app/opamp" filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor" tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler" @@ -20,10 +24,142 @@ func init() { m = &Manager{} } +type AgentFeatureType string + type Manager struct { Repo // lock to make sure only one update is sent to remote agents at a time lock uint32 + + // For AgentConfigProvider implementation + agentFeatures []AgentFeature + configSubscribers map[string]func() + configSubscribersLock sync.Mutex +} + +type ManagerOptions struct { + DB *sqlx.DB + DBEngine string + + // When acting as opamp.AgentConfigProvider, agent conf recommendations are + // applied to the base conf in the order the features have been specified here. + AgentFeatures []AgentFeature +} + +func Initiate(options *ManagerOptions) (*Manager, error) { + // featureType must be unqiue across registered AgentFeatures. + agentFeatureByType := map[AgentFeatureType]AgentFeature{} + for _, feature := range options.AgentFeatures { + featureType := feature.AgentFeatureType() + if agentFeatureByType[featureType] != nil { + panic(fmt.Sprintf( + "found multiple agent features with type: %s", featureType, + )) + } + agentFeatureByType[featureType] = feature + } + + m = &Manager{ + Repo: Repo{options.DB}, + agentFeatures: options.AgentFeatures, + configSubscribers: map[string]func(){}, + } + + err := m.initDB(options.DBEngine) + if err != nil { + return nil, errors.Wrap(err, "could not init agentConf db") + } + return m, nil +} + +// Implements opamp.AgentConfigProvider +func (m *Manager) SubscribeToConfigUpdates(callback func()) (unsubscribe func()) { + m.configSubscribersLock.Lock() + defer m.configSubscribersLock.Unlock() + + subscriberId := uuid.NewString() + m.configSubscribers[subscriberId] = callback + + return func() { + delete(m.configSubscribers, subscriberId) + } +} + +func (m *Manager) notifyConfigUpdateSubscribers() { + m.configSubscribersLock.Lock() + defer m.configSubscribersLock.Unlock() + for _, handler := range m.configSubscribers { + handler() + } +} + +// Implements opamp.AgentConfigProvider +func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) ( + recommendedConfYaml []byte, + // Opaque id of the recommended config, used for reporting deployment status updates + configId string, + err error, +) { + recommendation := currentConfYaml + settingVersions := []string{} + + for _, feature := range m.agentFeatures { + featureType := ElementTypeDef(feature.AgentFeatureType()) + latestConfig, apiErr := GetLatestVersion(context.Background(), featureType) + if apiErr != nil && apiErr.Type() != model.ErrorNotFound { + return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version") + } + + if latestConfig == nil { + continue + } + + updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig( + recommendation, latestConfig, + ) + if apiErr != nil { + return nil, "", errors.Wrap(apiErr.ToError(), fmt.Sprintf( + "failed to generate agent config recommendation for %s", featureType, + )) + } + recommendation = updatedConf + configId := fmt.Sprintf("%s:%d", featureType, latestConfig.Version) + settingVersions = append(settingVersions, configId) + + m.updateDeployStatus( + context.Background(), + featureType, + latestConfig.Version, + string(DeployInitiated), + "Deployment has started", + configId, + serializedSettingsUsed, + ) + + } + + configId = strings.Join(settingVersions, ",") + return recommendation, configId, nil +} + +// Implements opamp.AgentConfigProvider +func (m *Manager) ReportConfigDeploymentStatus( + agentId string, + configId string, + err error, +) { + featureConfigIds := strings.Split(configId, ",") + for _, featureConfId := range featureConfigIds { + newStatus := string(Deployed) + message := "Deployment was successful" + if err != nil { + newStatus = string(DeployFailed) + message = fmt.Sprintf("%s: %s", agentId, err.Error()) + } + m.updateDeployStatusByHash( + context.Background(), featureConfId, newStatus, message, + ) + } } // Ready indicates if Manager can accept new config update requests @@ -34,10 +170,7 @@ func (mgr *Manager) Ready() bool { return opamp.Ready() } -func Initiate(db *sqlx.DB, engine string) error { - m.Repo = Repo{db} - return m.initDB(engine) -} +// Static methods for working with default manager instance in this module. // Ready indicates if Manager can accept new config update requests func Ready() bool { @@ -81,6 +214,8 @@ func StartNewVersion( return nil, err } + m.notifyConfigUpdateSubscribers() + return cfg, nil } @@ -219,27 +354,3 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) return nil } - -// UpsertLogParsingProcessors updates the agent with log parsing processors -func UpsertLogParsingProcessor( - ctx context.Context, - version int, - rawPipelineData []byte, - config map[string]interface{}, - names []string, -) *model.ApiError { - if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { - return model.UnavailableError(fmt.Errorf("agent updater is busy")) - } - defer atomic.StoreUint32(&m.lock, 0) - - // send the changes to opamp. - configHash, err := opamp.UpsertLogsParsingProcessor(context.Background(), config, names, m.OnConfigUpdate) - if err != nil { - zap.S().Errorf("failed to call agent config update for log parsing processor:", err) - return err - } - - m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment has started", configHash, string(rawPipelineData)) - return nil -} diff --git a/pkg/query-service/app/logparsingpipeline/agent_feature.go b/pkg/query-service/app/logparsingpipeline/agent_feature.go new file mode 100644 index 0000000000..3f8cc7df1a --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/agent_feature.go @@ -0,0 +1,5 @@ +package logparsingpipeline + +import "go.signoz.io/signoz/pkg/query-service/agentConf" + +const LogPipelinesFeatureType agentConf.AgentFeatureType = "log_pipelines" diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/logparsingpipeline/collector_config.go similarity index 62% rename from pkg/query-service/app/opamp/logspipeline.go rename to pkg/query-service/app/logparsingpipeline/collector_config.go index 9ad81fe77c..dfef6070f9 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config.go @@ -1,16 +1,12 @@ -package opamp +package logparsingpipeline import ( - "context" - "crypto/sha256" "encoding/json" "fmt" "strings" "sync" "github.com/knadh/koanf/parsers/yaml" - "github.com/open-telemetry/opamp-go/protobufs" - model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/constants" coreModel "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" @@ -18,93 +14,7 @@ import ( var lockLogsPipelineSpec sync.RWMutex -func UpsertLogsParsingProcessor( - ctx context.Context, - parsingProcessors map[string]interface{}, - parsingProcessorsNames []string, - callback func(string, string, error), -) (string, *coreModel.ApiError) { - confHash := "" - if opAmpServer == nil { - return confHash, coreModel.UnavailableError(fmt.Errorf( - "opamp server is down, unable to push config to agent at this moment", - )) - } - - agents := opAmpServer.agents.GetAllAgents() - if len(agents) == 0 { - return confHash, coreModel.UnavailableError(fmt.Errorf( - "no agents available at the moment", - )) - } - - for _, agent := range agents { - config := agent.EffectiveConfig - c, err := yaml.Parser().Unmarshal([]byte(config)) - if err != nil { - return confHash, coreModel.BadRequest(err) - } - - buildLogParsingProcessors(c, parsingProcessors) - - p, err := getOtelPipelinFromConfig(c) - if err != nil { - return confHash, coreModel.BadRequest(err) - } - if p.Pipelines.Logs == nil { - return confHash, coreModel.InternalError(fmt.Errorf( - "logs pipeline doesn't exist", - )) - } - - // build the new processor list - updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) - p.Pipelines.Logs.Processors = updatedProcessorList - - // add the new processor to the data ( no checks required as the keys will exists) - c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs - - updatedConf, err := yaml.Parser().Marshal(c) - if err != nil { - return confHash, coreModel.BadRequest(err) - } - - // zap.S().Infof("sending new config", string(updatedConf)) - hash := sha256.New() - _, err = hash.Write(updatedConf) - if err != nil { - return confHash, coreModel.InternalError(err) - } - agent.EffectiveConfig = string(updatedConf) - err = agent.Upsert() - if err != nil { - return confHash, coreModel.InternalError(err) - } - - agent.SendToAgent(&protobufs.ServerToAgent{ - RemoteConfig: &protobufs.AgentRemoteConfig{ - Config: &protobufs.AgentConfigMap{ - ConfigMap: map[string]*protobufs.AgentConfigFile{ - "collector.yaml": { - Body: updatedConf, - ContentType: "application/x-yaml", - }, - }, - }, - ConfigHash: hash.Sum(nil), - }, - }) - - if confHash == "" { - confHash = string(hash.Sum(nil)) - model.ListenToConfigUpdate(agent.ID, confHash, callback) - } - } - - return confHash, nil -} - -// check if the processors already exist +// check if the processors already exis // if yes then update the processor. // if something doesn't exists then remove it. func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { @@ -233,3 +143,40 @@ func checkDuplicateString(pipeline []string) bool { } return false } + +func GenerateCollectorConfigWithPipelines( + config []byte, + parsingProcessors map[string]interface{}, + parsingProcessorsNames []string, +) ([]byte, *coreModel.ApiError) { + c, err := yaml.Parser().Unmarshal([]byte(config)) + if err != nil { + return nil, coreModel.BadRequest(err) + } + + buildLogParsingProcessors(c, parsingProcessors) + + p, err := getOtelPipelinFromConfig(c) + if err != nil { + return nil, coreModel.BadRequest(err) + } + if p.Pipelines.Logs == nil { + return nil, coreModel.InternalError(fmt.Errorf( + "logs pipeline doesn't exist", + )) + } + + // build the new processor list + updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames) + p.Pipelines.Logs.Processors = updatedProcessorList + + // add the new processor to the data ( no checks required as the keys will exists) + c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs + + updatedConf, err := yaml.Parser().Marshal(c) + if err != nil { + return nil, coreModel.BadRequest(err) + } + + return updatedConf, nil +} diff --git a/pkg/query-service/app/opamp/logspipeline_test.go b/pkg/query-service/app/logparsingpipeline/collector_config_test.go similarity index 99% rename from pkg/query-service/app/opamp/logspipeline_test.go rename to pkg/query-service/app/logparsingpipeline/collector_config_test.go index eef08870dd..8ef79875d5 100644 --- a/pkg/query-service/app/opamp/logspipeline_test.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config_test.go @@ -1,4 +1,4 @@ -package opamp +package logparsingpipeline import ( "fmt" diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index fc10047c36..72b6c6b76e 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -10,6 +10,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/model" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -72,15 +73,6 @@ func (ic *LogParsingPipelineController) ApplyPipelines( } - // prepare filter config (processor) from the pipelines - filterConfig, names, translationErr := PreparePipelineProcessor(pipelines) - if translationErr != nil { - zap.S().Errorf("failed to generate processor config from pipelines for deployment %w", translationErr) - return nil, model.BadRequest(errors.Wrap( - translationErr, "failed to generate processor config from pipelines for deployment", - )) - } - if !agentConf.Ready() { return nil, model.UnavailableError(fmt.Errorf( "agent updater unavailable at the moment. Please try in sometime", @@ -99,12 +91,6 @@ func (ic *LogParsingPipelineController) ApplyPipelines( return nil, err } - zap.S().Info("applying drop pipeline config", cfg) - // raw pipeline is needed since filterConfig doesn't contain inactive pipelines and operators - rawPipelineData, _ := json.Marshal(pipelines) - - // queue up the config to push to opamp - err = agentConf.UpsertLogParsingProcessor(ctx, cfg.Version, rawPipelineData, filterConfig, names) history, _ := agentConf.GetConfigHistory(ctx, agentConf.ElementTypeLogPipelines, 10) insertedCfg, _ := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, cfg.Version) @@ -166,3 +152,46 @@ func (ic *LogParsingPipelineController) PreviewLogsPipelines( OutputLogs: result, }, nil } + +// Implements agentConf.AgentFeature interface. +func (pc *LogParsingPipelineController) AgentFeatureType() agentConf.AgentFeatureType { + return LogPipelinesFeatureType +} + +// Implements agentConf.AgentFeature interface. +func (pc *LogParsingPipelineController) RecommendAgentConfig( + currentConfYaml []byte, + configVersion *agentConf.ConfigVersion, +) ( + recommendedConfYaml []byte, + serializedSettingsUsed string, + apiErr *model.ApiError, +) { + + pipelines, errs := pc.getPipelinesByVersion( + context.Background(), configVersion.Version, + ) + if len(errs) > 0 { + return nil, "", model.InternalError(multierr.Combine(errs...)) + } + + processors, procNames, err := PreparePipelineProcessor(pipelines) + if err != nil { + return nil, "", model.BadRequest(errors.Wrap(err, "could not prepare otel collector processors for log pipelines")) + } + + updatedConf, apiErr := GenerateCollectorConfigWithPipelines( + currentConfYaml, processors, procNames, + ) + if apiErr != nil { + return nil, "", model.WrapApiError(apiErr, "could not marshal yaml for updated conf") + } + + rawPipelineData, err := json.Marshal(pipelines) + if err != nil { + return nil, "", model.BadRequest(errors.Wrap(err, "could not serialize pipelines to JSON")) + } + + return updatedConf, string(rawPipelineData), nil + +} diff --git a/pkg/query-service/app/opamp/config_provider_test.go b/pkg/query-service/app/opamp/config_provider_test.go index 083c396a41..6718ff1581 100644 --- a/pkg/query-service/app/opamp/config_provider_test.go +++ b/pkg/query-service/app/opamp/config_provider_test.go @@ -2,8 +2,6 @@ package opamp import ( "fmt" - "log" - "net" "os" "testing" @@ -137,6 +135,21 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) { ) require.False(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id]) + lastAgent1Msg = agent1Conn.LatestMsgFromServer() + agent1Conn.ClearMsgsFromServer() + response := tb.opampServer.OnMessage(agent1Conn, &protobufs.AgentToServer{ + InstanceUid: agent1Id, + RemoteConfigStatus: &protobufs.RemoteConfigStatus{ + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, + LastRemoteConfigHash: lastAgent1Msg.RemoteConfig.ConfigHash, + }, + }) + require.Nil(response.RemoteConfig) + require.Nil( + agent1Conn.LatestMsgFromServer(), + "server should not recommend a config if agent is reporting back with status on a broadcasted config", + ) + require.Equal(1, len(tb.testConfigProvider.ConfigUpdateSubscribers)) tb.opampServer.Stop() require.Equal( @@ -242,15 +255,3 @@ func initialAgentConf() *protobufs.AgentConfigMap { `), ) } - -// Brought in from https://github.com/open-telemetry/opamp-go/blob/main/internal/testhelpers/nethelpers.go -func GetAvailableLocalAddress() string { - ln, err := net.Listen("tcp", "127.0.0.1:") - if err != nil { - log.Fatalf("failed to get a free local port: %v", err) - } - // There is a possible race if something else takes this same port before - // the test uses it, however, that is unlikely in practice. - defer ln.Close() - return ln.Addr().String() -} diff --git a/pkg/query-service/app/opamp/mocks.go b/pkg/query-service/app/opamp/mocks.go index 705fe38bcf..12e9410989 100644 --- a/pkg/query-service/app/opamp/mocks.go +++ b/pkg/query-service/app/opamp/mocks.go @@ -2,6 +2,7 @@ package opamp import ( "context" + "log" "net" "github.com/google/uuid" @@ -132,3 +133,15 @@ func (ta *MockAgentConfigProvider) NotifySubscribersOfChange() { callback() } } + +// Brought in from https://github.com/open-telemetry/opamp-go/blob/main/internal/testhelpers/nethelpers.go +func GetAvailableLocalAddress() string { + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + log.Fatalf("failed to get a free local port: %v", err) + } + // There is a possible race if something else takes this same port before + // the test uses it, however, that is unlikely in practice. + defer ln.Close() + return ln.Addr().String() +} diff --git a/pkg/query-service/app/opamp/model/agents.go b/pkg/query-service/app/opamp/model/agents.go index e835ee8ccc..50a554b957 100644 --- a/pkg/query-service/app/opamp/model/agents.go +++ b/pkg/query-service/app/opamp/model/agents.go @@ -142,18 +142,24 @@ func (agents *Agents) RecommendLatestConfigToAll( return nil } - agent.SendToAgent(&protobufs.ServerToAgent{ - RemoteConfig: &protobufs.AgentRemoteConfig{ - Config: &protobufs.AgentConfigMap{ - ConfigMap: map[string]*protobufs.AgentConfigFile{ - CollectorConfigFilename: { - Body: newConfig, - ContentType: "application/x-yaml", - }, + newRemoteConfig := &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + CollectorConfigFilename: { + Body: newConfig, + ContentType: "application/x-yaml", }, }, - ConfigHash: []byte(confId), }, + ConfigHash: []byte(confId), + } + + agent.mux.Lock() + defer agent.mux.Unlock() + agent.remoteConfig = newRemoteConfig + + agent.SendToAgent(&protobufs.ServerToAgent{ + RemoteConfig: newRemoteConfig, }) ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 6e3f267491..08fb4e7850 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -203,14 +203,19 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } - if err := agentConf.Initiate(localDB, "sqlite"); err != nil { + agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ + DB: localDB, + DBEngine: "sqlite", + AgentFeatures: []agentConf.AgentFeature{ + logParsingPipelineController, + }, + }) + if err != nil { return nil, err } - // TODO(Raj): Replace this with actual provider in a follow up PR - agentConfigProvider := opamp.NewMockAgentConfigProvider() s.opampServer = opamp.InitializeServer( - &opAmpModel.AllAgents, agentConfigProvider, + &opAmpModel.AllAgents, agentConfMgr, ) return s, nil diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 0a1105e63c..2e1b70c1db 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -12,6 +12,7 @@ import ( "strings" "testing" + "github.com/google/uuid" "github.com/gorilla/mux" "github.com/jmoiron/sqlx" "github.com/knadh/koanf/parsers/yaml" @@ -108,6 +109,7 @@ func TestLogPipelinesLifecycle(t *testing.T) { t, postablePipelines, createPipelinesResp, ) testbed.assertPipelinesSentToOpampClient(createPipelinesResp.Pipelines) + testbed.assertNewAgentGetsPipelinesOnConnection(createPipelinesResp.Pipelines) // Should be able to get the configured pipelines. getPipelinesResp = testbed.GetPipelinesFromQS() @@ -133,7 +135,8 @@ func TestLogPipelinesLifecycle(t *testing.T) { t, postablePipelines, getPipelinesResp, ) assert.Equal( - getPipelinesResp.History[0].DeployStatus, agentConf.Deployed, + agentConf.Deployed, + getPipelinesResp.History[0].DeployStatus, "pipeline deployment should be complete after acknowledgment from opamp client", ) @@ -144,6 +147,7 @@ func TestLogPipelinesLifecycle(t *testing.T) { t, postablePipelines, updatePipelinesResp, ) testbed.assertPipelinesSentToOpampClient(updatePipelinesResp.Pipelines) + testbed.assertNewAgentGetsPipelinesOnConnection(updatePipelinesResp.Pipelines) assert.Equal( 2, len(updatePipelinesResp.History), @@ -162,7 +166,8 @@ func TestLogPipelinesLifecycle(t *testing.T) { t, postablePipelines, getPipelinesResp, ) assert.Equal( - getPipelinesResp.History[0].DeployStatus, agentConf.Deployed, + agentConf.Deployed, + getPipelinesResp.History[0].DeployStatus, "deployment for latest pipeline config should be complete after acknowledgment from opamp client", ) } @@ -332,10 +337,7 @@ func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { t.Fatalf("could not create a new ApiHandler: %v", err) } - opampServer, clientConn, err := mockOpampAgent(testDBFilePath) - if err != nil { - t.Fatalf("could not create opamp server and mock client connection: %v", err) - } + opampServer, clientConn := mockOpampAgent(t, testDBFilePath, controller) user, apiErr := createTestUser() if apiErr != nil { @@ -447,16 +449,26 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( pipelines []logparsingpipeline.Pipeline, ) { lastMsg := tb.opampClientConn.LatestMsgFromServer() - collectorConfigFiles := lastMsg.RemoteConfig.Config.ConfigMap + assertPipelinesRecommendedInRemoteConfig( + tb.t, lastMsg, pipelines, + ) +} + +func assertPipelinesRecommendedInRemoteConfig( + t *testing.T, + msg *protobufs.ServerToAgent, + pipelines []logparsingpipeline.Pipeline, +) { + collectorConfigFiles := msg.RemoteConfig.Config.ConfigMap assert.Equal( - tb.t, len(collectorConfigFiles), 1, + t, len(collectorConfigFiles), 1, "otel config sent to client is expected to contain atleast 1 file", ) collectorConfigYaml := maps.Values(collectorConfigFiles)[0].Body collectorConfSentToClient, err := yaml.Parser().Unmarshal(collectorConfigYaml) if err != nil { - tb.t.Fatalf("could not unmarshal config file sent to opamp client: %v", err) + t.Fatalf("could not unmarshal config file sent to opamp client: %v", err) } // Each pipeline is expected to become its own processor @@ -477,14 +489,14 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( _, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines) assert.Equal( - tb.t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames, + t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames, "config sent to opamp client doesn't contain expected log pipelines", ) collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{}) for _, procName := range expectedLogProcessorNames { pipelineProcessorInConf, procExists := collectorConfProcessors[procName] - assert.True(tb.t, procExists, fmt.Sprintf( + assert.True(t, procExists, fmt.Sprintf( "%s processor not found in config sent to opamp client", procName, )) @@ -497,7 +509,7 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( pipelineProcOps, func(op interface{}) bool { return op.(map[string]interface{})["id"] == "router_signoz" }, ) - require.GreaterOrEqual(tb.t, routerOpIdx, 0) + require.GreaterOrEqual(t, routerOpIdx, 0) routerOproutes := pipelineProcOps[routerOpIdx].(map[string]interface{})["routes"].([]interface{}) pipelineFilterExpr := routerOproutes[0].(map[string]interface{})["expr"].(string) @@ -507,10 +519,10 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( return logparsingpipeline.CollectorConfProcessorName(p) == procName }, ) - require.GreaterOrEqual(tb.t, pipelineIdx, 0) + require.GreaterOrEqual(t, pipelineIdx, 0) expectedExpr, err := queryBuilderToExpr.Parse(pipelines[pipelineIdx].Filter) - require.Nil(tb.t, err) - require.Equal(tb.t, expectedExpr, pipelineFilterExpr) + require.Nil(t, err) + require.Equal(t, expectedExpr, pipelineFilterExpr) } } @@ -528,6 +540,26 @@ func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig }) } +func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection( + pipelines []logparsingpipeline.Pipeline, +) { + newAgentConn := &opamp.MockOpAmpConnection{} + tb.opampServer.OnMessage( + newAgentConn, + &protobufs.AgentToServer{ + InstanceUid: uuid.NewString(), + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: newInitialAgentConfigMap(), + }, + }, + ) + latestMsgFromServer := newAgentConn.LatestMsgFromServer() + require.NotNil(tb.t, latestMsgFromServer) + assertPipelinesRecommendedInRemoteConfig( + tb.t, latestMsgFromServer, pipelines, + ) +} + func unmarshalPipelinesResponse(apiResponse *app.ApiResponse) ( *logparsingpipeline.PipelinesResponse, error, @@ -563,58 +595,74 @@ func assertPipelinesResponseMatchesPostedPipelines( } } -func mockOpampAgent(testDBFilePath string) (*opamp.Server, *opamp.MockOpAmpConnection, error) { +func mockOpampAgent( + t *testing.T, + testDBFilePath string, + pipelinesController *logparsingpipeline.LogParsingPipelineController, +) (*opamp.Server, *opamp.MockOpAmpConnection) { // Mock an available opamp agent testDB, err := opampModel.InitDB(testDBFilePath) - if err != nil { - return nil, nil, err - } - err = agentConf.Initiate(testDB, "sqlite") - if err != nil { - return nil, nil, err - } + require.Nil(t, err, "failed to init opamp model") + + agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ + DB: testDB, + DBEngine: "sqlite", + AgentFeatures: []agentConf.AgentFeature{pipelinesController}, + }) + require.Nil(t, err, "failed to init agentConf") + + opampServer := opamp.InitializeServer(nil, agentConfMgr) + err = opampServer.Start(opamp.GetAvailableLocalAddress()) + require.Nil(t, err, "failed to start opamp server") + + t.Cleanup(func() { + opampServer.Stop() + }) - opampServer := opamp.InitializeServer(nil, opamp.NewMockAgentConfigProvider()) opampClientConnection := &opamp.MockOpAmpConnection{} opampServer.OnMessage( opampClientConnection, &protobufs.AgentToServer{ InstanceUid: "test", EffectiveConfig: &protobufs.EffectiveConfig{ - ConfigMap: &protobufs.AgentConfigMap{ - ConfigMap: map[string]*protobufs.AgentConfigFile{ - "otel-collector.yaml": { - Body: []byte(` - receivers: - otlp: - protocols: - grpc: - endpoint: 0.0.0.0:4317 - http: - endpoint: 0.0.0.0:4318 - processors: - batch: - send_batch_size: 10000 - send_batch_max_size: 11000 - timeout: 10s - exporters: - otlp: - endpoint: otelcol2:4317 - service: - pipelines: - logs: - receivers: [otlp] - processors: [batch] - exporters: [otlp] - `), - ContentType: "text/yaml", - }, - }, - }, + ConfigMap: newInitialAgentConfigMap(), }, }, ) - return opampServer, opampClientConnection, nil + return opampServer, opampClientConnection +} + +func newInitialAgentConfigMap() *protobufs.AgentConfigMap { + return &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "otel-collector.yaml": { + Body: []byte(` + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + processors: + batch: + send_batch_size: 10000 + send_batch_max_size: 11000 + timeout: 10s + exporters: + otlp: + endpoint: otelcol2:4317 + service: + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [otlp] + `), + ContentType: "text/yaml", + }, + }, + } } func createTestUser() (*model.User, *model.ApiError) {