feat: opamp server with agent config provider (#3737)

* feat: add interface for opamp.AgentConfigProvider

* feat: add iface and plumbing for generating recommended conf in opamp/agent

* feat: get opamp server config provider tests started

* chore: add test scenario for agent connection without a config recommendation

* chore: add test scenario for agent connection with a config recommendation

* chore: add test for validating config deployment status gets reported

* chore: add test for rolling out latest config recommendations when config changes

* chore: wrap up opamp server lifecycle tests

* chore: some tests cleanup

* chore: get all tests passing

* chore: update opamp server init logic in ee query service

* chore: some cleanup

* chore: some final cleanup
This commit is contained in:
Raj Kamal Singh 2023-10-14 09:16:14 +05:30 committed by GitHub
parent ad62106cad
commit cb155a1172
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 575 additions and 76 deletions

View File

@ -88,6 +88,8 @@ type Server struct {
// Usage manager
usageManager *usage.Manager
opampServer *opamp.Server
unavailableChannel chan healthcheck.Status
}
@ -254,6 +256,12 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer
// TODO(Raj): Replace this with actual provider in a follow up PR
agentConfigProvider := opamp.NewMockAgentConfigProvider()
s.opampServer = opamp.InitializeServer(
&opAmpModel.AllAgents, agentConfigProvider,
)
return s, nil
}
@ -569,7 +577,7 @@ func (s *Server) Start() error {
go func() {
zap.S().Info("Starting OpAmp Websocket server", zap.String("addr", baseconst.OpAmpWsEndpoint))
err := opamp.InitializeAndStartServer(baseconst.OpAmpWsEndpoint, &opAmpModel.AllAgents)
err := s.opampServer.Start(baseconst.OpAmpWsEndpoint)
if err != nil {
zap.S().Info("opamp ws server failed to start", err)
s.unavailableChannel <- healthcheck.Unavailable
@ -592,7 +600,7 @@ func (s *Server) Stop() error {
}
}
opamp.StopServer()
s.opampServer.Stop()
if s.ruleManager != nil {
s.ruleManager.Stop()

View File

@ -0,0 +1,12 @@
package opamp
import "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
// Interface for a source of otel collector config recommendations.
type AgentConfigProvider interface {
model.AgentConfigProvider
// Subscribe to be notified on changes in config provided by this source.
// Used for rolling out latest config recommendation to all connected agents when settings change
SubscribeToConfigUpdates(callback func()) (unsubscribe func())
}

View File

@ -0,0 +1,256 @@
package opamp
import (
"fmt"
"log"
"net"
"os"
"testing"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
_ "github.com/mattn/go-sqlite3"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"golang.org/x/exp/maps"
)
func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
require := require.New(t)
tb := newTestbed(t)
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"there should be no agent config subscribers at the start",
)
tb.StartServer()
require.Equal(
1, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"Opamp server should have subscribed to updates from config provider after being started",
)
// Server should always respond with a RemoteConfig when an agent connects.
// Even if there are no recommended changes to the agent's initial config
require.False(tb.testConfigProvider.HasRecommendations())
agent1Conn := &MockOpAmpConnection{}
agent1Id := "testAgent1"
tb.opampServer.OnMessage(
agent1Conn,
&protobufs.AgentToServer{
InstanceUid: agent1Id,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(),
},
},
)
lastAgent1Msg := agent1Conn.LatestMsgFromServer()
require.NotNil(
lastAgent1Msg,
"Server should always send a remote config to the agent when it connects",
)
require.Equal(
RemoteConfigBody(lastAgent1Msg),
string(initialAgentConf().ConfigMap[model.CollectorConfigFilename].Body),
)
tb.testConfigProvider.ZPagesEndpoint = "localhost:55555"
require.True(tb.testConfigProvider.HasRecommendations())
agent2Id := "testAgent2"
agent2Conn := &MockOpAmpConnection{}
tb.opampServer.OnMessage(
agent2Conn,
&protobufs.AgentToServer{
InstanceUid: agent2Id,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(),
},
},
)
lastAgent2Msg := agent2Conn.LatestMsgFromServer()
require.NotNil(
lastAgent2Msg,
"server should recommend a config to agent when it connects",
)
recommendedEndpoint, err := GetStringValueFromYaml(
[]byte(RemoteConfigBody(lastAgent2Msg)), "extensions.zpages.endpoint",
)
require.Nil(err)
require.Equal(
tb.testConfigProvider.ZPagesEndpoint, recommendedEndpoint,
"server should send recommended config to agent when it connects",
)
agent2Conn.ClearMsgsFromServer()
tb.opampServer.OnMessage(agent2Conn, &protobufs.AgentToServer{
InstanceUid: agent2Id,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: NewAgentConfigMap(
[]byte(RemoteConfigBody(lastAgent2Msg)),
),
},
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
LastRemoteConfigHash: lastAgent2Msg.RemoteConfig.ConfigHash,
},
})
expectedConfId := tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(expectedConfId, agent2Id),
"Server should report deployment success to config provider on receiving update from agent.",
)
require.True(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id])
require.Nil(
agent2Conn.LatestMsgFromServer(),
"Server should not recommend a RemoteConfig if agent is already running it.",
)
// Server should rollout latest config to all agents when notified of a change by config provider
agent1Conn.ClearMsgsFromServer()
agent2Conn.ClearMsgsFromServer()
tb.testConfigProvider.ZPagesEndpoint = "localhost:66666"
tb.testConfigProvider.NotifySubscribersOfChange()
for _, agentConn := range []*MockOpAmpConnection{agent1Conn, agent2Conn} {
lastMsg := agentConn.LatestMsgFromServer()
recommendedEndpoint, err := GetStringValueFromYaml(
[]byte(RemoteConfigBody(lastMsg)), "extensions.zpages.endpoint",
)
require.Nil(err)
require.Equal(tb.testConfigProvider.ZPagesEndpoint, recommendedEndpoint)
}
lastAgent2Msg = agent2Conn.LatestMsgFromServer()
tb.opampServer.OnMessage(agent2Conn, &protobufs.AgentToServer{
InstanceUid: agent2Id,
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
LastRemoteConfigHash: lastAgent2Msg.RemoteConfig.ConfigHash,
},
})
expectedConfId = tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(expectedConfId, agent2Id),
"Server should report deployment failure to config provider on receiving update from agent.",
)
require.False(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id])
require.Equal(1, len(tb.testConfigProvider.ConfigUpdateSubscribers))
tb.opampServer.Stop()
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"Opamp server should have unsubscribed to config provider updates after shutdown",
)
}
type testbed struct {
testConfigProvider *MockAgentConfigProvider
opampServer *Server
t *testing.T
}
func newTestbed(t *testing.T) *testbed {
// Init opamp model.
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
_, err = model.InitDB(testDBFilePath)
if err != nil {
t.Fatalf("could not init opamp model: %v", err)
}
testConfigProvider := NewMockAgentConfigProvider()
opampServer := InitializeServer(nil, testConfigProvider)
return &testbed{
testConfigProvider: testConfigProvider,
opampServer: opampServer,
t: t,
}
}
func (tb *testbed) StartServer() {
testListenPath := GetAvailableLocalAddress()
err := tb.opampServer.Start(testListenPath)
require.Nil(tb.t, err, "should be able to start opamp server")
}
// Test helper
func GetStringValueFromYaml(
serializedYaml []byte, path string,
) (string, error) {
if len(serializedYaml) < 1 {
return "", fmt.Errorf("yaml data is empty")
}
k := koanf.New(".")
err := k.Load(rawbytes.Provider(serializedYaml), yaml.Parser())
if err != nil {
return "", errors.Wrap(err, "could not unmarshal collector config")
}
return k.String("extensions.zpages.endpoint"), nil
}
// Returns body of a ServerToAgent.RemoteConfig or ""
func RemoteConfigBody(msg *protobufs.ServerToAgent) string {
if msg == nil {
return ""
}
collectorConfFiles := msg.RemoteConfig.Config.ConfigMap
if len(collectorConfFiles) < 1 {
return ""
}
return string(maps.Values(collectorConfFiles)[0].Body)
}
func NewAgentConfigMap(body []byte) *protobufs.AgentConfigMap {
return &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
model.CollectorConfigFilename: {
Body: body,
ContentType: "text/yaml",
},
},
}
}
func initialAgentConf() *protobufs.AgentConfigMap {
return NewAgentConfigMap(
[]byte(`
receivers:
otlp:
processors:
batch:
exporters:
otlp:
service:
pipelines:
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
`),
)
}
// Brought in from https://github.com/open-telemetry/opamp-go/blob/main/internal/testhelpers/nethelpers.go
func GetAvailableLocalAddress() string {
ln, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
log.Fatalf("failed to get a free local port: %v", err)
}
// There is a possible race if something else takes this same port before
// the test uses it, however, that is unlikely in practice.
defer ln.Close()
return ln.Addr().String()
}

