From 03acc33888664d759b3fcbb90e00b2afb929f962 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+rkssisodiya@users.noreply.github.com> Date: Sat, 2 Sep 2023 20:01:03 +0530 Subject: [PATCH] chore: log parsing pipelines happy path integration test (#3424) * chore: get started with logparsingpipeline happy path integration test * chore: log pipelines happy path: assert opampclient receives expected otel config processors for log pipelines. * chore: logparsing pipeline happy path: validate deployment status update * chore: logparsing pipeline happy path: validate posting pipelines update * chore: some cleanup * chore: some more cleanup * chore: some more cleanup * fix: address review comments --------- Co-authored-by: Nityananda Gohain --- Makefile | 1 + ee/query-service/app/server.go | 2 +- pkg/query-service/app/http_handler.go | 14 +- .../app/logparsingpipeline/pipelineBuilder.go | 7 - pkg/query-service/app/opamp/opamp_server.go | 11 +- pkg/query-service/app/server.go | 2 +- .../integration/logparsingpipeline_test.go | 542 ++++++++++++++++++ 7 files changed, 559 insertions(+), 20 deletions(-) create mode 100644 pkg/query-service/tests/integration/logparsingpipeline_test.go diff --git a/Makefile b/Makefile index bef8c8bce7..1761acd26d 100644 --- a/Makefile +++ b/Makefile @@ -151,3 +151,4 @@ test: go test ./pkg/query-service/app/querier/... go test ./pkg/query-service/converter/... go test ./pkg/query-service/formatter/... + go test ./pkg/query-service/tests/integration/... \ No newline at end of file diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 9f3a08a394..aee2c87160 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -548,7 +548,7 @@ func (s *Server) Start() error { go func() { zap.S().Info("Starting OpAmp Websocket server", zap.String("addr", baseconst.OpAmpWsEndpoint)) - err := opamp.InitalizeServer(baseconst.OpAmpWsEndpoint, &opAmpModel.AllAgents) + err := opamp.InitializeAndStartServer(baseconst.OpAmpWsEndpoint, &opAmpModel.AllAgents) if err != nil { zap.S().Info("opamp ws server failed to start", err) s.unavailableChannel <- healthcheck.Unavailable diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 45ba2ea970..7ce90effc1 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -208,7 +208,7 @@ func (aH *APIHandler) testReady(f http.HandlerFunc) http.HandlerFunc { } } -type response struct { +type ApiResponse struct { Status status `json:"status"` Data interface{} `json:"data,omitempty"` ErrorType model.ErrorType `json:"errorType,omitempty"` @@ -217,7 +217,7 @@ type response struct { func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interface{}) { json := jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&response{ + b, err := json.Marshal(&ApiResponse{ Status: statusError, ErrorType: apiErr.Type(), Error: apiErr.Error(), @@ -260,7 +260,7 @@ func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interfa func writeHttpResponse(w http.ResponseWriter, data interface{}) { json := jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&response{ + b, err := json.Marshal(&ApiResponse{ Status: statusSuccess, Data: data, }) @@ -2284,8 +2284,8 @@ func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am *AuthMiddleware) subRouter.HandleFunc("/aggregate", am.ViewAccess(aH.logAggregate)).Methods(http.MethodGet) // log pipelines - subRouter.HandleFunc("/pipelines/{version}", am.ViewAccess(aH.listLogsPipelinesHandler)).Methods(http.MethodGet) - subRouter.HandleFunc("/pipelines", am.EditAccess(aH.createLogsPipeline)).Methods(http.MethodPost) + subRouter.HandleFunc("/pipelines/{version}", am.ViewAccess(aH.ListLogsPipelinesHandler)).Methods(http.MethodGet) + subRouter.HandleFunc("/pipelines", am.EditAccess(aH.CreateLogsPipeline)).Methods(http.MethodPost) } func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) { @@ -2419,7 +2419,7 @@ func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) { return int(version64), nil } -func (ah *APIHandler) listLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { version, err := parseAgentConfigVersion(r) if err != nil { @@ -2488,7 +2488,7 @@ func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in return payload, nil } -func (ah *APIHandler) createLogsPipeline(w http.ResponseWriter, r *http.Request) { +func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PostablePipelines{} diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index 60f0e4df17..93b60c37cc 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -1,9 +1,6 @@ package logparsingpipeline import ( - "encoding/json" - "fmt" - "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" ) @@ -69,9 +66,5 @@ func getOperators(ops []model.PipelineOperator) []model.PipelineOperator { filteredOp[len(filteredOp)-1].Output = "" } } - for _, v := range filteredOp { - x, _ := json.Marshal(v) - fmt.Println(string(x)) - } return filteredOp } diff --git a/pkg/query-service/app/opamp/opamp_server.go b/pkg/query-service/app/opamp/opamp_server.go index cee50ba90c..201fd598c7 100644 --- a/pkg/query-service/app/opamp/opamp_server.go +++ b/pkg/query-service/app/opamp/opamp_server.go @@ -24,8 +24,7 @@ const capabilities = protobufs.ServerCapabilities_ServerCapabilities_AcceptsEffe protobufs.ServerCapabilities_ServerCapabilities_OffersRemoteConfig | protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus -func InitalizeServer(listener string, agents *model.Agents) error { - +func InitializeServer(listener string, agents *model.Agents) *Server { if agents == nil { agents = &model.AllAgents } @@ -34,7 +33,11 @@ func InitalizeServer(listener string, agents *model.Agents) error { agents: agents, } opAmpServer.server = server.New(zap.S()) + return opAmpServer +} +func InitializeAndStartServer(listener string, agents *model.Agents) error { + InitializeServer(listener, agents) return opAmpServer.Start(listener) } @@ -48,7 +51,7 @@ func (srv *Server) Start(listener string) error { settings := server.StartSettings{ Settings: server.Settings{ Callbacks: server.CallbacksStruct{ - OnMessageFunc: srv.onMessage, + OnMessageFunc: srv.OnMessage, OnConnectionCloseFunc: srv.onDisconnect, }, }, @@ -66,7 +69,7 @@ func (srv *Server) onDisconnect(conn types.Connection) { srv.agents.RemoveConnection(conn) } -func (srv *Server) onMessage(conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent { +func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent { agentID := msg.InstanceUid agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 45bb2ac91e..c05eed039c 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -488,7 +488,7 @@ func (s *Server) Start() error { go func() { zap.S().Info("Starting OpAmp Websocket server", zap.String("addr", constants.OpAmpWsEndpoint)) - err := opamp.InitalizeServer(constants.OpAmpWsEndpoint, &opAmpModel.AllAgents) + err := opamp.InitializeAndStartServer(constants.OpAmpWsEndpoint, &opAmpModel.AllAgents) if err != nil { zap.S().Info("opamp ws server failed to start", err) s.unavailableChannel <- healthcheck.Unavailable diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go new file mode 100644 index 0000000000..d2f1b60910 --- /dev/null +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -0,0 +1,542 @@ +package tests + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + + "github.com/gorilla/mux" + "github.com/jmoiron/sqlx" + "github.com/knadh/koanf/parsers/yaml" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "go.signoz.io/signoz/pkg/query-service/agentConf" + "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" + "go.signoz.io/signoz/pkg/query-service/app/opamp" + opampModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" + "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/dao" + "go.signoz.io/signoz/pkg/query-service/model" + "golang.org/x/exp/maps" +) + +func TestLogPipelinesLifecycle(t *testing.T) { + testbed := NewLogPipelinesTestBed(t) + assert := assert.New(t) + + getPipelinesResp := testbed.GetPipelinesFromQS() + assert.Equal( + 0, len(getPipelinesResp.Pipelines), + "There should be no pipelines at the start", + ) + assert.Equal( + 0, len(getPipelinesResp.History), + "There should be no pipelines config history at the start", + ) + + // Should be able to create pipelines config + postablePipelines := logparsingpipeline.PostablePipelines{ + Pipelines: []logparsingpipeline.PostablePipeline{ + { + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, { + OrderId: 2, + Name: "pipeline2", + Alias: "pipeline2", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{ + { + OrderId: 1, + ID: "remove", + Type: "remove", + Field: "attributes.test", + Enabled: true, + Name: "test remove", + }, + }, + }, + }, + } + + createPipelinesResp := testbed.PostPipelinesToQS(postablePipelines) + assertPipelinesResponseMatchesPostedPipelines( + t, postablePipelines, createPipelinesResp, + ) + testbed.assertPipelinesSentToOpampClient(createPipelinesResp.Pipelines) + + // Should be able to get the configured pipelines. + getPipelinesResp = testbed.GetPipelinesFromQS() + assertPipelinesResponseMatchesPostedPipelines( + t, postablePipelines, getPipelinesResp, + ) + + // Deployment status should be pending. + assert.Equal( + 1, len(getPipelinesResp.History), + "pipelines config history should not be empty after 1st configuration", + ) + assert.Equal( + agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus, + "pipelines deployment should be in progress after 1st configuration", + ) + + // Deployment status should get updated after acknowledgement from opamp client + testbed.simulateOpampClientAcknowledgementForLatestConfig() + + getPipelinesResp = testbed.GetPipelinesFromQS() + assertPipelinesResponseMatchesPostedPipelines( + t, postablePipelines, getPipelinesResp, + ) + assert.Equal( + getPipelinesResp.History[0].DeployStatus, agentConf.Deployed, + "pipeline deployment should be complete after acknowledgment from opamp client", + ) + + // Should be able to update pipelines config. + postablePipelines.Pipelines[1].Enabled = false + updatePipelinesResp := testbed.PostPipelinesToQS(postablePipelines) + assertPipelinesResponseMatchesPostedPipelines( + t, postablePipelines, updatePipelinesResp, + ) + testbed.assertPipelinesSentToOpampClient(updatePipelinesResp.Pipelines) + + assert.Equal( + 2, len(updatePipelinesResp.History), + "there should be 2 history entries after posting pipelines config for the 2nd time", + ) + assert.Equal( + agentConf.DeployInitiated, updatePipelinesResp.History[0].DeployStatus, + "deployment should be in progress for latest pipeline config", + ) + + // Deployment status should get updated again on receiving msg from client. + testbed.simulateOpampClientAcknowledgementForLatestConfig() + + getPipelinesResp = testbed.GetPipelinesFromQS() + assertPipelinesResponseMatchesPostedPipelines( + t, postablePipelines, getPipelinesResp, + ) + assert.Equal( + getPipelinesResp.History[0].DeployStatus, agentConf.Deployed, + "deployment for latest pipeline config should be complete after acknowledgment from opamp client", + ) +} + +// LogPipelinesTestBed coordinates and mocks components involved in +// configuring log pipelines and provides test helpers. +type LogPipelinesTestBed struct { + t *testing.T + testUser *model.User + apiHandler *app.APIHandler + opampServer *opamp.Server + opampClientConn *mockOpAmpConnection +} + +func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { + // Create a tmp file based sqlite db for testing. + testDBFile, err := os.CreateTemp("", "test-signoz-db-*") + if err != nil { + t.Fatalf("could not create temp file for test db: %v", err) + } + testDBFilePath := testDBFile.Name() + t.Cleanup(func() { os.Remove(testDBFilePath) }) + testDBFile.Close() + + // TODO(Raj): move away from singleton DB instances to avoid + // issues when running tests in parallel. + dao.InitDao("sqlite", testDBFilePath) + + testDB, err := sqlx.Open("sqlite3", testDBFilePath) + if err != nil { + t.Fatalf("could not open test db sqlite file: %v", err) + } + controller, err := logparsingpipeline.NewLogParsingPipelinesController(testDB, "sqlite") + if err != nil { + t.Fatalf("could not create a logparsingpipelines controller: %v", err) + } + + apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{ + AppDao: dao.DB(), + LogsParsingPipelineController: controller, + }) + if err != nil { + t.Fatalf("could not create a new ApiHandler: %v", err) + } + + opampServer, clientConn, err := mockOpampAgent(testDBFilePath) + if err != nil { + t.Fatalf("could not create opamp server and mock client connection: %v", err) + } + + user, apiErr := createTestUser() + if apiErr != nil { + t.Fatalf("could not create a test user: %v", apiErr) + } + + return &LogPipelinesTestBed{ + t: t, + testUser: user, + apiHandler: apiHandler, + opampServer: opampServer, + opampClientConn: clientConn, + } +} + +func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode( + postablePipelines logparsingpipeline.PostablePipelines, + expectedStatusCode int, +) *logparsingpipeline.PipelinesResponse { + req, err := NewAuthenticatedTestRequest( + tb.testUser, "/api/v1/logs/pipelines", postablePipelines, + ) + if err != nil { + tb.t.Fatalf("couldn't create authenticated test request: %v", err) + } + + respWriter := httptest.NewRecorder() + tb.apiHandler.CreateLogsPipeline(respWriter, req) + + response := respWriter.Result() + responseBody, err := io.ReadAll(response.Body) + if err != nil { + tb.t.Fatalf("couldn't read response body received from posting pipelines to QS: %v", err) + } + + if response.StatusCode != expectedStatusCode { + tb.t.Fatalf( + "Received response status %d after posting log pipelines. Expected: %d", + response.StatusCode, expectedStatusCode, + ) + } + + var result app.ApiResponse + err = json.Unmarshal(responseBody, &result) + if err != nil { + tb.t.Fatalf( + "Could not unmarshal QS response into an ApiResponse.\nResponse body: %s", + responseBody, + ) + } + + pipelinesResp, err := unmarshalPipelinesResponse(&result) + if err != nil { + tb.t.Fatalf("could not extract PipelinesResponse from apiResponse: %v", err) + } + return pipelinesResp +} + +func (tb *LogPipelinesTestBed) PostPipelinesToQS( + postablePipelines logparsingpipeline.PostablePipelines, +) *logparsingpipeline.PipelinesResponse { + return tb.PostPipelinesToQSExpectingStatusCode( + postablePipelines, 200, + ) +} + +func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.PipelinesResponse { + req, err := NewAuthenticatedTestRequest( + tb.testUser, "/api/v1/logs/pipelines/latest", nil, + ) + if err != nil { + tb.t.Fatalf("couldn't create authenticated test request: %v", err) + } + req = mux.SetURLVars(req, map[string]string{ + "version": "latest", + }) + + respWriter := httptest.NewRecorder() + tb.apiHandler.ListLogsPipelinesHandler(respWriter, req) + response := respWriter.Result() + responseBody, err := io.ReadAll(response.Body) + if err != nil { + tb.t.Fatalf("couldn't read response body received from QS: %v", err) + } + + if response.StatusCode != 200 { + tb.t.Fatalf( + "could not list log parsing pipelines. status: %d, body: %v", + response.StatusCode, responseBody, + ) + } + + var result app.ApiResponse + err = json.Unmarshal(responseBody, &result) + if err != nil { + tb.t.Fatalf( + "Could not unmarshal QS response into an ApiResponse.\nResponse body: %s", + responseBody, + ) + } + pipelinesResp, err := unmarshalPipelinesResponse(&result) + if err != nil { + tb.t.Fatalf("could not extract PipelinesResponse from apiResponse: %v", err) + } + return pipelinesResp +} + +func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient( + pipelines []model.Pipeline, +) { + lastMsg := tb.opampClientConn.latestMsgFromServer() + otelConfigFiles := lastMsg.RemoteConfig.Config.ConfigMap + assert.Equal( + tb.t, len(otelConfigFiles), 1, + "otel config sent to client is expected to contain atleast 1 file", + ) + + otelConfigYaml := maps.Values(otelConfigFiles)[0].Body + otelConfSentToClient, err := yaml.Parser().Unmarshal(otelConfigYaml) + if err != nil { + tb.t.Fatalf("could not unmarshal config file sent to opamp client: %v", err) + } + + // Each pipeline is expected to become its own processor + // in the logs service in otel collector config. + otelConfSvcs := otelConfSentToClient["service"].(map[string]interface{}) + otelConfLogsSvc := otelConfSvcs["pipelines"].(map[string]interface{})["logs"].(map[string]interface{}) + otelConfLogsSvcProcessorNames := otelConfLogsSvc["processors"].([]interface{}) + otelConfLogsPipelineProcNames := []string{} + for _, procNameVal := range otelConfLogsSvcProcessorNames { + procName := procNameVal.(string) + if strings.HasPrefix(procName, constants.LogsPPLPfx) { + otelConfLogsPipelineProcNames = append( + otelConfLogsPipelineProcNames, + procName, + ) + } + } + + _, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines) + assert.Equal( + tb.t, expectedLogProcessorNames, otelConfLogsPipelineProcNames, + "config sent to opamp client doesn't contain expected log pipelines", + ) + + otelConfProcessors := otelConfSentToClient["processors"].(map[string]interface{}) + for _, procName := range expectedLogProcessorNames { + _, procExists := otelConfProcessors[procName] + assert.True(tb.t, procExists, fmt.Sprintf( + "%s processor not found in config sent to opamp client", procName, + )) + } +} + +func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig() { + lastMsg := tb.opampClientConn.latestMsgFromServer() + tb.opampServer.OnMessage(tb.opampClientConn, &protobufs.AgentToServer{ + InstanceUid: "test", + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: lastMsg.RemoteConfig.Config, + }, + RemoteConfigStatus: &protobufs.RemoteConfigStatus{ + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, + LastRemoteConfigHash: lastMsg.RemoteConfig.ConfigHash, + }, + }) +} + +func unmarshalPipelinesResponse(apiResponse *app.ApiResponse) ( + *logparsingpipeline.PipelinesResponse, + error, +) { + dataJson, err := json.Marshal(apiResponse.Data) + if err != nil { + return nil, errors.Wrap(err, "could not marshal apiResponse.Data") + } + var pipelinesResp logparsingpipeline.PipelinesResponse + err = json.Unmarshal(dataJson, &pipelinesResp) + if err != nil { + return nil, errors.Wrap(err, "could not unmarshal apiResponse.Data json into PipelinesResponse") + } + + return &pipelinesResp, nil +} + +func assertPipelinesResponseMatchesPostedPipelines( + t *testing.T, + postablePipelines logparsingpipeline.PostablePipelines, + pipelinesResp *logparsingpipeline.PipelinesResponse, +) { + assert.Equal( + t, len(postablePipelines.Pipelines), len(pipelinesResp.Pipelines), + "length mistmatch between posted pipelines and pipelines in response", + ) + for i, pipeline := range pipelinesResp.Pipelines { + postable := postablePipelines.Pipelines[i] + assert.Equal(t, postable.Name, pipeline.Name, "pipeline.Name mismatch") + assert.Equal(t, postable.OrderId, pipeline.OrderId, "pipeline.OrderId mismatch") + assert.Equal(t, postable.Enabled, pipeline.Enabled, "pipeline.Enabled mismatch") + assert.Equal(t, postable.Config, pipeline.Config, "pipeline.Config mismatch") + } +} + +func mockOpampAgent(testDBFilePath string) (*opamp.Server, *mockOpAmpConnection, error) { + // Mock an available opamp agent + testDB, err := opampModel.InitDB(testDBFilePath) + if err != nil { + return nil, nil, err + } + err = agentConf.Initiate(testDB, "sqlite") + if err != nil { + return nil, nil, err + } + + opampServer := opamp.InitializeServer(constants.OpAmpWsEndpoint, nil) + opampClientConnection := &mockOpAmpConnection{} + opampServer.OnMessage( + opampClientConnection, + &protobufs.AgentToServer{ + InstanceUid: "test", + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "otel-collector.yaml": { + Body: []byte(` + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + processors: + batch: + send_batch_size: 10000 + send_batch_max_size: 11000 + timeout: 10s + exporters: + otlp: + endpoint: otelcol2:4317 + service: + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [otlp] + `), + ContentType: "text/yaml", + }, + }, + }, + }, + }, + ) + return opampServer, opampClientConnection, nil +} + +func createTestUser() (*model.User, *model.ApiError) { + // Create a test user for auth + ctx := context.Background() + org, apiErr := dao.DB().CreateOrg(ctx, &model.Organization{ + Name: "test", + }) + if apiErr != nil { + return nil, apiErr + } + + group, apiErr := dao.DB().CreateGroup(ctx, &model.Group{ + Name: "test", + }) + if apiErr != nil { + return nil, apiErr + } + + return dao.DB().CreateUser( + ctx, + &model.User{ + Name: "test", + Email: "test@test.com", + Password: "test", + OrgId: org.Id, + GroupId: group.Id, + }, + true, + ) +} + +func NewAuthenticatedTestRequest( + user *model.User, + path string, + postData interface{}, +) (*http.Request, error) { + userJwt, err := auth.GenerateJWTForUser(user) + if err != nil { + return nil, err + } + + var req *http.Request + + if postData != nil { + var body bytes.Buffer + err = json.NewEncoder(&body).Encode(postData) + if err != nil { + return nil, err + } + req = httptest.NewRequest(http.MethodPost, path, &body) + } else { + req = httptest.NewRequest(http.MethodPost, path, nil) + } + + req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt) + return req, nil +} + +type mockOpAmpConnection struct { + serverToAgentMsgs []*protobufs.ServerToAgent +} + +func (conn *mockOpAmpConnection) Send(ctx context.Context, msg *protobufs.ServerToAgent) error { + conn.serverToAgentMsgs = append(conn.serverToAgentMsgs, msg) + return nil +} + +func (conn *mockOpAmpConnection) latestMsgFromServer() *protobufs.ServerToAgent { + if len(conn.serverToAgentMsgs) < 1 { + return nil + } + return conn.serverToAgentMsgs[len(conn.serverToAgentMsgs)-1] +} + +func (conn *mockOpAmpConnection) LatestPipelinesReceivedFromServer() ([]model.Pipeline, error) { + pipelines := []model.Pipeline{} + lastMsg := conn.latestMsgFromServer() + if lastMsg == nil { + return pipelines, nil + } + + return pipelines, nil +} + +func (conn *mockOpAmpConnection) Disconnect() error { + return nil +} +func (conn *mockOpAmpConnection) RemoteAddr() net.Addr { + return nil +}