From cb155a11728984c49cfc0a33070bd3398ca7df59 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+rkssisodiya@users.noreply.github.com> Date: Sat, 14 Oct 2023 09:16:14 +0530 Subject: [PATCH] feat: opamp server with agent config provider (#3737) * feat: add interface for opamp.AgentConfigProvider * feat: add iface and plumbing for generating recommended conf in opamp/agent * feat: get opamp server config provider tests started * chore: add test scenario for agent connection without a config recommendation * chore: add test scenario for agent connection with a config recommendation * chore: add test for validating config deployment status gets reported * chore: add test for rolling out latest config recommendations when config changes * chore: wrap up opamp server lifecycle tests * chore: some tests cleanup * chore: get all tests passing * chore: update opamp server init logic in ee query service * chore: some cleanup * chore: some final cleanup --- ee/query-service/app/server.go | 12 +- .../app/opamp/config_provider.go | 12 + .../app/opamp/config_provider_test.go | 256 ++++++++++++++++++ pkg/query-service/app/opamp/mocks.go | 134 +++++++++ pkg/query-service/app/opamp/model/agent.go | 63 +++-- pkg/query-service/app/opamp/model/agents.go | 45 +++ pkg/query-service/app/opamp/model/config.go | 20 ++ .../app/opamp/model/constants.go | 4 + pkg/query-service/app/opamp/opamp_server.go | 46 +++- pkg/query-service/app/server.go | 13 +- .../integration/logparsingpipeline_test.go | 46 +--- 11 files changed, 575 insertions(+), 76 deletions(-) create mode 100644 pkg/query-service/app/opamp/config_provider.go create mode 100644 pkg/query-service/app/opamp/config_provider_test.go create mode 100644 pkg/query-service/app/opamp/mocks.go create mode 100644 pkg/query-service/app/opamp/model/config.go create mode 100644 pkg/query-service/app/opamp/model/constants.go diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 834575643b..5dda1e7237 100644 --- 
a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -88,6 +88,8 @@ type Server struct { // Usage manager usageManager *usage.Manager + opampServer *opamp.Server + unavailableChannel chan healthcheck.Status } @@ -254,6 +256,12 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { s.privateHTTP = privateServer + // TODO(Raj): Replace this with actual provider in a follow up PR + agentConfigProvider := opamp.NewMockAgentConfigProvider() + s.opampServer = opamp.InitializeServer( + &opAmpModel.AllAgents, agentConfigProvider, + ) + return s, nil } @@ -569,7 +577,7 @@ func (s *Server) Start() error { go func() { zap.S().Info("Starting OpAmp Websocket server", zap.String("addr", baseconst.OpAmpWsEndpoint)) - err := opamp.InitializeAndStartServer(baseconst.OpAmpWsEndpoint, &opAmpModel.AllAgents) + err := s.opampServer.Start(baseconst.OpAmpWsEndpoint) if err != nil { zap.S().Info("opamp ws server failed to start", err) s.unavailableChannel <- healthcheck.Unavailable @@ -592,7 +600,7 @@ func (s *Server) Stop() error { } } - opamp.StopServer() + s.opampServer.Stop() if s.ruleManager != nil { s.ruleManager.Stop() diff --git a/pkg/query-service/app/opamp/config_provider.go b/pkg/query-service/app/opamp/config_provider.go new file mode 100644 index 0000000000..0978890cb1 --- /dev/null +++ b/pkg/query-service/app/opamp/config_provider.go @@ -0,0 +1,12 @@ +package opamp + +import "go.signoz.io/signoz/pkg/query-service/app/opamp/model" + +// Interface for a source of otel collector config recommendations. +type AgentConfigProvider interface { + model.AgentConfigProvider + + // Subscribe to be notified on changes in config provided by this source. 
+ // Used for rolling out latest config recommendation to all connected agents when settings change + SubscribeToConfigUpdates(callback func()) (unsubscribe func()) +} diff --git a/pkg/query-service/app/opamp/config_provider_test.go b/pkg/query-service/app/opamp/config_provider_test.go new file mode 100644 index 0000000000..083c396a41 --- /dev/null +++ b/pkg/query-service/app/opamp/config_provider_test.go @@ -0,0 +1,256 @@ +package opamp + +import ( + "fmt" + "log" + "net" + "os" + "testing" + + "github.com/knadh/koanf" + "github.com/knadh/koanf/parsers/yaml" + "github.com/knadh/koanf/providers/rawbytes" + _ "github.com/mattn/go-sqlite3" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/app/opamp/model" + "golang.org/x/exp/maps" +) + +func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) { + require := require.New(t) + + tb := newTestbed(t) + + require.Equal( + 0, len(tb.testConfigProvider.ConfigUpdateSubscribers), + "there should be no agent config subscribers at the start", + ) + tb.StartServer() + require.Equal( + 1, len(tb.testConfigProvider.ConfigUpdateSubscribers), + "Opamp server should have subscribed to updates from config provider after being started", + ) + + // Server should always respond with a RemoteConfig when an agent connects. 
+ // Even if there are no recommended changes to the agent's initial config + require.False(tb.testConfigProvider.HasRecommendations()) + agent1Conn := &MockOpAmpConnection{} + agent1Id := "testAgent1" + tb.opampServer.OnMessage( + agent1Conn, + &protobufs.AgentToServer{ + InstanceUid: agent1Id, + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: initialAgentConf(), + }, + }, + ) + lastAgent1Msg := agent1Conn.LatestMsgFromServer() + require.NotNil( + lastAgent1Msg, + "Server should always send a remote config to the agent when it connects", + ) + require.Equal( + RemoteConfigBody(lastAgent1Msg), + string(initialAgentConf().ConfigMap[model.CollectorConfigFilename].Body), + ) + + tb.testConfigProvider.ZPagesEndpoint = "localhost:55555" + require.True(tb.testConfigProvider.HasRecommendations()) + agent2Id := "testAgent2" + agent2Conn := &MockOpAmpConnection{} + tb.opampServer.OnMessage( + agent2Conn, + &protobufs.AgentToServer{ + InstanceUid: agent2Id, + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: initialAgentConf(), + }, + }, + ) + lastAgent2Msg := agent2Conn.LatestMsgFromServer() + require.NotNil( + lastAgent2Msg, + "server should recommend a config to agent when it connects", + ) + + recommendedEndpoint, err := GetStringValueFromYaml( + []byte(RemoteConfigBody(lastAgent2Msg)), "extensions.zpages.endpoint", + ) + require.Nil(err) + require.Equal( + tb.testConfigProvider.ZPagesEndpoint, recommendedEndpoint, + "server should send recommended config to agent when it connects", + ) + + agent2Conn.ClearMsgsFromServer() + tb.opampServer.OnMessage(agent2Conn, &protobufs.AgentToServer{ + InstanceUid: agent2Id, + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: NewAgentConfigMap( + []byte(RemoteConfigBody(lastAgent2Msg)), + ), + }, + RemoteConfigStatus: &protobufs.RemoteConfigStatus{ + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, + LastRemoteConfigHash: lastAgent2Msg.RemoteConfig.ConfigHash, + }, + }) + expectedConfId := 
tb.testConfigProvider.ZPagesEndpoint + require.True(tb.testConfigProvider.HasReportedDeploymentStatus(expectedConfId, agent2Id), + "Server should report deployment success to config provider on receiving update from agent.", + ) + require.True(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id]) + require.Nil( + agent2Conn.LatestMsgFromServer(), + "Server should not recommend a RemoteConfig if agent is already running it.", + ) + + // Server should rollout latest config to all agents when notified of a change by config provider + agent1Conn.ClearMsgsFromServer() + agent2Conn.ClearMsgsFromServer() + tb.testConfigProvider.ZPagesEndpoint = "localhost:66666" + tb.testConfigProvider.NotifySubscribersOfChange() + for _, agentConn := range []*MockOpAmpConnection{agent1Conn, agent2Conn} { + lastMsg := agentConn.LatestMsgFromServer() + + recommendedEndpoint, err := GetStringValueFromYaml( + []byte(RemoteConfigBody(lastMsg)), "extensions.zpages.endpoint", + ) + require.Nil(err) + require.Equal(tb.testConfigProvider.ZPagesEndpoint, recommendedEndpoint) + } + + lastAgent2Msg = agent2Conn.LatestMsgFromServer() + tb.opampServer.OnMessage(agent2Conn, &protobufs.AgentToServer{ + InstanceUid: agent2Id, + RemoteConfigStatus: &protobufs.RemoteConfigStatus{ + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, + LastRemoteConfigHash: lastAgent2Msg.RemoteConfig.ConfigHash, + }, + }) + expectedConfId = tb.testConfigProvider.ZPagesEndpoint + require.True(tb.testConfigProvider.HasReportedDeploymentStatus(expectedConfId, agent2Id), + "Server should report deployment failure to config provider on receiving update from agent.", + ) + require.False(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id]) + + require.Equal(1, len(tb.testConfigProvider.ConfigUpdateSubscribers)) + tb.opampServer.Stop() + require.Equal( + 0, len(tb.testConfigProvider.ConfigUpdateSubscribers), + "Opamp server should have unsubscribed to config provider 
updates after shutdown", + ) +} + +type testbed struct { + testConfigProvider *MockAgentConfigProvider + opampServer *Server + t *testing.T +} + +func newTestbed(t *testing.T) *testbed { + // Init opamp model. + testDBFile, err := os.CreateTemp("", "test-signoz-db-*") + if err != nil { + t.Fatalf("could not create temp file for test db: %v", err) + } + testDBFilePath := testDBFile.Name() + t.Cleanup(func() { os.Remove(testDBFilePath) }) + testDBFile.Close() + + _, err = model.InitDB(testDBFilePath) + if err != nil { + t.Fatalf("could not init opamp model: %v", err) + } + + testConfigProvider := NewMockAgentConfigProvider() + opampServer := InitializeServer(nil, testConfigProvider) + + return &testbed{ + testConfigProvider: testConfigProvider, + opampServer: opampServer, + t: t, + } +} + +func (tb *testbed) StartServer() { + testListenPath := GetAvailableLocalAddress() + err := tb.opampServer.Start(testListenPath) + require.Nil(tb.t, err, "should be able to start opamp server") +} + +// Test helper: returns the string value found at `path` in `serializedYaml`. +func GetStringValueFromYaml( + serializedYaml []byte, path string, +) (string, error) { + if len(serializedYaml) < 1 { + return "", fmt.Errorf("yaml data is empty") + } + + k := koanf.New(".") + err := k.Load(rawbytes.Provider(serializedYaml), yaml.Parser()) + if err != nil { + return "", errors.Wrap(err, "could not unmarshal collector config") + } + + return k.String(path), nil +} + +// Returns body of a ServerToAgent.RemoteConfig or "" +func RemoteConfigBody(msg *protobufs.ServerToAgent) string { + if msg == nil { + return "" + } + + collectorConfFiles := msg.GetRemoteConfig().GetConfig().GetConfigMap() + if len(collectorConfFiles) < 1 { + return "" + } + return string(maps.Values(collectorConfFiles)[0].Body) +} + +func NewAgentConfigMap(body []byte) *protobufs.AgentConfigMap { + return &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + model.CollectorConfigFilename: { + Body: body, + ContentType: "text/yaml", + }, + }, + } + +} + 
+func initialAgentConf() *protobufs.AgentConfigMap { + return NewAgentConfigMap( + []byte(` + receivers: + otlp: + processors: + batch: + exporters: + otlp: + service: + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [otlp] + `), + ) +} + +// Brought in from https://github.com/open-telemetry/opamp-go/blob/main/internal/testhelpers/nethelpers.go +func GetAvailableLocalAddress() string { + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + log.Fatalf("failed to get a free local port: %v", err) + } + // There is a possible race if something else takes this same port before + // the test uses it, however, that is unlikely in practice. + defer ln.Close() + return ln.Addr().String() +} diff --git a/pkg/query-service/app/opamp/mocks.go b/pkg/query-service/app/opamp/mocks.go new file mode 100644 index 0000000000..705fe38bcf --- /dev/null +++ b/pkg/query-service/app/opamp/mocks.go @@ -0,0 +1,134 @@ +package opamp + +import ( + "context" + "net" + + "github.com/google/uuid" + "github.com/knadh/koanf" + "github.com/knadh/koanf/parsers/yaml" + "github.com/knadh/koanf/providers/rawbytes" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/pkg/errors" +) + +type MockOpAmpConnection struct { + ServerToAgentMsgs []*protobufs.ServerToAgent +} + +func (conn *MockOpAmpConnection) Send(ctx context.Context, msg *protobufs.ServerToAgent) error { + conn.ServerToAgentMsgs = append(conn.ServerToAgentMsgs, msg) + return nil +} + +func (conn *MockOpAmpConnection) LatestMsgFromServer() *protobufs.ServerToAgent { + if len(conn.ServerToAgentMsgs) < 1 { + return nil + } + return conn.ServerToAgentMsgs[len(conn.ServerToAgentMsgs)-1] +} + +func (conn *MockOpAmpConnection) ClearMsgsFromServer() []*protobufs.ServerToAgent { + msgs := conn.ServerToAgentMsgs + conn.ServerToAgentMsgs = []*protobufs.ServerToAgent{} + return msgs +} + +func (conn *MockOpAmpConnection) Disconnect() error { + return nil +} +func (conn *MockOpAmpConnection) RemoteAddr() 
net.Addr { + return nil +} + +// Implements opamp.AgentConfigProvider +type MockAgentConfigProvider struct { + // An updated config is recommended by TestAgentConfProvider + // if `ZPagesEndpoint` is not empty + ZPagesEndpoint string + + ConfigUpdateSubscribers map[string]func() + + // { configId: { agentId: isOk } } + ReportedDeploymentStatuses map[string]map[string]bool +} + +func NewMockAgentConfigProvider() *MockAgentConfigProvider { + return &MockAgentConfigProvider{ + ConfigUpdateSubscribers: map[string]func(){}, + ReportedDeploymentStatuses: map[string]map[string]bool{}, + } +} + +// Test helper. +func (ta *MockAgentConfigProvider) HasRecommendations() bool { + return len(ta.ZPagesEndpoint) > 0 +} + +// AgentConfigProvider interface +func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) ( + []byte, string, error, +) { + if len(ta.ZPagesEndpoint) < 1 { + return baseConfYaml, "agent-base-config", nil + } + + k := koanf.New(".") + err := k.Load(rawbytes.Provider(baseConfYaml), yaml.Parser()) + if err != nil { + return nil, "", errors.Wrap(err, "could not unmarshal baseConf") + } + + k.Set("extensions.zpages.endpoint", ta.ZPagesEndpoint) + recommendedYaml, err := k.Marshal(yaml.Parser()) + if err != nil { + return nil, "", errors.Wrap(err, "could not marshal recommended conf") + } + + confId := ta.ZPagesEndpoint + return recommendedYaml, confId, nil +} + +// AgentConfigProvider interface +func (ta *MockAgentConfigProvider) ReportConfigDeploymentStatus( + agentId string, + configId string, + err error, +) { + confIdReports := ta.ReportedDeploymentStatuses[configId] + if confIdReports == nil { + confIdReports = map[string]bool{} + ta.ReportedDeploymentStatuses[configId] = confIdReports + } + + confIdReports[agentId] = (err == nil) +} + +// Test helper. 
+func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus( + configId string, agentId string, +) bool { + confIdReports := ta.ReportedDeploymentStatuses[configId] + if confIdReports == nil { + return false + } + _, exists := confIdReports[agentId] + return exists +} + +// AgentConfigProvider interface +func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() { + subscriberId := uuid.NewString() + ta.ConfigUpdateSubscribers[subscriberId] = callback + + return func() { + delete(ta.ConfigUpdateSubscribers, subscriberId) + } +} + +// test helper. +func (ta *MockAgentConfigProvider) NotifySubscribersOfChange() { + for _, callback := range ta.ConfigUpdateSubscribers { + callback() + } +} diff --git a/pkg/query-service/app/opamp/model/agent.go b/pkg/query-service/app/opamp/model/agent.go index ba2ecfcddc..a6f9dd66ef 100644 --- a/pkg/query-service/app/opamp/model/agent.go +++ b/pkg/query-service/app/opamp/model/agent.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/protobufs" @@ -72,12 +73,6 @@ func (agent *Agent) Upsert() error { return nil } -func (agent *Agent) UpdateStatus(statusMsg *protobufs.AgentToServer, response *protobufs.ServerToAgent) { - agent.mux.Lock() - defer agent.mux.Unlock() - agent.processStatusUpdate(statusMsg, response) -} - // extracts lb exporter support flag from agent description. the flag // is used to decide if lb exporter can be enabled on the agent. 
func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool { @@ -208,9 +203,20 @@ func (agent *Agent) hasCapability(capability protobufs.AgentCapabilities) bool { return agent.Status.Capabilities&uint64(capability) != 0 } +func (agent *Agent) UpdateStatus( + statusMsg *protobufs.AgentToServer, + response *protobufs.ServerToAgent, + configProvider AgentConfigProvider, +) { + agent.mux.Lock() + defer agent.mux.Unlock() + agent.processStatusUpdate(statusMsg, response, configProvider) +} + func (agent *Agent) processStatusUpdate( newStatus *protobufs.AgentToServer, response *protobufs.ServerToAgent, + configProvider AgentConfigProvider, ) { // We don't have any status for this Agent, or we lost the previous status update from the Agent, so our // current status is not up-to-date. @@ -237,12 +243,16 @@ func (agent *Agent) processStatusUpdate( response.Flags |= uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState) } + // This needs to be done before agent.updateRemoteConfig() to ensure it sees + // the latest value for agent.EffectiveConfig when generating a config recommendation + agent.updateEffectiveConfig(newStatus, response) + configChanged := false if agentDescrChanged { // Agent description is changed. // We need to recalculate the config. - configChanged = agent.updateRemoteConfig() + configChanged = agent.updateRemoteConfig(configProvider) } // If remote config is changed and different from what the Agent has then @@ -254,13 +264,21 @@ func (agent *Agent) processStatusUpdate( // does not have this config (hash is different). Send the new config the Agent. 
response.RemoteConfig = agent.remoteConfig agent.SendToAgent(response) - } - agent.updateEffectiveConfig(newStatus, response) + ListenToConfigUpdate( + agent.ID, + string(response.RemoteConfig.ConfigHash), + configProvider.ReportConfigDeploymentStatus, + ) + } } -func (agent *Agent) updateRemoteConfig() bool { - hash := sha256.New() +func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool { + recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig)) + if err != nil { + zap.S().Errorf("could not generate config recommendation for agent %s: %v", agent.ID, err) + return false + } cfg := protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ @@ -268,14 +286,25 @@ func (agent *Agent) updateRemoteConfig() bool { }, } - // Calculate the hash. - for k, v := range cfg.Config.ConfigMap { - hash.Write([]byte(k)) - hash.Write(v.Body) - hash.Write([]byte(v.ContentType)) + cfg.Config.ConfigMap[CollectorConfigFilename] = &protobufs.AgentConfigFile{ + Body: recommendedConfig, + ContentType: "application/x-yaml", } - cfg.ConfigHash = hash.Sum(nil) + if len(confId) < 1 { + // Should never happen. Handle gracefully if it does by some chance. + zap.S().Errorf("config provider recommended a config with empty confId. 
Using content hash for configId") + + hash := sha256.New() + for k, v := range cfg.Config.ConfigMap { + hash.Write([]byte(k)) + hash.Write(v.Body) + hash.Write([]byte(v.ContentType)) + } + cfg.ConfigHash = hash.Sum(nil) + } else { + cfg.ConfigHash = []byte(confId) + } configChanged := !isEqualRemoteConfig(agent.remoteConfig, &cfg) diff --git a/pkg/query-service/app/opamp/model/agents.go b/pkg/query-service/app/opamp/model/agents.go index 18faddb48b..e835ee8ccc 100644 --- a/pkg/query-service/app/opamp/model/agents.go +++ b/pkg/query-service/app/opamp/model/agents.go @@ -6,7 +6,10 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server/types" + "github.com/pkg/errors" + "go.uber.org/zap" ) var db *sqlx.DB @@ -115,3 +118,45 @@ func (agents *Agents) GetAllAgents() []*Agent { } return allAgents } + +// Recommend latest config to connected agents whose effective +// config is not the same as the latest recommendation +func (agents *Agents) RecommendLatestConfigToAll( + provider AgentConfigProvider, +) error { + for _, agent := range agents.GetAllAgents() { + newConfig, confId, err := provider.RecommendAgentConfig( + []byte(agent.EffectiveConfig), + ) + if err != nil { + return errors.Wrap(err, fmt.Sprintf( + "could not generate conf recommendation for %v", agent.ID, + )) + } + + // Recommendation is same as current config: skip this agent, keep rolling out to the rest + if string(newConfig) == agent.EffectiveConfig { + zap.S().Infof( + "Recommended config same as current effective config for agent %s", agent.ID, + ) + continue + } + + agent.SendToAgent(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + CollectorConfigFilename: { + Body: newConfig, + ContentType: "application/x-yaml", + }, + }, + }, + ConfigHash: []byte(confId), + }, + }) + + ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus) + } + 
return nil +} diff --git a/pkg/query-service/app/opamp/model/config.go b/pkg/query-service/app/opamp/model/config.go new file mode 100644 index 0000000000..026ef947c3 --- /dev/null +++ b/pkg/query-service/app/opamp/model/config.go @@ -0,0 +1,20 @@ +package model + +// Interface for source of otel collector config recommendations. +type AgentConfigProvider interface { + // Generate recommended config for an agent based on its `currentConfYaml` + // and current state of user facing settings for agent based features. + RecommendAgentConfig(currentConfYaml []byte) ( + recommendedConfYaml []byte, + // Opaque id of the recommended config, used for reporting deployment status updates + configId string, + err error, + ) + + // Report deployment status for config recommendations generated by RecommendAgentConfig + ReportConfigDeploymentStatus( + agentId string, + configId string, + err error, + ) +} diff --git a/pkg/query-service/app/opamp/model/constants.go b/pkg/query-service/app/opamp/model/constants.go new file mode 100644 index 0000000000..293922c424 --- /dev/null +++ b/pkg/query-service/app/opamp/model/constants.go @@ -0,0 +1,4 @@ +package model + +// Must match collectorConfigKey in https://github.com/SigNoz/signoz-otel-collector/blob/main/opamp/config_manager.go +const CollectorConfigFilename = "collector.yaml" diff --git a/pkg/query-service/app/opamp/opamp_server.go b/pkg/query-service/app/opamp/opamp_server.go index 201fd598c7..2a7ba4c6fa 100644 --- a/pkg/query-service/app/opamp/opamp_server.go +++ b/pkg/query-service/app/opamp/opamp_server.go @@ -18,35 +18,32 @@ type Server struct { agents *model.Agents logger *zap.Logger capabilities int32 + + agentConfigProvider AgentConfigProvider + + // cleanups to be run when stopping the server + cleanups []func() } const capabilities = protobufs.ServerCapabilities_ServerCapabilities_AcceptsEffectiveConfig | protobufs.ServerCapabilities_ServerCapabilities_OffersRemoteConfig | 
protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus -func InitializeServer(listener string, agents *model.Agents) *Server { +func InitializeServer( + agents *model.Agents, agentConfigProvider AgentConfigProvider, +) *Server { if agents == nil { agents = &model.AllAgents } opAmpServer = &Server{ - agents: agents, + agents: agents, + agentConfigProvider: agentConfigProvider, } opAmpServer.server = server.New(zap.S()) return opAmpServer } -func InitializeAndStartServer(listener string, agents *model.Agents) error { - InitializeServer(listener, agents) - return opAmpServer.Start(listener) -} - -func StopServer() { - if opAmpServer != nil { - opAmpServer.Stop() - } -} - func (srv *Server) Start(listener string) error { settings := server.StartSettings{ Settings: server.Settings{ @@ -58,10 +55,24 @@ func (srv *Server) Start(listener string) error { ListenEndpoint: listener, } + unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() { + err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider) + if err != nil { + zap.S().Errorf( + "could not roll out latest config recommendation to connected agents: %v", err, + ) + } + }) + srv.cleanups = append(srv.cleanups, unsubscribe) + return srv.server.Start(settings) } func (srv *Server) Stop() { + for _, cleanup := range srv.cleanups { + defer cleanup() + } + srv.server.Stop(context.Background()) } @@ -80,7 +91,12 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer if created { agent.CanLB = model.ExtractLbFlag(msg.AgentDescription) - zap.S().Debugf("New agent added:", zap.Bool("canLb", agent.CanLB), zap.String("ID", agent.ID), zap.Any("status", agent.CurrentStatus)) + zap.L().Debug( + "New agent added:", + zap.Bool("canLb", agent.CanLB), + zap.String("ID", agent.ID), + zap.Any("status", agent.CurrentStatus), + ) } var response *protobufs.ServerToAgent @@ -89,7 +105,7 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer 
Capabilities: uint64(capabilities), } - agent.UpdateStatus(msg, response) + agent.UpdateStatus(msg, response, srv.agentConfigProvider) return response } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 69f3a9367f..6e3f267491 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -76,6 +76,8 @@ type Server struct { privateConn net.Listener privateHTTP *http.Server + opampServer *opamp.Server + unavailableChannel chan healthcheck.Status } @@ -204,6 +206,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { if err := agentConf.Initiate(localDB, "sqlite"); err != nil { return nil, err } + + // TODO(Raj): Replace this with actual provider in a follow up PR + agentConfigProvider := opamp.NewMockAgentConfigProvider() + s.opampServer = opamp.InitializeServer( + &opAmpModel.AllAgents, agentConfigProvider, + ) + return s, nil } @@ -503,7 +512,7 @@ func (s *Server) Start() error { go func() { zap.S().Info("Starting OpAmp Websocket server", zap.String("addr", constants.OpAmpWsEndpoint)) - err := opamp.InitializeAndStartServer(constants.OpAmpWsEndpoint, &opAmpModel.AllAgents) + err := s.opampServer.Start(constants.OpAmpWsEndpoint) if err != nil { zap.S().Info("opamp ws server failed to start", err) s.unavailableChannel <- healthcheck.Unavailable @@ -526,7 +535,7 @@ func (s *Server) Stop() error { } } - opamp.StopServer() + s.opampServer.Stop() if s.ruleManager != nil { s.ruleManager.Stop() diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 21bc02b1f8..0a1105e63c 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "net" "net/http" "net/http/httptest" "os" @@ -299,7 +298,7 @@ type LogPipelinesTestBed struct { testUser *model.User apiHandler *app.APIHandler 
opampServer *opamp.Server - opampClientConn *mockOpAmpConnection + opampClientConn *opamp.MockOpAmpConnection } func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { @@ -447,7 +446,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( pipelines []logparsingpipeline.Pipeline, ) { - lastMsg := tb.opampClientConn.latestMsgFromServer() + lastMsg := tb.opampClientConn.LatestMsgFromServer() collectorConfigFiles := lastMsg.RemoteConfig.Config.ConfigMap assert.Equal( tb.t, len(collectorConfigFiles), 1, @@ -516,7 +515,7 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( } func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig() { - lastMsg := tb.opampClientConn.latestMsgFromServer() + lastMsg := tb.opampClientConn.LatestMsgFromServer() tb.opampServer.OnMessage(tb.opampClientConn, &protobufs.AgentToServer{ InstanceUid: "test", EffectiveConfig: &protobufs.EffectiveConfig{ @@ -564,7 +563,7 @@ func assertPipelinesResponseMatchesPostedPipelines( } } -func mockOpampAgent(testDBFilePath string) (*opamp.Server, *mockOpAmpConnection, error) { +func mockOpampAgent(testDBFilePath string) (*opamp.Server, *opamp.MockOpAmpConnection, error) { // Mock an available opamp agent testDB, err := opampModel.InitDB(testDBFilePath) if err != nil { @@ -575,8 +574,8 @@ func mockOpampAgent(testDBFilePath string) (*opamp.Server, *mockOpAmpConnection, return nil, nil, err } - opampServer := opamp.InitializeServer(constants.OpAmpWsEndpoint, nil) - opampClientConnection := &mockOpAmpConnection{} + opampServer := opamp.InitializeServer(nil, opamp.NewMockAgentConfigProvider()) + opampClientConnection := &opamp.MockOpAmpConnection{} opampServer.OnMessage( opampClientConnection, &protobufs.AgentToServer{ @@ -674,36 +673,3 @@ func NewAuthenticatedTestRequest( req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt) return req, nil } - -type 
mockOpAmpConnection struct { - serverToAgentMsgs []*protobufs.ServerToAgent -} - -func (conn *mockOpAmpConnection) Send(ctx context.Context, msg *protobufs.ServerToAgent) error { - conn.serverToAgentMsgs = append(conn.serverToAgentMsgs, msg) - return nil -} - -func (conn *mockOpAmpConnection) latestMsgFromServer() *protobufs.ServerToAgent { - if len(conn.serverToAgentMsgs) < 1 { - return nil - } - return conn.serverToAgentMsgs[len(conn.serverToAgentMsgs)-1] -} - -func (conn *mockOpAmpConnection) LatestPipelinesReceivedFromServer() ([]logparsingpipeline.Pipeline, error) { - pipelines := []logparsingpipeline.Pipeline{} - lastMsg := conn.latestMsgFromServer() - if lastMsg == nil { - return pipelines, nil - } - - return pipelines, nil -} - -func (conn *mockOpAmpConnection) Disconnect() error { - return nil -} -func (conn *mockOpAmpConnection) RemoteAddr() net.Addr { - return nil -}