View File

@ -0,0 +1,134 @@
package opamp
import (
"context"
"net"
"github.com/google/uuid"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/pkg/errors"
)
type MockOpAmpConnection struct {
ServerToAgentMsgs []*protobufs.ServerToAgent
}
func (conn *MockOpAmpConnection) Send(ctx context.Context, msg *protobufs.ServerToAgent) error {
conn.ServerToAgentMsgs = append(conn.ServerToAgentMsgs, msg)
return nil
}
func (conn *MockOpAmpConnection) LatestMsgFromServer() *protobufs.ServerToAgent {
if len(conn.ServerToAgentMsgs) < 1 {
return nil
}
return conn.ServerToAgentMsgs[len(conn.ServerToAgentMsgs)-1]
}
func (conn *MockOpAmpConnection) ClearMsgsFromServer() []*protobufs.ServerToAgent {
msgs := conn.ServerToAgentMsgs
conn.ServerToAgentMsgs = []*protobufs.ServerToAgent{}
return msgs
}
func (conn *MockOpAmpConnection) Disconnect() error {
return nil
}
func (conn *MockOpAmpConnection) RemoteAddr() net.Addr {
return nil
}
// Implements opamp.AgentConfigProvider
type MockAgentConfigProvider struct {
// An updated config is recommended by TestAgentConfProvider
// if `ZPagesEndpoint` is not empty
ZPagesEndpoint string
ConfigUpdateSubscribers map[string]func()
// { configId: { agentId: isOk } }
ReportedDeploymentStatuses map[string]map[string]bool
}
func NewMockAgentConfigProvider() *MockAgentConfigProvider {
return &MockAgentConfigProvider{
ConfigUpdateSubscribers: map[string]func(){},
ReportedDeploymentStatuses: map[string]map[string]bool{},
}
}
// Test helper.
func (ta *MockAgentConfigProvider) HasRecommendations() bool {
return len(ta.ZPagesEndpoint) > 0
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) (
[]byte, string, error,
) {
if len(ta.ZPagesEndpoint) < 1 {
return baseConfYaml, "agent-base-config", nil
}
k := koanf.New(".")
err := k.Load(rawbytes.Provider(baseConfYaml), yaml.Parser())
if err != nil {
return nil, "", errors.Wrap(err, "could not unmarshal baseConf")
}
k.Set("extensions.zpages.endpoint", ta.ZPagesEndpoint)
recommendedYaml, err := k.Marshal(yaml.Parser())
if err != nil {
return nil, "", errors.Wrap(err, "could not marshal recommended conf")
}
confId := ta.ZPagesEndpoint
return recommendedYaml, confId, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) ReportConfigDeploymentStatus(
agentId string,
configId string,
err error,
) {
confIdReports := ta.ReportedDeploymentStatuses[configId]
if confIdReports == nil {
confIdReports = map[string]bool{}
ta.ReportedDeploymentStatuses[configId] = confIdReports
}
confIdReports[agentId] = (err == nil)
}
// Test helper.
func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(
configId string, agentId string,
) bool {
confIdReports := ta.ReportedDeploymentStatuses[configId]
if confIdReports == nil {
return false
}
_, exists := confIdReports[agentId]
return exists
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
subscriberId := uuid.NewString()
ta.ConfigUpdateSubscribers[subscriberId] = callback
return func() {
delete(ta.ConfigUpdateSubscribers, subscriberId)
}
}
// test helper.
func (ta *MockAgentConfigProvider) NotifySubscribersOfChange() {
for _, callback := range ta.ConfigUpdateSubscribers {
callback()
}
}

