feat: aws integration: UI facing QS api for cloud account management (#6771)

* feat: init app/cloud_integrations

* feat: get API test started for cloudintegrations account lifecycle

* feat: cloudintegrations: get controller started

* feat: cloud integrations: add cloudintegrations.Controller to APIHandler and servers

* feat: cloud integrations: get routes started

* feat: cloud integrations: get accounts table schema started

* feat: cloud integrations: get cloudProviderAccountsSQLRepository started

* feat: cloud integrations: cloudProviderAccountsSQLRepository.listAccounts

* feat: cloud integrations: http handler and controller plumbing for /generate-connection-url

* feat: cloud integrations: cloudProviderAccountsSQLRepository.upsert

* feat: cloud integrations: finish up with /generate-connection-url

* feat: cloud integrations: add cloudProviderAccountsRepository.get

* feat: cloud integrations: add API test expectation for being able to get account status

* feat: cloud integrations: add http handler and controller method for getting account status

* feat: cloud integrations: ensure unconnected accounts aren't included in list of connected accounts

* feat: cloud integrations: add test expectation for agent check in request

* feat: cloud integrations: agent check in API

* feat: cloud integrations: ensure polling for status after agent check in works

* feat: cloud integrations: ensure account included in connected account list after agent check in

* feat: cloud integrations: add API expectation for updating account config

* feat: cloud integrations: API for updating cloud account config

* feat: cloud integrations: expectation for agent receiving latest config after account config update

* feat: cloud integrations: expectation for disconnecting cloud accounts from UI

* feat: cloud integrations: API for disconnecting cloud accounts

* feat: cloud integrations: some cleanup

* feat: cloud integrations: some more cleanup

* feat: cloud integrations: repo: scope rows by cloud provider

* feat: testutils: refactor out helper for creating a test sqlite DB

* feat: cloud integrations: controller: add test validating regeneration of connection url

* feat: cloud integrations: controller: validations for agent check ins

* feat: cloud integrations: connected account response structure

* feat: cloud integrations: API response account structure

* feat: cloud integrations: some more cleanup

* feat: cloud integrations: remove cloudProviderAccountsRepository.GetById

* feat: cloud integrations: shouldn't be able to disconnect non-existent account

* feat: cloud integrations: validate agents can't check in to cloud account with 2 signoz ids

* feat: cloud integrations: ensure agents can't check in to cloud account with 2 signoz ids

* feat: cloud integrations: remove stray import of ee/model in cloudintegrations controller
This commit is contained in:
Raj Kamal Singh 2025-01-10 18:43:35 +05:30 committed by GitHub
parent c106f1c9a9
commit d5b847c091
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1280 additions and 3 deletions

View File

@ -12,6 +12,7 @@ import (
"go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/cache"
@ -34,6 +35,7 @@ type APIHandlerOptions struct {
FeatureFlags baseint.FeatureLookup
LicenseManager *license.Manager
IntegrationsController *integrations.Controller
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
Cache cache.Cache
Gateway *httputil.ReverseProxy
@ -62,6 +64,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags,
IntegrationsController: opts.IntegrationsController,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,

View File

@ -40,6 +40,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/agentConf"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
baseexplorer "go.signoz.io/signoz/pkg/query-service/app/explorer"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
@ -221,6 +222,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
)
}
cloudIntegrationsController, err := cloudintegrations.NewController(localDB)
if err != nil {
return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err,
)
}
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations,
@ -271,6 +279,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FeatureFlags: lm,
LicenseManager: lm,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
@ -370,6 +379,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web *web.Web) (*
apiHandler.RegisterRoutes(r, am)
apiHandler.RegisterLogsRoutes(r, am)
apiHandler.RegisterIntegrationRoutes(r, am)
apiHandler.RegisterCloudIntegrationsRoutes(r, am)
apiHandler.RegisterQueryRangeV3Routes(r, am)
apiHandler.RegisterInfraMetricsRoutes(r, am)
apiHandler.RegisterQueryRangeV4Routes(r, am)

View File

@ -0,0 +1,5 @@
# SigNoz Cloud Integrations
Cloud integrations are unlike the rest of SigNoz integrations.
They have a different UX and so require a different API.
They will also be limited in number and are not expected to have community contributed implementations

View File

@ -0,0 +1,247 @@
package cloudintegrations
import (
"context"
"fmt"
"slices"
"time"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
)
var SupportedCloudProviders = []string{
"aws",
}
func validateCloudProviderName(name string) *model.ApiError {
if !slices.Contains(SupportedCloudProviders, name) {
return model.BadRequest(fmt.Errorf("invalid cloud provider: %s", name))
}
return nil
}
type Controller struct {
repo cloudProviderAccountsRepository
}
func NewController(db *sqlx.DB) (
*Controller, error,
) {
repo, err := newCloudProviderAccountsRepository(db)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider accounts repo: %w", err)
}
return &Controller{
repo: repo,
}, nil
}
type Account struct {
Id string `json:"id"`
CloudAccountId string `json:"cloud_account_id"`
Config AccountConfig `json:"config"`
Status AccountStatus `json:"status"`
}
type ConnectedAccountsListResponse struct {
Accounts []Account `json:"accounts"`
}
func (c *Controller) ListConnectedAccounts(
ctx context.Context, cloudProvider string,
) (
*ConnectedAccountsListResponse, *model.ApiError,
) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
accountRecords, apiErr := c.repo.listConnected(ctx, cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud accounts")
}
connectedAccounts := []Account{}
for _, a := range accountRecords {
connectedAccounts = append(connectedAccounts, a.account())
}
return &ConnectedAccountsListResponse{
Accounts: connectedAccounts,
}, nil
}
type GenerateConnectionUrlRequest struct {
// Optional. To be specified for updates.
AccountId *string `json:"account_id,omitempty"`
AccountConfig AccountConfig `json:"account_config"`
AgentConfig SigNozAgentConfig `json:"agent_config"`
}
type SigNozAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`
}
type GenerateConnectionUrlResponse struct {
AccountId string `json:"account_id"`
ConnectionUrl string `json:"connection_url"`
}
func (c *Controller) GenerateConnectionUrl(
ctx context.Context, cloudProvider string, req GenerateConnectionUrlRequest,
) (*GenerateConnectionUrlResponse, *model.ApiError) {
// Account connection with a simple connection URL may not be available for all providers.
if cloudProvider != "aws" {
return nil, model.BadRequest(fmt.Errorf("unsupported cloud provider: %s", cloudProvider))
}
account, apiErr := c.repo.upsert(
ctx, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
// TODO(Raj): Add actual cloudformation template for AWS integration after it has been shipped.
connectionUrl := fmt.Sprintf(
"https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?stackName=SigNozIntegration/",
req.AgentConfig.Region, req.AgentConfig.Region,
)
return &GenerateConnectionUrlResponse{
AccountId: account.Id,
ConnectionUrl: connectionUrl,
}, nil
}
type AccountStatusResponse struct {
Id string `json:"id"`
Status AccountStatus `json:"status"`
}
func (c *Controller) GetAccountStatus(
ctx context.Context, cloudProvider string, accountId string,
) (
*AccountStatusResponse, *model.ApiError,
) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
account, apiErr := c.repo.get(ctx, cloudProvider, accountId)
if apiErr != nil {
return nil, apiErr
}
resp := AccountStatusResponse{
Id: account.Id,
Status: account.status(),
}
return &resp, nil
}
type AgentCheckInRequest struct {
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
// Arbitrary cloud specific Agent data
Data map[string]any `json:"data,omitempty"`
}
type AgentCheckInResponse struct {
Account AccountRecord `json:"account"`
}
func (c *Controller) CheckInAsAgent(
ctx context.Context, cloudProvider string, req AgentCheckInRequest,
) (*AgentCheckInResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
existingAccount, apiErr := c.repo.get(ctx, cloudProvider, req.AccountId)
if existingAccount != nil && existingAccount.CloudAccountId != nil && *existingAccount.CloudAccountId != req.CloudAccountId {
return nil, model.BadRequest(fmt.Errorf(
"can't check in with new %s account id %s for account %s with existing %s id %s",
cloudProvider, req.CloudAccountId, existingAccount.Id, cloudProvider, *existingAccount.CloudAccountId,
))
}
existingAccount, apiErr = c.repo.getConnectedCloudAccount(ctx, cloudProvider, req.CloudAccountId)
if existingAccount != nil && existingAccount.Id != req.AccountId {
return nil, model.BadRequest(fmt.Errorf(
"can't check in to %s account %s with id %s. already connected with id %s",
cloudProvider, req.CloudAccountId, req.AccountId, existingAccount.Id,
))
}
agentReport := AgentReport{
TimestampMillis: time.Now().UnixMilli(),
Data: req.Data,
}
account, apiErr := c.repo.upsert(
ctx, cloudProvider, &req.AccountId, nil, &req.CloudAccountId, &agentReport, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
return &AgentCheckInResponse{
Account: *account,
}, nil
}
type UpdateAccountConfigRequest struct {
Config AccountConfig `json:"config"`
}
func (c *Controller) UpdateAccountConfig(
ctx context.Context,
cloudProvider string,
accountId string,
req UpdateAccountConfigRequest,
) (*Account, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
accountRecord, apiErr := c.repo.upsert(
ctx, cloudProvider, &accountId, &req.Config, nil, nil, nil,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
account := accountRecord.account()
return &account, nil
}
func (c *Controller) DisconnectAccount(
ctx context.Context, cloudProvider string, accountId string,
) (*AccountRecord, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
account, apiErr := c.repo.get(ctx, cloudProvider, accountId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
tsNow := time.Now()
account, apiErr = c.repo.upsert(
ctx, cloudProvider, &accountId, nil, nil, nil, &tsNow,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't disconnect account")
}
return account, nil
}

View File

@ -0,0 +1,153 @@
package cloudintegrations
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/utils"
)
func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
controller, err := NewController(testDB)
require.NoError(err)
// should be able to generate connection url for
// same account id again with updated config
testAccountConfig1 := AccountConfig{EnabledRegions: []string{"us-east-1", "us-west-1"}}
resp1, apiErr := controller.GenerateConnectionUrl(
context.TODO(), "aws", GenerateConnectionUrlRequest{
AccountConfig: testAccountConfig1,
AgentConfig: SigNozAgentConfig{Region: "us-east-2"},
},
)
require.Nil(apiErr)
require.NotEmpty(resp1.ConnectionUrl)
require.NotEmpty(resp1.AccountId)
testAccountId := resp1.AccountId
account, apiErr := controller.repo.get(
context.TODO(), "aws", testAccountId,
)
require.Nil(apiErr)
require.Equal(testAccountConfig1, *account.Config)
testAccountConfig2 := AccountConfig{EnabledRegions: []string{"us-east-2", "us-west-2"}}
resp2, apiErr := controller.GenerateConnectionUrl(
context.TODO(), "aws", GenerateConnectionUrlRequest{
AccountId: &testAccountId,
AccountConfig: testAccountConfig2,
AgentConfig: SigNozAgentConfig{Region: "us-east-2"},
},
)
require.Nil(apiErr)
require.Equal(testAccountId, resp2.AccountId)
account, apiErr = controller.repo.get(
context.TODO(), "aws", testAccountId,
)
require.Nil(apiErr)
require.Equal(testAccountConfig2, *account.Config)
}
func TestAgentCheckIns(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
controller, err := NewController(testDB)
require.NoError(err)
// An agent should be able to check in from a cloud account even
// if no connection url was requested (no account with agent's account id exists)
testAccountId1 := uuid.NewString()
testCloudAccountId1 := "546311234"
resp1, apiErr := controller.CheckInAsAgent(
context.TODO(), "aws", AgentCheckInRequest{
AccountId: testAccountId1,
CloudAccountId: testCloudAccountId1,
},
)
require.Nil(apiErr)
require.Equal(testAccountId1, resp1.Account.Id)
require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId)
// The agent should not be able to check in with a different
// cloud account id for the same account.
testCloudAccountId2 := "99999999"
_, apiErr = controller.CheckInAsAgent(
context.TODO(), "aws", AgentCheckInRequest{
AccountId: testAccountId1,
CloudAccountId: testCloudAccountId2,
},
)
require.NotNil(apiErr)
// The agent should not be able to check-in with a particular cloud account id
// if another connected AccountRecord exists for same cloud account
// i.e. there can't be 2 connected account records for the same cloud account id
// at any point in time.
existingConnected, apiErr := controller.repo.getConnectedCloudAccount(
context.TODO(), "aws", testCloudAccountId1,
)
require.Nil(apiErr)
require.NotNil(existingConnected)
require.Equal(testCloudAccountId1, *existingConnected.CloudAccountId)
require.Nil(existingConnected.RemovedAt)
testAccountId2 := uuid.NewString()
_, apiErr = controller.CheckInAsAgent(
context.TODO(), "aws", AgentCheckInRequest{
AccountId: testAccountId2,
CloudAccountId: testCloudAccountId1,
},
)
require.NotNil(apiErr)
// After disconnecting existing account record, the agent should be able to
// connected for a particular cloud account id
_, apiErr = controller.DisconnectAccount(
context.TODO(), "aws", testAccountId1,
)
existingConnected, apiErr = controller.repo.getConnectedCloudAccount(
context.TODO(), "aws", testCloudAccountId1,
)
require.Nil(existingConnected)
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
_, apiErr = controller.CheckInAsAgent(
context.TODO(), "aws", AgentCheckInRequest{
AccountId: testAccountId2,
CloudAccountId: testCloudAccountId1,
},
)
require.Nil(apiErr)
// should be able to keep checking in
_, apiErr = controller.CheckInAsAgent(
context.TODO(), "aws", AgentCheckInRequest{
AccountId: testAccountId2,
CloudAccountId: testCloudAccountId1,
},
)
require.Nil(apiErr)
}
func TestCantDisconnectNonExistentAccount(t *testing.T) {
require := require.New(t)
testDB, _ := utils.NewTestSqliteDB(t)
controller, err := NewController(testDB)
require.NoError(err)
// Attempting to disconnect a non-existent account should return error
account, apiErr := controller.DisconnectAccount(
context.TODO(), "aws", uuid.NewString(),
)
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
require.Nil(account)
}

View File

@ -0,0 +1,117 @@
package cloudintegrations
import (
"database/sql/driver"
"encoding/json"
"fmt"
"time"
)
// Represents a cloud provider account for cloud integrations
type AccountRecord struct {
CloudProvider string `json:"cloud_provider" db:"cloud_provider"`
Id string `json:"id" db:"id"`
Config *AccountConfig `json:"config" db:"config_json"`
CloudAccountId *string `json:"cloud_account_id" db:"cloud_account_id"`
LastAgentReport *AgentReport `json:"last_agent_report" db:"last_agent_report_json"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
RemovedAt *time.Time `json:"removed_at" db:"removed_at"`
}
type AccountConfig struct {
EnabledRegions []string `json:"regions"`
}
func DefaultAccountConfig() AccountConfig {
return AccountConfig{
EnabledRegions: []string{},
}
}
// For serializing from db
func (c *AccountConfig) Scan(src any) error {
data, ok := src.([]byte)
if !ok {
return fmt.Errorf("tried to scan from %T instead of bytes", src)
}
return json.Unmarshal(data, &c)
}
// For serializing to db
func (c *AccountConfig) Value() (driver.Value, error) {
if c == nil {
return nil, nil
}
serialized, err := json.Marshal(c)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize cloud account config to JSON: %w", err,
)
}
return serialized, nil
}
type AgentReport struct {
TimestampMillis int64 `json:"timestamp_millis"`
Data map[string]any `json:"data"`
}
// For serializing from db
func (r *AgentReport) Scan(src any) error {
data, ok := src.([]byte)
if !ok {
return fmt.Errorf("tried to scan from %T instead of bytes", src)
}
return json.Unmarshal(data, &r)
}
// For serializing to db
func (r *AgentReport) Value() (driver.Value, error) {
if r == nil {
return nil, nil
}
serialized, err := json.Marshal(r)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize agent report to JSON: %w", err,
)
}
return serialized, nil
}
type AccountStatus struct {
Integration AccountIntegrationStatus `json:"integration"`
}
type AccountIntegrationStatus struct {
LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"`
}
func (a *AccountRecord) status() AccountStatus {
status := AccountStatus{}
if a.LastAgentReport != nil {
lastHeartbeat := a.LastAgentReport.TimestampMillis
status.Integration.LastHeartbeatTsMillis = &lastHeartbeat
}
return status
}
func (a *AccountRecord) account() Account {
ca := Account{Id: a.Id, Status: a.status()}
if a.CloudAccountId != nil {
ca.CloudAccountId = *a.CloudAccountId
}
if a.Config != nil {
ca.Config = *a.Config
} else {
ca.Config = DefaultAccountConfig()
}
return ca
}