View File

@ -7,6 +7,7 @@ import (
"sync"
"time"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/open-telemetry/opamp-go/protobufs"
@ -72,12 +73,6 @@ func (agent *Agent) Upsert() error {
return nil
}
func (agent *Agent) UpdateStatus(statusMsg *protobufs.AgentToServer, response *protobufs.ServerToAgent) {
agent.mux.Lock()
defer agent.mux.Unlock()
agent.processStatusUpdate(statusMsg, response)
}
// extracts lb exporter support flag from agent description. the flag
// is used to decide if lb exporter can be enabled on the agent.
func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
@ -208,9 +203,20 @@ func (agent *Agent) hasCapability(capability protobufs.AgentCapabilities) bool {
return agent.Status.Capabilities&uint64(capability) != 0
}
func (agent *Agent) UpdateStatus(
statusMsg *protobufs.AgentToServer,
response *protobufs.ServerToAgent,
configProvider AgentConfigProvider,
) {
agent.mux.Lock()
defer agent.mux.Unlock()
agent.processStatusUpdate(statusMsg, response, configProvider)
}
func (agent *Agent) processStatusUpdate(
newStatus *protobufs.AgentToServer,
response *protobufs.ServerToAgent,
configProvider AgentConfigProvider,
) {
// We don't have any status for this Agent, or we lost the previous status update from the Agent, so our
// current status is not up-to-date.
@ -237,12 +243,16 @@ func (agent *Agent) processStatusUpdate(
response.Flags |= uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
}
// This needs to be done before agent.updateRemoteConfig() to ensure it sees
// the latest value for agent.EffectiveConfig when generating a config recommendation
agent.updateEffectiveConfig(newStatus, response)
configChanged := false
if agentDescrChanged {
// Agent description is changed.
// We need to recalculate the config.
configChanged = agent.updateRemoteConfig()
configChanged = agent.updateRemoteConfig(configProvider)
}
// If remote config is changed and different from what the Agent has then
@ -254,13 +264,21 @@ func (agent *Agent) processStatusUpdate(
// does not have this config (hash is different). Send the new config the Agent.
response.RemoteConfig = agent.remoteConfig
agent.SendToAgent(response)
}
agent.updateEffectiveConfig(newStatus, response)
ListenToConfigUpdate(
agent.ID,
string(response.RemoteConfig.ConfigHash),
configProvider.ReportConfigDeploymentStatus,
)
}
}
func (agent *Agent) updateRemoteConfig() bool {
hash := sha256.New()
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig))
if err != nil {
zap.S().Errorf("could not generate config recommendation for agent %d: %w", agent.ID, err)
return false
}
cfg := protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
@ -268,14 +286,25 @@ func (agent *Agent) updateRemoteConfig() bool {
},
}
// Calculate the hash.
for k, v := range cfg.Config.ConfigMap {
hash.Write([]byte(k))
hash.Write(v.Body)
hash.Write([]byte(v.ContentType))
cfg.Config.ConfigMap[CollectorConfigFilename] = &protobufs.AgentConfigFile{
Body: recommendedConfig,
ContentType: "application/x-yaml",
}
cfg.ConfigHash = hash.Sum(nil)
if len(confId) < 1 {
// Should never happen. Handle gracefully if it does by some chance.
zap.S().Errorf("config provider recommended a config with empty confId. Using content hash for configId")
hash := sha256.New()
for k, v := range cfg.Config.ConfigMap {
hash.Write([]byte(k))
hash.Write(v.Body)
hash.Write([]byte(v.ContentType))
}
cfg.ConfigHash = hash.Sum(nil)
} else {
cfg.ConfigHash = []byte(confId)
}
configChanged := !isEqualRemoteConfig(agent.remoteConfig, &cfg)

View File

@ -6,7 +6,10 @@ import (
"time"
"github.com/jmoiron/sqlx"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var db *sqlx.DB
@ -115,3 +118,45 @@ func (agents *Agents) GetAllAgents() []*Agent {
}
return allAgents
}
// Recommend latest config to connected agents whose effective
// config is not the same as the latest recommendation
func (agents *Agents) RecommendLatestConfigToAll(
provider AgentConfigProvider,
) error {
for _, agent := range agents.GetAllAgents() {
newConfig, confId, err := provider.RecommendAgentConfig(
[]byte(agent.EffectiveConfig),
)
if err != nil {
return errors.Wrap(err, fmt.Sprintf(
"could not generate conf recommendation for %v", agent.ID,
))
}
// Recommendation is same as current config
if string(newConfig) == agent.EffectiveConfig {
zap.S().Infof(
"Recommended config same as current effective config for agent %s", agent.ID,
)
return nil
}
agent.SendToAgent(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
CollectorConfigFilename: {
Body: newConfig,
ContentType: "application/x-yaml",
},
},
},
ConfigHash: []byte(confId),
},
})
ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus)
}
return nil
}

View File

@ -0,0 +1,20 @@
package model
// Interface for source of otel collector config recommendations.
type AgentConfigProvider interface {
// Generate recommended config for an agent based on its `currentConfYaml`
// and current state of user facing settings for agent based features.
RecommendAgentConfig(currentConfYaml []byte) (
recommendedConfYaml []byte,
// Opaque id of the recommended config, used for reporting deployment status updates
configId string,
err error,
)
// Report deployment status for config recommendations generated by RecommendAgentConfig
ReportConfigDeploymentStatus(
agentId string,
configId string,
err error,
)
}

View File

@ -0,0 +1,4 @@
package model
// Must match collectorConfigKey in https://github.com/SigNoz/signoz-otel-collector/blob/main/opamp/config_manager.go
const CollectorConfigFilename = "collector.yaml"

View File

@ -18,35 +18,32 @@ type Server struct {
agents *model.Agents
logger *zap.Logger
capabilities int32
agentConfigProvider AgentConfigProvider
// cleanups to be run when stopping the server
cleanups []func()
}
const capabilities = protobufs.ServerCapabilities_ServerCapabilities_AcceptsEffectiveConfig |
protobufs.ServerCapabilities_ServerCapabilities_OffersRemoteConfig |
protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus
func InitializeServer(listener string, agents *model.Agents) *Server {
func InitializeServer(
agents *model.Agents, agentConfigProvider AgentConfigProvider,
) *Server {
if agents == nil {
agents = &model.AllAgents
}
opAmpServer = &Server{
agents: agents,
agents: agents,
agentConfigProvider: agentConfigProvider,
}
opAmpServer.server = server.New(zap.S())
return opAmpServer
}
func InitializeAndStartServer(listener string, agents *model.Agents) error {
InitializeServer(listener, agents)
return opAmpServer.Start(listener)
}
func StopServer() {
if opAmpServer != nil {
opAmpServer.Stop()
}
}
func (srv *Server) Start(listener string) error {
settings := server.StartSettings{
Settings: server.Settings{
@ -58,10 +55,24 @@ func (srv *Server) Start(listener string) error {
ListenEndpoint: listener,
}
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)
if err != nil {
zap.S().Errorf(
"could not roll out latest config recommendation to connected agents: %w", err,
)
}
})
srv.cleanups = append(srv.cleanups, unsubscribe)
return srv.server.Start(settings)
}
func (srv *Server) Stop() {
for _, cleanup := range srv.cleanups {
defer cleanup()
}
srv.server.Stop(context.Background())
}
@ -80,7 +91,12 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer
if created {
agent.CanLB = model.ExtractLbFlag(msg.AgentDescription)
zap.S().Debugf("New agent added:", zap.Bool("canLb", agent.CanLB), zap.String("ID", agent.ID), zap.Any("status", agent.CurrentStatus))
zap.S().Debugf(
"New agent added:",
zap.Bool("canLb", agent.CanLB),
zap.String("ID", agent.ID),
zap.Any("status", agent.CurrentStatus),
)
}
var response *protobufs.ServerToAgent
@ -89,7 +105,7 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer
Capabilities: uint64(capabilities),
}
agent.UpdateStatus(msg, response)
agent.UpdateStatus(msg, response, srv.agentConfigProvider)
return response
}