View File

@ -0,0 +1,270 @@
package cloudintegrations
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
)
type cloudProviderAccountsRepository interface {
listConnected(ctx context.Context, cloudProvider string) ([]AccountRecord, *model.ApiError)
get(ctx context.Context, cloudProvider string, id string) (*AccountRecord, *model.ApiError)
getConnectedCloudAccount(
ctx context.Context, cloudProvider string, cloudAccountId string,
) (*AccountRecord, *model.ApiError)
// Insert an account or update it by (cloudProvider, id)
// for specified non-empty fields
upsert(
ctx context.Context,
cloudProvider string,
id *string,
config *AccountConfig,
cloudAccountId *string,
agentReport *AgentReport,
removedAt *time.Time,
) (*AccountRecord, *model.ApiError)
}
func newCloudProviderAccountsRepository(db *sqlx.DB) (
*cloudProviderAccountsSQLRepository, error,
) {
if err := InitSqliteDBIfNeeded(db); err != nil {
return nil, fmt.Errorf("could not init sqlite DB for cloudintegrations: %w", err)
}
return &cloudProviderAccountsSQLRepository{
db: db,
}, nil
}
func InitSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTablesStatements := `
CREATE TABLE IF NOT EXISTS cloud_integrations_accounts(
cloud_provider TEXT NOT NULL,
id TEXT NOT NULL,
config_json TEXT,
cloud_account_id TEXT,
last_agent_report_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
removed_at TIMESTAMP,
UNIQUE(cloud_provider, id)
)
`
_, err := db.Exec(createTablesStatements)
if err != nil {
return fmt.Errorf(
"could not ensure cloud provider integrations schema in sqlite DB: %w", err,
)
}
return nil
}
type cloudProviderAccountsSQLRepository struct {
db *sqlx.DB
}
func (r *cloudProviderAccountsSQLRepository) listConnected(
ctx context.Context, cloudProvider string,
) ([]AccountRecord, *model.ApiError) {
accounts := []AccountRecord{}
err := r.db.SelectContext(
ctx, &accounts, `
select
cloud_provider,
id,
config_json,
cloud_account_id,
last_agent_report_json,
created_at,
removed_at
from cloud_integrations_accounts
where
cloud_provider=$1
and removed_at is NULL
and cloud_account_id is not NULL
and last_agent_report_json is not NULL
order by created_at
`, cloudProvider,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query connected cloud accounts: %w", err,
))
}
return accounts, nil
}
func (r *cloudProviderAccountsSQLRepository) get(
ctx context.Context, cloudProvider string, id string,
) (*AccountRecord, *model.ApiError) {
var result AccountRecord
err := r.db.GetContext(
ctx, &result, `
select
cloud_provider,
id,
config_json,
cloud_account_id,
last_agent_report_json,
created_at,
removed_at
from cloud_integrations_accounts
where
cloud_provider=$1
and id=$2
`,
cloudProvider, id,
)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find account with Id %s", id,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud provider accounts: %w", err,
))
}
return &result, nil
}
func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount(
ctx context.Context, cloudProvider string, cloudAccountId string,
) (*AccountRecord, *model.ApiError) {
var result AccountRecord
err := r.db.GetContext(
ctx, &result, `
select
cloud_provider,
id,
config_json,
cloud_account_id,
last_agent_report_json,
created_at,
removed_at
from cloud_integrations_accounts
where
cloud_provider=$1
and cloud_account_id=$2
and last_agent_report_json is not NULL
and removed_at is NULL
`,
cloudProvider, cloudAccountId,
)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(fmt.Errorf(
"couldn't find connected cloud account %s", cloudAccountId,
))
} else if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query cloud provider accounts: %w", err,
))
}
return &result, nil
}
func (r *cloudProviderAccountsSQLRepository) upsert(
ctx context.Context,
cloudProvider string,
id *string,
config *AccountConfig,
cloudAccountId *string,
agentReport *AgentReport,
removedAt *time.Time,
) (*AccountRecord, *model.ApiError) {
// Insert
if id == nil {
newId := uuid.NewString()
id = &newId
}
// Prepare clause for setting values in `on conflict do update`
onConflictSetStmts := []string{}
setColStatement := func(col string) string {
return fmt.Sprintf("%s=excluded.%s", col, col)
}
if config != nil {
onConflictSetStmts = append(
onConflictSetStmts, setColStatement("config_json"),
)
}
if cloudAccountId != nil {
onConflictSetStmts = append(
onConflictSetStmts, setColStatement("cloud_account_id"),
)
}
if agentReport != nil {
onConflictSetStmts = append(
onConflictSetStmts, setColStatement("last_agent_report_json"),
)
}
if removedAt != nil {
onConflictSetStmts = append(
onConflictSetStmts, setColStatement("removed_at"),
)
}
onConflictClause := ""
if len(onConflictSetStmts) > 0 {
onConflictClause = fmt.Sprintf(
"on conflict(cloud_provider, id) do update SET\n%s",
strings.Join(onConflictSetStmts, ",\n"),
)
}
insertQuery := fmt.Sprintf(`
INSERT INTO cloud_integrations_accounts (
cloud_provider,
id,
config_json,
cloud_account_id,
last_agent_report_json,
removed_at
) values ($1, $2, $3, $4, $5, $6)
%s`, onConflictClause,
)
_, dbErr := r.db.ExecContext(
ctx, insertQuery,
cloudProvider, id, config, cloudAccountId, agentReport, removedAt,
)
if dbErr != nil {
return nil, model.InternalError(fmt.Errorf(
"could not upsert cloud account record: %w", dbErr,
))
}
upsertedAccount, apiErr := r.get(ctx, cloudProvider, *id)
if apiErr != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't fetch upserted account by id: %w", apiErr.ToError(),
))
}
return upsertedAccount, nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/promql"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/explorer"
"go.signoz.io/signoz/pkg/query-service/app/inframetrics"
@ -102,6 +103,8 @@ type APIHandler struct {
IntegrationsController *integrations.Controller
CloudIntegrationsController *cloudintegrations.Controller
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
// SetupCompleted indicates if SigNoz is ready for general use.
@ -155,6 +158,9 @@ type APIHandlerOpts struct {
// Integrations
IntegrationsController *integrations.Controller
// Cloud Provider Integrations
CloudIntegrationsController *cloudintegrations.Controller
// Log parsing pipelines
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
@ -226,6 +232,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,
IntegrationsController: opts.IntegrationsController,
CloudIntegrationsController: opts.CloudIntegrationsController,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
@ -3867,6 +3874,157 @@ func (aH *APIHandler) UninstallIntegration(
aH.Respond(w, map[string]interface{}{})
}
// cloud provider integrations
func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v1/cloud-integrations").Subrouter()
subRouter.HandleFunc(
"/{cloudProvider}/accounts/generate-connection-url", am.EditAccess(aH.CloudIntegrationsGenerateConnectionUrl),
).Methods(http.MethodPost)
subRouter.HandleFunc(
"/{cloudProvider}/accounts", am.ViewAccess(aH.CloudIntegrationsListConnectedAccounts),
).Methods(http.MethodGet)
subRouter.HandleFunc(
"/{cloudProvider}/accounts/{accountId}/status", am.ViewAccess(aH.CloudIntegrationsGetAccountStatus),
).Methods(http.MethodGet)
subRouter.HandleFunc(
"/{cloudProvider}/accounts/{accountId}/config", am.EditAccess(aH.CloudIntegrationsUpdateAccountConfig),
).Methods(http.MethodPost)
subRouter.HandleFunc(
"/{cloudProvider}/accounts/{accountId}/disconnect", am.EditAccess(aH.CloudIntegrationsDisconnectAccount),
).Methods(http.MethodPost)
subRouter.HandleFunc(
"/{cloudProvider}/agent-check-in", am.EditAccess(aH.CloudIntegrationsAgentCheckIn),
).Methods(http.MethodPost)
}
func (aH *APIHandler) CloudIntegrationsListConnectedAccounts(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
resp, apiErr := aH.CloudIntegrationsController.ListConnectedAccounts(
r.Context(), cloudProvider,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsGenerateConnectionUrl(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
req := cloudintegrations.GenerateConnectionUrlRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
result, apiErr := aH.CloudIntegrationsController.GenerateConnectionUrl(
r.Context(), cloudProvider, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsGetAccountStatus(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
resp, apiErr := aH.CloudIntegrationsController.GetAccountStatus(
r.Context(), cloudProvider, accountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, resp)
}
func (aH *APIHandler) CloudIntegrationsAgentCheckIn(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
req := cloudintegrations.AgentCheckInRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
result, apiErr := aH.CloudIntegrationsController.CheckInAsAgent(
r.Context(), cloudProvider, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
req := cloudintegrations.UpdateAccountConfigRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
}
result, apiErr := aH.CloudIntegrationsController.UpdateAccountConfig(
r.Context(), cloudProvider, accountId, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
func (aH *APIHandler) CloudIntegrationsDisconnectAccount(
w http.ResponseWriter, r *http.Request,
) {
cloudProvider := mux.Vars(r)["cloudProvider"]
accountId := mux.Vars(r)["accountId"]
result, apiErr := aH.CloudIntegrationsController.DisconnectAccount(
r.Context(), cloudProvider, accountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.Respond(w, result)
}
// logs
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()

View File

@ -24,6 +24,7 @@ import (
"github.com/soheilhy/cmux"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
@ -181,6 +182,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, fmt.Errorf("couldn't create integrations controller: %w", err)
}
cloudIntegrationsController, err := cloudintegrations.NewController(localDB)
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider integrations controller: %w", err)
}
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations,
)
@ -200,6 +206,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
RuleManager: rm,
FeatureFlags: fm,
IntegrationsController: integrationsController,
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
@ -310,6 +317,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
api.RegisterRoutes(r, am)
api.RegisterLogsRoutes(r, am)
api.RegisterIntegrationRoutes(r, am)
api.RegisterCloudIntegrationsRoutes(r, am)
api.RegisterQueryRangeV3Routes(r, am)
api.RegisterInfraMetricsRoutes(r, am)
api.RegisterWebSocketPaths(r, am)

View File

@ -0,0 +1,299 @@
package tests
import (
"encoding/json"
"fmt"
"net/http"
"testing"
"time"
"github.com/jmoiron/sqlx"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/utils"
)
func TestAWSIntegrationLifecycle(t *testing.T) {
// Test for happy path of connecting and managing AWS integration accounts
t0 := time.Now()
require := require.New(t)
testbed := NewCloudIntegrationsTestBed(t, nil)
accountsListResp := testbed.GetConnectedAccountsListFromQS("aws")
require.Equal(len(accountsListResp.Accounts), 0,
"No accounts should be connected at the beginning",
)
// Should be able to generate a connection url from UI - initializing an integration account
testAccountConfig := cloudintegrations.AccountConfig{
EnabledRegions: []string{"us-east-1", "us-east-2"},
}
connectionUrlResp := testbed.GenerateConnectionUrlFromQS(
"aws", cloudintegrations.GenerateConnectionUrlRequest{
AgentConfig: cloudintegrations.SigNozAgentConfig{
Region: "us-east-1",
},
AccountConfig: testAccountConfig,
})
testAccountId := connectionUrlResp.AccountId
require.NotEmpty(testAccountId)
connectionUrl := connectionUrlResp.ConnectionUrl
require.NotEmpty(connectionUrl)
// Should be able to poll for account connection status from the UI
accountStatusResp := testbed.GetAccountStatusFromQS("aws", testAccountId)
require.Equal(testAccountId, accountStatusResp.Id)
require.Nil(accountStatusResp.Status.Integration.LastHeartbeatTsMillis)
// The unconnected account should not show up in connected accounts list yet
accountsListResp1 := testbed.GetConnectedAccountsListFromQS("aws")
require.Equal(0, len(accountsListResp1.Accounts))
// An agent installed in user's AWS account should be able to check in for the new integration account
tsMillisBeforeAgentCheckIn := time.Now().UnixMilli()
testAWSAccountId := "4563215233"
agentCheckInResp := testbed.CheckInAsAgentWithQS(
"aws", cloudintegrations.AgentCheckInRequest{
AccountId: testAccountId,
CloudAccountId: testAWSAccountId,
},
)
require.Equal(testAccountId, agentCheckInResp.Account.Id)
require.Equal(testAccountConfig, *agentCheckInResp.Account.Config)
require.Equal(testAWSAccountId, *agentCheckInResp.Account.CloudAccountId)
require.LessOrEqual(t0.Unix(), agentCheckInResp.Account.CreatedAt.Unix())
require.Nil(agentCheckInResp.Account.RemovedAt)
// Polling for connection status from UI should now return latest status
accountStatusResp1 := testbed.GetAccountStatusFromQS("aws", testAccountId)
require.Equal(testAccountId, accountStatusResp1.Id)
require.NotNil(accountStatusResp1.Status.Integration.LastHeartbeatTsMillis)
require.LessOrEqual(
tsMillisBeforeAgentCheckIn,
*accountStatusResp1.Status.Integration.LastHeartbeatTsMillis,
)
// The account should now show up in list of connected accounts.
accountsListResp2 := testbed.GetConnectedAccountsListFromQS("aws")
require.Equal(len(accountsListResp2.Accounts), 1)
require.Equal(testAccountId, accountsListResp2.Accounts[0].Id)
require.Equal(testAWSAccountId, accountsListResp2.Accounts[0].CloudAccountId)
// Should be able to update account config from UI
testAccountConfig2 := cloudintegrations.AccountConfig{
EnabledRegions: []string{"us-east-2", "us-west-1"},
}
latestAccount := testbed.UpdateAccountConfigWithQS(
"aws", testAccountId, testAccountConfig2,
)
require.Equal(testAccountId, latestAccount.Id)
require.Equal(testAccountConfig2, *latestAccount.Config)
// The agent should now receive latest account config.
agentCheckInResp1 := testbed.CheckInAsAgentWithQS(
"aws", cloudintegrations.AgentCheckInRequest{
AccountId: testAccountId,
CloudAccountId: testAWSAccountId,
},
)
require.Equal(testAccountId, agentCheckInResp1.Account.Id)
require.Equal(testAccountConfig2, *agentCheckInResp1.Account.Config)
require.Equal(testAWSAccountId, *agentCheckInResp1.Account.CloudAccountId)
require.Nil(agentCheckInResp1.Account.RemovedAt)
// Should be able to disconnect/remove account from UI.
tsBeforeDisconnect := time.Now()
latestAccount = testbed.DisconnectAccountWithQS("aws", testAccountId)
require.Equal(testAccountId, latestAccount.Id)
require.LessOrEqual(tsBeforeDisconnect, *latestAccount.RemovedAt)
// The agent should receive the disconnected status in account config post disconnection
agentCheckInResp2 := testbed.CheckInAsAgentWithQS(
"aws", cloudintegrations.AgentCheckInRequest{
AccountId: testAccountId,
CloudAccountId: testAWSAccountId,
},
)
require.Equal(testAccountId, agentCheckInResp2.Account.Id)
require.Equal(testAWSAccountId, *agentCheckInResp2.Account.CloudAccountId)
require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.Account.RemovedAt)
}
type CloudIntegrationsTestBed struct {
t *testing.T
testUser *model.User
qsHttpHandler http.Handler
mockClickhouse mockhouse.ClickConnMockCommon
}
// testDB can be injected for sharing a DB across multiple integration testbeds.
func NewCloudIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *CloudIntegrationsTestBed {
if testDB == nil {
testDB = utils.NewQueryServiceDBForTests(t)
}
controller, err := cloudintegrations.NewController(testDB)
if err != nil {
t.Fatalf("could not create cloud integrations controller: %v", err)
}
fm := featureManager.StartManager()
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
AppDao: dao.DB(),
CloudIntegrationsController: controller,
FeatureFlags: fm,
})
if err != nil {
t.Fatalf("could not create a new ApiHandler: %v", err)
}
router := app.NewRouter()
am := app.NewAuthMiddleware(auth.GetUserFromRequest)
apiHandler.RegisterRoutes(router, am)
apiHandler.RegisterCloudIntegrationsRoutes(router, am)
user, apiErr := createTestUser()
if apiErr != nil {
t.Fatalf("could not create a test user: %v", apiErr)
}
return &CloudIntegrationsTestBed{
t: t,
testUser: user,
qsHttpHandler: router,
}
}
func (tb *CloudIntegrationsTestBed) GetConnectedAccountsListFromQS(
cloudProvider string,
) *cloudintegrations.ConnectedAccountsListResponse {
respDataJson := tb.RequestQS(fmt.Sprintf("/api/v1/cloud-integrations/%s/accounts", cloudProvider), nil)
var resp cloudintegrations.ConnectedAccountsListResponse
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into AccountsListResponse")
}
return &resp
}
func (tb *CloudIntegrationsTestBed) GenerateConnectionUrlFromQS(
cloudProvider string, req cloudintegrations.GenerateConnectionUrlRequest,
) *cloudintegrations.GenerateConnectionUrlResponse {
respDataJson := tb.RequestQS(
fmt.Sprintf("/api/v1/cloud-integrations/%s/accounts/generate-connection-url", cloudProvider),
req,
)
var resp cloudintegrations.GenerateConnectionUrlResponse
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into map[string]any")
}
return &resp
}
func (tb *CloudIntegrationsTestBed) GetAccountStatusFromQS(
cloudProvider string, accountId string,
) *cloudintegrations.AccountStatusResponse {
respDataJson := tb.RequestQS(fmt.Sprintf(
"/api/v1/cloud-integrations/%s/accounts/%s/status",
cloudProvider, accountId,
), nil)
var resp cloudintegrations.AccountStatusResponse
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into AccountStatusResponse")
}
return &resp
}
func (tb *CloudIntegrationsTestBed) CheckInAsAgentWithQS(
cloudProvider string, req cloudintegrations.AgentCheckInRequest,
) *cloudintegrations.AgentCheckInResponse {
respDataJson := tb.RequestQS(
fmt.Sprintf("/api/v1/cloud-integrations/%s/agent-check-in", cloudProvider), req,
)
var resp cloudintegrations.AgentCheckInResponse
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into AgentCheckInResponse")
}
return &resp
}
func (tb *CloudIntegrationsTestBed) UpdateAccountConfigWithQS(
cloudProvider string, accountId string, newConfig cloudintegrations.AccountConfig,
) *cloudintegrations.AccountRecord {
respDataJson := tb.RequestQS(
fmt.Sprintf(
"/api/v1/cloud-integrations/%s/accounts/%s/config",
cloudProvider, accountId,
), cloudintegrations.UpdateAccountConfigRequest{
Config: newConfig,
},
)
var resp cloudintegrations.AccountRecord
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into Account")
}
return &resp
}
func (tb *CloudIntegrationsTestBed) DisconnectAccountWithQS(
cloudProvider string, accountId string,
) *cloudintegrations.AccountRecord {
respDataJson := tb.RequestQS(
fmt.Sprintf(
"/api/v1/cloud-integrations/%s/accounts/%s/disconnect",
cloudProvider, accountId,
), map[string]any{},
)
var resp cloudintegrations.AccountRecord
err := json.Unmarshal(respDataJson, &resp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into Account")
}
return &resp
}
func (tb *CloudIntegrationsTestBed) RequestQS(
path string,
postData interface{},
) (responseDataJson []byte) {
req, err := AuthenticatedRequestForTest(
tb.testUser, path, postData,
)
if err != nil {
tb.t.Fatalf("couldn't create authenticated test request: %v", err)
}
result, err := HandleTestRequest(tb.qsHttpHandler, req, 200)
if err != nil {
tb.t.Fatalf("test request failed: %v", err)
}
dataJson, err := json.Marshal(result.Data)
if err != nil {
tb.t.Fatalf("could not marshal apiResponse.Data: %v", err)
}
return dataJson
}

View File

@ -5,24 +5,31 @@ import (
"testing"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/dao"
)
func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB {
func NewTestSqliteDB(t *testing.T) (testDB *sqlx.DB, testDBFilePath string) {
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath := testDBFile.Name()
testDBFilePath = testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
testDB, err := sqlx.Open("sqlite3", testDBFilePath)
testDB, err = sqlx.Open("sqlite3", testDBFilePath)
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
}
return testDB, testDBFilePath
}
func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB {
testDB, testDBFilePath := NewTestSqliteDB(t)
// TODO(Raj): This should not require passing in the DB file path
dao.InitDao("sqlite", testDBFilePath)
dashboards.InitDB(testDBFilePath)