View File

@ -76,6 +76,8 @@ type Server struct {
privateConn net.Listener
privateHTTP *http.Server
opampServer *opamp.Server
unavailableChannel chan healthcheck.Status
}
@ -204,6 +206,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
if err := agentConf.Initiate(localDB, "sqlite"); err != nil {
return nil, err
}
// TODO(Raj): Replace this with actual provider in a follow up PR
agentConfigProvider := opamp.NewMockAgentConfigProvider()
s.opampServer = opamp.InitializeServer(
&opAmpModel.AllAgents, agentConfigProvider,
)
return s, nil
}
@ -503,7 +512,7 @@ func (s *Server) Start() error {
go func() {
zap.S().Info("Starting OpAmp Websocket server", zap.String("addr", constants.OpAmpWsEndpoint))
err := opamp.InitializeAndStartServer(constants.OpAmpWsEndpoint, &opAmpModel.AllAgents)
err := s.opampServer.Start(constants.OpAmpWsEndpoint)
if err != nil {
zap.S().Info("opamp ws server failed to start", err)
s.unavailableChannel <- healthcheck.Unavailable
@ -526,7 +535,7 @@ func (s *Server) Stop() error {
}
}
opamp.StopServer()
s.opampServer.Stop()
if s.ruleManager != nil {
s.ruleManager.Stop()

View File

@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"os"
@ -299,7 +298,7 @@ type LogPipelinesTestBed struct {
testUser *model.User
apiHandler *app.APIHandler
opampServer *opamp.Server
opampClientConn *mockOpAmpConnection
opampClientConn *opamp.MockOpAmpConnection
}
func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed {
@ -447,7 +446,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline
func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient(
pipelines []logparsingpipeline.Pipeline,
) {
lastMsg := tb.opampClientConn.latestMsgFromServer()
lastMsg := tb.opampClientConn.LatestMsgFromServer()
collectorConfigFiles := lastMsg.RemoteConfig.Config.ConfigMap
assert.Equal(
tb.t, len(collectorConfigFiles), 1,
@ -516,7 +515,7 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient(
}
func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig() {
lastMsg := tb.opampClientConn.latestMsgFromServer()
lastMsg := tb.opampClientConn.LatestMsgFromServer()
tb.opampServer.OnMessage(tb.opampClientConn, &protobufs.AgentToServer{
InstanceUid: "test",
EffectiveConfig: &protobufs.EffectiveConfig{
@ -564,7 +563,7 @@ func assertPipelinesResponseMatchesPostedPipelines(
}
}
func mockOpampAgent(testDBFilePath string) (*opamp.Server, *mockOpAmpConnection, error) {
func mockOpampAgent(testDBFilePath string) (*opamp.Server, *opamp.MockOpAmpConnection, error) {
// Mock an available opamp agent
testDB, err := opampModel.InitDB(testDBFilePath)
if err != nil {
@ -575,8 +574,8 @@ func mockOpampAgent(testDBFilePath string) (*opamp.Server, *mockOpAmpConnection,
return nil, nil, err
}
opampServer := opamp.InitializeServer(constants.OpAmpWsEndpoint, nil)
opampClientConnection := &mockOpAmpConnection{}
opampServer := opamp.InitializeServer(nil, opamp.NewMockAgentConfigProvider())
opampClientConnection := &opamp.MockOpAmpConnection{}
opampServer.OnMessage(
opampClientConnection,
&protobufs.AgentToServer{
@ -674,36 +673,3 @@ func NewAuthenticatedTestRequest(
req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt)
return req, nil
}
type mockOpAmpConnection struct {
serverToAgentMsgs []*protobufs.ServerToAgent
}
func (conn *mockOpAmpConnection) Send(ctx context.Context, msg *protobufs.ServerToAgent) error {
conn.serverToAgentMsgs = append(conn.serverToAgentMsgs, msg)
return nil
}
func (conn *mockOpAmpConnection) latestMsgFromServer() *protobufs.ServerToAgent {
if len(conn.serverToAgentMsgs) < 1 {
return nil
}
return conn.serverToAgentMsgs[len(conn.serverToAgentMsgs)-1]
}
func (conn *mockOpAmpConnection) LatestPipelinesReceivedFromServer() ([]logparsingpipeline.Pipeline, error) {
pipelines := []logparsingpipeline.Pipeline{}
lastMsg := conn.latestMsgFromServer()
if lastMsg == nil {
return pipelines, nil
}
return pipelines, nil
}
func (conn *mockOpAmpConnection) Disconnect() error {
return nil
}
func (conn *mockOpAmpConnection) RemoteAddr() net.Addr {
return nil
}