diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 181186d323..25787667fa 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -12,6 +12,7 @@ import ( "go.signoz.io/signoz/ee/query-service/license" "go.signoz.io/signoz/ee/query-service/usage" baseapp "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/cloudintegrations" "go.signoz.io/signoz/pkg/query-service/app/integrations" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/cache" @@ -34,6 +35,7 @@ type APIHandlerOptions struct { FeatureFlags baseint.FeatureLookup LicenseManager *license.Manager IntegrationsController *integrations.Controller + CloudIntegrationsController *cloudintegrations.Controller LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController Cache cache.Cache Gateway *httputil.ReverseProxy @@ -62,6 +64,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags, IntegrationsController: opts.IntegrationsController, + CloudIntegrationsController: opts.CloudIntegrationsController, LogsParsingPipelineController: opts.LogsParsingPipelineController, Cache: opts.Cache, FluxInterval: opts.FluxInterval, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 1b17ca43d0..a604860b70 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -40,6 +40,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/agentConf" baseapp "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/cloudintegrations" "go.signoz.io/signoz/pkg/query-service/app/dashboards" baseexplorer "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/app/integrations" @@ -221,6 +222,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { ) } + cloudIntegrationsController, err := cloudintegrations.NewController(localDB) + if err != nil { + return nil, fmt.Errorf( + "couldn't create cloud provider integrations controller: %w", err, + ) + } + // ingestion pipelines manager logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations, @@ -271,6 +279,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { FeatureFlags: lm, LicenseManager: lm, IntegrationsController: integrationsController, + CloudIntegrationsController: cloudIntegrationsController, LogsParsingPipelineController: logParsingPipelineController, Cache: c, FluxInterval: fluxInterval, @@ -370,6 +379,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web *web.Web) (* apiHandler.RegisterRoutes(r, am) apiHandler.RegisterLogsRoutes(r, am) apiHandler.RegisterIntegrationRoutes(r, am) + apiHandler.RegisterCloudIntegrationsRoutes(r, am) apiHandler.RegisterQueryRangeV3Routes(r, am) apiHandler.RegisterInfraMetricsRoutes(r, am) apiHandler.RegisterQueryRangeV4Routes(r, am) diff --git a/pkg/query-service/app/cloudintegrations/Readme.md b/pkg/query-service/app/cloudintegrations/Readme.md new file mode 100644 index 0000000000..9ac1d5fe4a --- /dev/null +++ b/pkg/query-service/app/cloudintegrations/Readme.md @@ -0,0 +1,5 @@ +# SigNoz Cloud Integrations + +Cloud integrations are unlike the rest of SigNoz integrations. +They have a different UX and so require a different API. +They will also be limited in number and are not expected to have community contributed implementations diff --git a/pkg/query-service/app/cloudintegrations/controller.go b/pkg/query-service/app/cloudintegrations/controller.go new file mode 100644 index 0000000000..9cfe9fb3e1 --- /dev/null +++ b/pkg/query-service/app/cloudintegrations/controller.go @@ -0,0 +1,247 @@ +package cloudintegrations + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/model" +) + +var SupportedCloudProviders = []string{ + "aws", +} + +func validateCloudProviderName(name string) *model.ApiError { + if !slices.Contains(SupportedCloudProviders, name) { + return model.BadRequest(fmt.Errorf("invalid cloud provider: %s", name)) + } + return nil +} + +type Controller struct { + repo cloudProviderAccountsRepository +} + +func NewController(db *sqlx.DB) ( + *Controller, error, +) { + repo, err := newCloudProviderAccountsRepository(db) + if err != nil { + return nil, fmt.Errorf("couldn't create cloud provider accounts repo: %w", err) + } + + return &Controller{ + repo: repo, + }, nil +} + +type Account struct { + Id string `json:"id"` + CloudAccountId string `json:"cloud_account_id"` + Config AccountConfig `json:"config"` + Status AccountStatus `json:"status"` +} + +type ConnectedAccountsListResponse struct { + Accounts []Account `json:"accounts"` +} + +func (c *Controller) ListConnectedAccounts( + ctx context.Context, cloudProvider string, +) ( + *ConnectedAccountsListResponse, *model.ApiError, +) { + if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { + return nil, apiErr + } + + accountRecords, apiErr := c.repo.listConnected(ctx, cloudProvider) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't list cloud accounts") + } + + connectedAccounts := []Account{} + for _, a := range accountRecords { + connectedAccounts = append(connectedAccounts, a.account()) + } + + return &ConnectedAccountsListResponse{ + Accounts: connectedAccounts, + }, nil +} + +type GenerateConnectionUrlRequest struct { + // Optional. To be specified for updates. + AccountId *string `json:"account_id,omitempty"` + + AccountConfig AccountConfig `json:"account_config"` + + AgentConfig SigNozAgentConfig `json:"agent_config"` +} + +type SigNozAgentConfig struct { + // The region in which SigNoz agent should be installed. + Region string `json:"region"` +} + +type GenerateConnectionUrlResponse struct { + AccountId string `json:"account_id"` + ConnectionUrl string `json:"connection_url"` +} + +func (c *Controller) GenerateConnectionUrl( + ctx context.Context, cloudProvider string, req GenerateConnectionUrlRequest, +) (*GenerateConnectionUrlResponse, *model.ApiError) { + // Account connection with a simple connection URL may not be available for all providers. + if cloudProvider != "aws" { + return nil, model.BadRequest(fmt.Errorf("unsupported cloud provider: %s", cloudProvider)) + } + + account, apiErr := c.repo.upsert( + ctx, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") + } + + // TODO(Raj): Add actual cloudformation template for AWS integration after it has been shipped. + connectionUrl := fmt.Sprintf( + "https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?stackName=SigNozIntegration/", + req.AgentConfig.Region, req.AgentConfig.Region, + ) + + return &GenerateConnectionUrlResponse{ + AccountId: account.Id, + ConnectionUrl: connectionUrl, + }, nil +} + +type AccountStatusResponse struct { + Id string `json:"id"` + Status AccountStatus `json:"status"` +} + +func (c *Controller) GetAccountStatus( + ctx context.Context, cloudProvider string, accountId string, +) ( + *AccountStatusResponse, *model.ApiError, +) { + if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { + return nil, apiErr + } + + account, apiErr := c.repo.get(ctx, cloudProvider, accountId) + if apiErr != nil { + return nil, apiErr + } + + resp := AccountStatusResponse{ + Id: account.Id, + Status: account.status(), + } + + return &resp, nil +} + +type AgentCheckInRequest struct { + AccountId string `json:"account_id"` + CloudAccountId string `json:"cloud_account_id"` + // Arbitrary cloud specific Agent data + Data map[string]any `json:"data,omitempty"` +} + +type AgentCheckInResponse struct { + Account AccountRecord `json:"account"` +} + +func (c *Controller) CheckInAsAgent( + ctx context.Context, cloudProvider string, req AgentCheckInRequest, +) (*AgentCheckInResponse, *model.ApiError) { + if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { + return nil, apiErr + } + + existingAccount, apiErr := c.repo.get(ctx, cloudProvider, req.AccountId) + if existingAccount != nil && existingAccount.CloudAccountId != nil && *existingAccount.CloudAccountId != req.CloudAccountId { + return nil, model.BadRequest(fmt.Errorf( + "can't check in with new %s account id %s for account %s with existing %s id %s", + cloudProvider, req.CloudAccountId, existingAccount.Id, cloudProvider, *existingAccount.CloudAccountId, + )) + } + + existingAccount, apiErr = c.repo.getConnectedCloudAccount(ctx, cloudProvider, req.CloudAccountId) + if existingAccount != nil && existingAccount.Id != req.AccountId { + return nil, model.BadRequest(fmt.Errorf( + "can't check in to %s account %s with id %s. already connected with id %s", + cloudProvider, req.CloudAccountId, req.AccountId, existingAccount.Id, + )) + } + + agentReport := AgentReport{ + TimestampMillis: time.Now().UnixMilli(), + Data: req.Data, + } + + account, apiErr := c.repo.upsert( + ctx, cloudProvider, &req.AccountId, nil, &req.CloudAccountId, &agentReport, nil, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") + } + + return &AgentCheckInResponse{ + Account: *account, + }, nil +} + +type UpdateAccountConfigRequest struct { + Config AccountConfig `json:"config"` +} + +func (c *Controller) UpdateAccountConfig( + ctx context.Context, + cloudProvider string, + accountId string, + req UpdateAccountConfigRequest, +) (*Account, *model.ApiError) { + if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { + return nil, apiErr + } + + accountRecord, apiErr := c.repo.upsert( + ctx, cloudProvider, &accountId, &req.Config, nil, nil, nil, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") + } + + account := accountRecord.account() + + return &account, nil +} + +func (c *Controller) DisconnectAccount( + ctx context.Context, cloudProvider string, accountId string, +) (*AccountRecord, *model.ApiError) { + if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { + return nil, apiErr + } + + account, apiErr := c.repo.get(ctx, cloudProvider, accountId) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't disconnect account") + } + + tsNow := time.Now() + account, apiErr = c.repo.upsert( + ctx, cloudProvider, &accountId, nil, nil, nil, &tsNow, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't disconnect account") + } + + return account, nil +} diff --git a/pkg/query-service/app/cloudintegrations/controller_test.go b/pkg/query-service/app/cloudintegrations/controller_test.go new file mode 100644 index 0000000000..2d85cc1b95 --- /dev/null +++ b/pkg/query-service/app/cloudintegrations/controller_test.go @@ -0,0 +1,153 @@ +package cloudintegrations + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) { + require := require.New(t) + testDB, _ := utils.NewTestSqliteDB(t) + controller, err := NewController(testDB) + require.NoError(err) + + // should be able to generate connection url for + // same account id again with updated config + testAccountConfig1 := AccountConfig{EnabledRegions: []string{"us-east-1", "us-west-1"}} + resp1, apiErr := controller.GenerateConnectionUrl( + context.TODO(), "aws", GenerateConnectionUrlRequest{ + AccountConfig: testAccountConfig1, + AgentConfig: SigNozAgentConfig{Region: "us-east-2"}, + }, + ) + require.Nil(apiErr) + require.NotEmpty(resp1.ConnectionUrl) + require.NotEmpty(resp1.AccountId) + + testAccountId := resp1.AccountId + account, apiErr := controller.repo.get( + context.TODO(), "aws", testAccountId, + ) + require.Nil(apiErr) + require.Equal(testAccountConfig1, *account.Config) + + testAccountConfig2 := AccountConfig{EnabledRegions: []string{"us-east-2", "us-west-2"}} + resp2, apiErr := controller.GenerateConnectionUrl( + context.TODO(), "aws", GenerateConnectionUrlRequest{ + AccountId: &testAccountId, + AccountConfig: testAccountConfig2, + AgentConfig: SigNozAgentConfig{Region: "us-east-2"}, + }, + ) + require.Nil(apiErr) + require.Equal(testAccountId, resp2.AccountId) + + account, apiErr = controller.repo.get( + context.TODO(), "aws", testAccountId, + ) + require.Nil(apiErr) + require.Equal(testAccountConfig2, *account.Config) +} + +func TestAgentCheckIns(t *testing.T) { + require := require.New(t) + testDB, _ := utils.NewTestSqliteDB(t) + controller, err := NewController(testDB) + require.NoError(err) + + // An agent should be able to check in from a cloud account even + // if no connection url was requested (no account with agent's account id exists) + testAccountId1 := uuid.NewString() + testCloudAccountId1 := "546311234" + resp1, apiErr := controller.CheckInAsAgent( + context.TODO(), "aws", AgentCheckInRequest{ + AccountId: testAccountId1, + CloudAccountId: testCloudAccountId1, + }, + ) + require.Nil(apiErr) + require.Equal(testAccountId1, resp1.Account.Id) + require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId) + + // The agent should not be able to check in with a different + // cloud account id for the same account. + testCloudAccountId2 := "99999999" + _, apiErr = controller.CheckInAsAgent( + context.TODO(), "aws", AgentCheckInRequest{ + AccountId: testAccountId1, + CloudAccountId: testCloudAccountId2, + }, + ) + require.NotNil(apiErr) + + // The agent should not be able to check-in with a particular cloud account id + // if another connected AccountRecord exists for same cloud account + // i.e. there can't be 2 connected account records for the same cloud account id + // at any point in time. + existingConnected, apiErr := controller.repo.getConnectedCloudAccount( + context.TODO(), "aws", testCloudAccountId1, + ) + require.Nil(apiErr) + require.NotNil(existingConnected) + require.Equal(testCloudAccountId1, *existingConnected.CloudAccountId) + require.Nil(existingConnected.RemovedAt) + + testAccountId2 := uuid.NewString() + _, apiErr = controller.CheckInAsAgent( + context.TODO(), "aws", AgentCheckInRequest{ + AccountId: testAccountId2, + CloudAccountId: testCloudAccountId1, + }, + ) + require.NotNil(apiErr) + + // After disconnecting existing account record, the agent should be able to + // connected for a particular cloud account id + _, apiErr = controller.DisconnectAccount( + context.TODO(), "aws", testAccountId1, + ) + + existingConnected, apiErr = controller.repo.getConnectedCloudAccount( + context.TODO(), "aws", testCloudAccountId1, + ) + require.Nil(existingConnected) + require.NotNil(apiErr) + require.Equal(model.ErrorNotFound, apiErr.Type()) + + _, apiErr = controller.CheckInAsAgent( + context.TODO(), "aws", AgentCheckInRequest{ + AccountId: testAccountId2, + CloudAccountId: testCloudAccountId1, + }, + ) + require.Nil(apiErr) + + // should be able to keep checking in + _, apiErr = controller.CheckInAsAgent( + context.TODO(), "aws", AgentCheckInRequest{ + AccountId: testAccountId2, + CloudAccountId: testCloudAccountId1, + }, + ) + require.Nil(apiErr) +} + +func TestCantDisconnectNonExistentAccount(t *testing.T) { + require := require.New(t) + testDB, _ := utils.NewTestSqliteDB(t) + controller, err := NewController(testDB) + require.NoError(err) + + // Attempting to disconnect a non-existent account should return error + account, apiErr := controller.DisconnectAccount( + context.TODO(), "aws", uuid.NewString(), + ) + require.NotNil(apiErr) + require.Equal(model.ErrorNotFound, apiErr.Type()) + require.Nil(account) +} diff --git a/pkg/query-service/app/cloudintegrations/model.go b/pkg/query-service/app/cloudintegrations/model.go new file mode 100644 index 0000000000..53ad4c853e --- /dev/null +++ b/pkg/query-service/app/cloudintegrations/model.go @@ -0,0 +1,117 @@ +package cloudintegrations + +import ( + "database/sql/driver" + "encoding/json" + "fmt" + "time" +) + +// Represents a cloud provider account for cloud integrations +type AccountRecord struct { + CloudProvider string `json:"cloud_provider" db:"cloud_provider"` + Id string `json:"id" db:"id"` + Config *AccountConfig `json:"config" db:"config_json"` + CloudAccountId *string `json:"cloud_account_id" db:"cloud_account_id"` + LastAgentReport *AgentReport `json:"last_agent_report" db:"last_agent_report_json"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + RemovedAt *time.Time `json:"removed_at" db:"removed_at"` +} + +type AccountConfig struct { + EnabledRegions []string `json:"regions"` +} + +func DefaultAccountConfig() AccountConfig { + return AccountConfig{ + EnabledRegions: []string{}, + } +} + +// For serializing from db +func (c *AccountConfig) Scan(src any) error { + data, ok := src.([]byte) + if !ok { + return fmt.Errorf("tried to scan from %T instead of bytes", src) + } + + return json.Unmarshal(data, &c) +} + +// For serializing to db +func (c *AccountConfig) Value() (driver.Value, error) { + if c == nil { + return nil, nil + } + + serialized, err := json.Marshal(c) + if err != nil { + return nil, fmt.Errorf( + "couldn't serialize cloud account config to JSON: %w", err, + ) + } + return serialized, nil +} + +type AgentReport struct { + TimestampMillis int64 `json:"timestamp_millis"` + Data map[string]any `json:"data"` +} + +// For serializing from db +func (r *AgentReport) Scan(src any) error { + data, ok := src.([]byte) + if !ok { + return fmt.Errorf("tried to scan from %T instead of bytes", src) + } + + return json.Unmarshal(data, &r) +} + +// For serializing to db +func (r *AgentReport) Value() (driver.Value, error) { + if r == nil { + return nil, nil + } + + serialized, err := json.Marshal(r) + if err != nil { + return nil, fmt.Errorf( + "couldn't serialize agent report to JSON: %w", err, + ) + } + return serialized, nil +} + +type AccountStatus struct { + Integration AccountIntegrationStatus `json:"integration"` +} + +type AccountIntegrationStatus struct { + LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"` +} + +func (a *AccountRecord) status() AccountStatus { + status := AccountStatus{} + if a.LastAgentReport != nil { + lastHeartbeat := a.LastAgentReport.TimestampMillis + status.Integration.LastHeartbeatTsMillis = &lastHeartbeat + } + return status +} + +func (a *AccountRecord) account() Account { + ca := Account{Id: a.Id, Status: a.status()} + + if a.CloudAccountId != nil { + ca.CloudAccountId = *a.CloudAccountId + } + + if a.Config != nil { + ca.Config = *a.Config + } else { + ca.Config = DefaultAccountConfig() + } + + return ca +} diff --git a/pkg/query-service/app/cloudintegrations/repo.go b/pkg/query-service/app/cloudintegrations/repo.go new file mode 100644 index 0000000000..e5c4e4cc94 --- /dev/null +++ b/pkg/query-service/app/cloudintegrations/repo.go @@ -0,0 +1,270 @@ +package cloudintegrations + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/model" +) + +type cloudProviderAccountsRepository interface { + listConnected(ctx context.Context, cloudProvider string) ([]AccountRecord, *model.ApiError) + + get(ctx context.Context, cloudProvider string, id string) (*AccountRecord, *model.ApiError) + + getConnectedCloudAccount( + ctx context.Context, cloudProvider string, cloudAccountId string, + ) (*AccountRecord, *model.ApiError) + + // Insert an account or update it by (cloudProvider, id) + // for specified non-empty fields + upsert( + ctx context.Context, + cloudProvider string, + id *string, + config *AccountConfig, + cloudAccountId *string, + agentReport *AgentReport, + removedAt *time.Time, + ) (*AccountRecord, *model.ApiError) +} + +func newCloudProviderAccountsRepository(db *sqlx.DB) ( + *cloudProviderAccountsSQLRepository, error, +) { + if err := InitSqliteDBIfNeeded(db); err != nil { + return nil, fmt.Errorf("could not init sqlite DB for cloudintegrations: %w", err) + } + + return &cloudProviderAccountsSQLRepository{ + db: db, + }, nil +} + +func InitSqliteDBIfNeeded(db *sqlx.DB) error { + if db == nil { + return fmt.Errorf("db is required") + } + + createTablesStatements := ` + CREATE TABLE IF NOT EXISTS cloud_integrations_accounts( + cloud_provider TEXT NOT NULL, + id TEXT NOT NULL, + config_json TEXT, + cloud_account_id TEXT, + last_agent_report_json TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + removed_at TIMESTAMP, + UNIQUE(cloud_provider, id) + ) + ` + _, err := db.Exec(createTablesStatements) + if err != nil { + return fmt.Errorf( + "could not ensure cloud provider integrations schema in sqlite DB: %w", err, + ) + } + + return nil +} + +type cloudProviderAccountsSQLRepository struct { + db *sqlx.DB +} + +func (r *cloudProviderAccountsSQLRepository) listConnected( + ctx context.Context, cloudProvider string, +) ([]AccountRecord, *model.ApiError) { + accounts := []AccountRecord{} + + err := r.db.SelectContext( + ctx, &accounts, ` + select + cloud_provider, + id, + config_json, + cloud_account_id, + last_agent_report_json, + created_at, + removed_at + from cloud_integrations_accounts + where + cloud_provider=$1 + and removed_at is NULL + and cloud_account_id is not NULL + and last_agent_report_json is not NULL + order by created_at + `, cloudProvider, + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "could not query connected cloud accounts: %w", err, + )) + } + + return accounts, nil +} + +func (r *cloudProviderAccountsSQLRepository) get( + ctx context.Context, cloudProvider string, id string, +) (*AccountRecord, *model.ApiError) { + var result AccountRecord + + err := r.db.GetContext( + ctx, &result, ` + select + cloud_provider, + id, + config_json, + cloud_account_id, + last_agent_report_json, + created_at, + removed_at + from cloud_integrations_accounts + where + cloud_provider=$1 + and id=$2 + `, + cloudProvider, id, + ) + + if err == sql.ErrNoRows { + return nil, model.NotFoundError(fmt.Errorf( + "couldn't find account with Id %s", id, + )) + } else if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't query cloud provider accounts: %w", err, + )) + } + + return &result, nil +} + +func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount( + ctx context.Context, cloudProvider string, cloudAccountId string, +) (*AccountRecord, *model.ApiError) { + var result AccountRecord + + err := r.db.GetContext( + ctx, &result, ` + select + cloud_provider, + id, + config_json, + cloud_account_id, + last_agent_report_json, + created_at, + removed_at + from cloud_integrations_accounts + where + cloud_provider=$1 + and cloud_account_id=$2 + and last_agent_report_json is not NULL + and removed_at is NULL + `, + cloudProvider, cloudAccountId, + ) + + if err == sql.ErrNoRows { + return nil, model.NotFoundError(fmt.Errorf( + "couldn't find connected cloud account %s", cloudAccountId, + )) + } else if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't query cloud provider accounts: %w", err, + )) + } + + return &result, nil +} + +func (r *cloudProviderAccountsSQLRepository) upsert( + ctx context.Context, + cloudProvider string, + id *string, + config *AccountConfig, + cloudAccountId *string, + agentReport *AgentReport, + removedAt *time.Time, +) (*AccountRecord, *model.ApiError) { + // Insert + if id == nil { + newId := uuid.NewString() + id = &newId + } + + // Prepare clause for setting values in `on conflict do update` + onConflictSetStmts := []string{} + setColStatement := func(col string) string { + return fmt.Sprintf("%s=excluded.%s", col, col) + } + + if config != nil { + onConflictSetStmts = append( + onConflictSetStmts, setColStatement("config_json"), + ) + } + + if cloudAccountId != nil { + onConflictSetStmts = append( + onConflictSetStmts, setColStatement("cloud_account_id"), + ) + } + + if agentReport != nil { + onConflictSetStmts = append( + onConflictSetStmts, setColStatement("last_agent_report_json"), + ) + } + + if removedAt != nil { + onConflictSetStmts = append( + onConflictSetStmts, setColStatement("removed_at"), + ) + } + + onConflictClause := "" + if len(onConflictSetStmts) > 0 { + onConflictClause = fmt.Sprintf( + "on conflict(cloud_provider, id) do update SET\n%s", + strings.Join(onConflictSetStmts, ",\n"), + ) + } + + insertQuery := fmt.Sprintf(` + INSERT INTO cloud_integrations_accounts ( + cloud_provider, + id, + config_json, + cloud_account_id, + last_agent_report_json, + removed_at + ) values ($1, $2, $3, $4, $5, $6) + %s`, onConflictClause, + ) + + _, dbErr := r.db.ExecContext( + ctx, insertQuery, + cloudProvider, id, config, cloudAccountId, agentReport, removedAt, + ) + if dbErr != nil { + return nil, model.InternalError(fmt.Errorf( + "could not upsert cloud account record: %w", dbErr, + )) + } + + upsertedAccount, apiErr := r.get(ctx, cloudProvider, *id) + if apiErr != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't fetch upserted account by id: %w", apiErr.ToError(), + )) + } + + return upsertedAccount, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 505464a6ee..cf8724f78f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/promql" "go.signoz.io/signoz/pkg/query-service/agentConf" + "go.signoz.io/signoz/pkg/query-service/app/cloudintegrations" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/app/inframetrics" @@ -102,6 +103,8 @@ type APIHandler struct { IntegrationsController *integrations.Controller + CloudIntegrationsController *cloudintegrations.Controller + LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController // SetupCompleted indicates if SigNoz is ready for general use. @@ -155,6 +158,9 @@ type APIHandlerOpts struct { // Integrations IntegrationsController *integrations.Controller + // Cloud Provider Integrations + CloudIntegrationsController *cloudintegrations.Controller + // Log parsing pipelines LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController @@ -226,6 +232,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, IntegrationsController: opts.IntegrationsController, + CloudIntegrationsController: opts.CloudIntegrationsController, LogsParsingPipelineController: opts.LogsParsingPipelineController, querier: querier, querierV2: querierv2, @@ -3867,6 +3874,157 @@ func (aH *APIHandler) UninstallIntegration( aH.Respond(w, map[string]interface{}{}) } +// cloud provider integrations +func (aH *APIHandler) RegisterCloudIntegrationsRoutes(router *mux.Router, am *AuthMiddleware) { + subRouter := router.PathPrefix("/api/v1/cloud-integrations").Subrouter() + + subRouter.HandleFunc( + "/{cloudProvider}/accounts/generate-connection-url", am.EditAccess(aH.CloudIntegrationsGenerateConnectionUrl), + ).Methods(http.MethodPost) + + subRouter.HandleFunc( + "/{cloudProvider}/accounts", am.ViewAccess(aH.CloudIntegrationsListConnectedAccounts), + ).Methods(http.MethodGet) + + subRouter.HandleFunc( + "/{cloudProvider}/accounts/{accountId}/status", am.ViewAccess(aH.CloudIntegrationsGetAccountStatus), + ).Methods(http.MethodGet) + + subRouter.HandleFunc( + "/{cloudProvider}/accounts/{accountId}/config", am.EditAccess(aH.CloudIntegrationsUpdateAccountConfig), + ).Methods(http.MethodPost) + + subRouter.HandleFunc( + "/{cloudProvider}/accounts/{accountId}/disconnect", am.EditAccess(aH.CloudIntegrationsDisconnectAccount), + ).Methods(http.MethodPost) + + subRouter.HandleFunc( + "/{cloudProvider}/agent-check-in", am.EditAccess(aH.CloudIntegrationsAgentCheckIn), + ).Methods(http.MethodPost) + +} + +func (aH *APIHandler) CloudIntegrationsListConnectedAccounts( + w http.ResponseWriter, r *http.Request, +) { + cloudProvider := mux.Vars(r)["cloudProvider"] + + resp, apiErr := aH.CloudIntegrationsController.ListConnectedAccounts( + r.Context(), cloudProvider, + ) + + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) CloudIntegrationsGenerateConnectionUrl( + w http.ResponseWriter, r *http.Request, +) { + cloudProvider := mux.Vars(r)["cloudProvider"] + + req := cloudintegrations.GenerateConnectionUrlRequest{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + RespondError(w, model.BadRequest(err), nil) + return + } + + result, apiErr := aH.CloudIntegrationsController.GenerateConnectionUrl( + r.Context(), cloudProvider, req, + ) + + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + + aH.Respond(w, result) +} + +func (aH *APIHandler) CloudIntegrationsGetAccountStatus( + w http.ResponseWriter, r *http.Request, +) { + cloudProvider := mux.Vars(r)["cloudProvider"] + accountId := mux.Vars(r)["accountId"] + + resp, apiErr := aH.CloudIntegrationsController.GetAccountStatus( + r.Context(), cloudProvider, accountId, + ) + + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) CloudIntegrationsAgentCheckIn( + w http.ResponseWriter, r *http.Request, +) { + cloudProvider := mux.Vars(r)["cloudProvider"] + + req := cloudintegrations.AgentCheckInRequest{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + RespondError(w, model.BadRequest(err), nil) + return + } + + result, apiErr := aH.CloudIntegrationsController.CheckInAsAgent( + r.Context(), cloudProvider, req, + ) + + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + + aH.Respond(w, result) +} + +func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig( + w http.ResponseWriter, r *http.Request, +) { + cloudProvider := mux.Vars(r)["cloudProvider"] + accountId := mux.Vars(r)["accountId"] + + req := cloudintegrations.UpdateAccountConfigRequest{} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + RespondError(w, model.BadRequest(err), nil) + return + } + + result, apiErr := aH.CloudIntegrationsController.UpdateAccountConfig( + r.Context(), cloudProvider, accountId, req, + ) + + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + + aH.Respond(w, result) +} + +func (aH *APIHandler) CloudIntegrationsDisconnectAccount( + w http.ResponseWriter, r *http.Request, +) { + cloudProvider := mux.Vars(r)["cloudProvider"] + accountId := mux.Vars(r)["accountId"] + + result, apiErr := aH.CloudIntegrationsController.DisconnectAccount( + r.Context(), cloudProvider, accountId, + ) + + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + + aH.Respond(w, result) +} + // logs func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am *AuthMiddleware) { subRouter := router.PathPrefix("/api/v1/logs").Subrouter() diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 20d183876c..25ea1e8c93 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -24,6 +24,7 @@ import ( "github.com/soheilhy/cmux" "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" + "go.signoz.io/signoz/pkg/query-service/app/cloudintegrations" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/integrations" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" @@ -181,6 +182,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, fmt.Errorf("couldn't create integrations controller: %w", err) } + cloudIntegrationsController, err := cloudintegrations.NewController(localDB) + if err != nil { + return nil, fmt.Errorf("couldn't create cloud provider integrations controller: %w", err) + } + logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations, ) @@ -200,6 +206,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { RuleManager: rm, FeatureFlags: fm, IntegrationsController: integrationsController, + CloudIntegrationsController: cloudIntegrationsController, LogsParsingPipelineController: logParsingPipelineController, Cache: c, FluxInterval: fluxInterval, @@ -310,6 +317,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { api.RegisterRoutes(r, am) api.RegisterLogsRoutes(r, am) api.RegisterIntegrationRoutes(r, am) + api.RegisterCloudIntegrationsRoutes(r, am) api.RegisterQueryRangeV3Routes(r, am) api.RegisterInfraMetricsRoutes(r, am) api.RegisterWebSocketPaths(r, am) diff --git a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go new file mode 100644 index 0000000000..9f199b5b06 --- /dev/null +++ b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go @@ -0,0 +1,299 @@ +package tests + +import ( + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/jmoiron/sqlx" + mockhouse "github.com/srikanthccv/ClickHouse-go-mock" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/app" + "go.signoz.io/signoz/pkg/query-service/app/cloudintegrations" + "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/dao" + "go.signoz.io/signoz/pkg/query-service/featureManager" + "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func TestAWSIntegrationLifecycle(t *testing.T) { + // Test for happy path of connecting and managing AWS integration accounts + + t0 := time.Now() + require := require.New(t) + testbed := NewCloudIntegrationsTestBed(t, nil) + + accountsListResp := testbed.GetConnectedAccountsListFromQS("aws") + require.Equal(len(accountsListResp.Accounts), 0, + "No accounts should be connected at the beginning", + ) + + // Should be able to generate a connection url from UI - initializing an integration account + testAccountConfig := cloudintegrations.AccountConfig{ + EnabledRegions: []string{"us-east-1", "us-east-2"}, + } + connectionUrlResp := testbed.GenerateConnectionUrlFromQS( + "aws", cloudintegrations.GenerateConnectionUrlRequest{ + AgentConfig: cloudintegrations.SigNozAgentConfig{ + Region: "us-east-1", + }, + AccountConfig: testAccountConfig, + }) + testAccountId := connectionUrlResp.AccountId + require.NotEmpty(testAccountId) + connectionUrl := connectionUrlResp.ConnectionUrl + require.NotEmpty(connectionUrl) + + // Should be able to poll for account connection status from the UI + accountStatusResp := testbed.GetAccountStatusFromQS("aws", testAccountId) + require.Equal(testAccountId, accountStatusResp.Id) + require.Nil(accountStatusResp.Status.Integration.LastHeartbeatTsMillis) + + // The unconnected account should not show up in connected accounts list yet + accountsListResp1 := testbed.GetConnectedAccountsListFromQS("aws") + require.Equal(0, len(accountsListResp1.Accounts)) + + // An agent installed in user's AWS account should be able to check in for the new integration account + tsMillisBeforeAgentCheckIn := time.Now().UnixMilli() + testAWSAccountId := "4563215233" + agentCheckInResp := testbed.CheckInAsAgentWithQS( + "aws", cloudintegrations.AgentCheckInRequest{ + AccountId: testAccountId, + CloudAccountId: testAWSAccountId, + }, + ) + require.Equal(testAccountId, agentCheckInResp.Account.Id) + require.Equal(testAccountConfig, *agentCheckInResp.Account.Config) + require.Equal(testAWSAccountId, *agentCheckInResp.Account.CloudAccountId) + require.LessOrEqual(t0.Unix(), agentCheckInResp.Account.CreatedAt.Unix()) + require.Nil(agentCheckInResp.Account.RemovedAt) + + // Polling for connection status from UI should now return latest status + accountStatusResp1 := testbed.GetAccountStatusFromQS("aws", testAccountId) + require.Equal(testAccountId, accountStatusResp1.Id) + require.NotNil(accountStatusResp1.Status.Integration.LastHeartbeatTsMillis) + require.LessOrEqual( + tsMillisBeforeAgentCheckIn, + *accountStatusResp1.Status.Integration.LastHeartbeatTsMillis, + ) + + // The account should now show up in list of connected accounts. + accountsListResp2 := testbed.GetConnectedAccountsListFromQS("aws") + require.Equal(len(accountsListResp2.Accounts), 1) + require.Equal(testAccountId, accountsListResp2.Accounts[0].Id) + require.Equal(testAWSAccountId, accountsListResp2.Accounts[0].CloudAccountId) + + // Should be able to update account config from UI + testAccountConfig2 := cloudintegrations.AccountConfig{ + EnabledRegions: []string{"us-east-2", "us-west-1"}, + } + latestAccount := testbed.UpdateAccountConfigWithQS( + "aws", testAccountId, testAccountConfig2, + ) + require.Equal(testAccountId, latestAccount.Id) + require.Equal(testAccountConfig2, *latestAccount.Config) + + // The agent should now receive latest account config. + agentCheckInResp1 := testbed.CheckInAsAgentWithQS( + "aws", cloudintegrations.AgentCheckInRequest{ + AccountId: testAccountId, + CloudAccountId: testAWSAccountId, + }, + ) + require.Equal(testAccountId, agentCheckInResp1.Account.Id) + require.Equal(testAccountConfig2, *agentCheckInResp1.Account.Config) + require.Equal(testAWSAccountId, *agentCheckInResp1.Account.CloudAccountId) + require.Nil(agentCheckInResp1.Account.RemovedAt) + + // Should be able to disconnect/remove account from UI. + tsBeforeDisconnect := time.Now() + latestAccount = testbed.DisconnectAccountWithQS("aws", testAccountId) + require.Equal(testAccountId, latestAccount.Id) + require.LessOrEqual(tsBeforeDisconnect, *latestAccount.RemovedAt) + + // The agent should receive the disconnected status in account config post disconnection + agentCheckInResp2 := testbed.CheckInAsAgentWithQS( + "aws", cloudintegrations.AgentCheckInRequest{ + AccountId: testAccountId, + CloudAccountId: testAWSAccountId, + }, + ) + require.Equal(testAccountId, agentCheckInResp2.Account.Id) + require.Equal(testAWSAccountId, *agentCheckInResp2.Account.CloudAccountId) + require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.Account.RemovedAt) +} + +type CloudIntegrationsTestBed struct { + t *testing.T + testUser *model.User + qsHttpHandler http.Handler + mockClickhouse mockhouse.ClickConnMockCommon +} + +// testDB can be injected for sharing a DB across multiple integration testbeds. +func NewCloudIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *CloudIntegrationsTestBed { + if testDB == nil { + testDB = utils.NewQueryServiceDBForTests(t) + } + + controller, err := cloudintegrations.NewController(testDB) + if err != nil { + t.Fatalf("could not create cloud integrations controller: %v", err) + } + + fm := featureManager.StartManager() + apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{ + AppDao: dao.DB(), + CloudIntegrationsController: controller, + FeatureFlags: fm, + }) + if err != nil { + t.Fatalf("could not create a new ApiHandler: %v", err) + } + + router := app.NewRouter() + am := app.NewAuthMiddleware(auth.GetUserFromRequest) + apiHandler.RegisterRoutes(router, am) + apiHandler.RegisterCloudIntegrationsRoutes(router, am) + + user, apiErr := createTestUser() + if apiErr != nil { + t.Fatalf("could not create a test user: %v", apiErr) + } + + return &CloudIntegrationsTestBed{ + t: t, + testUser: user, + qsHttpHandler: router, + } +} + +func (tb *CloudIntegrationsTestBed) GetConnectedAccountsListFromQS( + cloudProvider string, +) *cloudintegrations.ConnectedAccountsListResponse { + respDataJson := tb.RequestQS(fmt.Sprintf("/api/v1/cloud-integrations/%s/accounts", cloudProvider), nil) + + var resp cloudintegrations.ConnectedAccountsListResponse + err := json.Unmarshal(respDataJson, &resp) + if err != nil { + tb.t.Fatalf("could not unmarshal apiResponse.Data json into AccountsListResponse") + } + + return &resp +} + +func (tb *CloudIntegrationsTestBed) GenerateConnectionUrlFromQS( + cloudProvider string, req cloudintegrations.GenerateConnectionUrlRequest, +) *cloudintegrations.GenerateConnectionUrlResponse { + respDataJson := tb.RequestQS( + fmt.Sprintf("/api/v1/cloud-integrations/%s/accounts/generate-connection-url", cloudProvider), + req, + ) + + var resp cloudintegrations.GenerateConnectionUrlResponse + err := json.Unmarshal(respDataJson, &resp) + if err != nil { + tb.t.Fatalf("could not unmarshal apiResponse.Data json into map[string]any") + } + + return &resp +} + +func (tb *CloudIntegrationsTestBed) GetAccountStatusFromQS( + cloudProvider string, accountId string, +) *cloudintegrations.AccountStatusResponse { + respDataJson := tb.RequestQS(fmt.Sprintf( + "/api/v1/cloud-integrations/%s/accounts/%s/status", + cloudProvider, accountId, + ), nil) + + var resp cloudintegrations.AccountStatusResponse + err := json.Unmarshal(respDataJson, &resp) + if err != nil { + tb.t.Fatalf("could not unmarshal apiResponse.Data json into AccountStatusResponse") + } + + return &resp +} + +func (tb *CloudIntegrationsTestBed) CheckInAsAgentWithQS( + cloudProvider string, req cloudintegrations.AgentCheckInRequest, +) *cloudintegrations.AgentCheckInResponse { + respDataJson := tb.RequestQS( + fmt.Sprintf("/api/v1/cloud-integrations/%s/agent-check-in", cloudProvider), req, + ) + + var resp cloudintegrations.AgentCheckInResponse + err := json.Unmarshal(respDataJson, &resp) + if err != nil { + tb.t.Fatalf("could not unmarshal apiResponse.Data json into AgentCheckInResponse") + } + + return &resp +} + +func (tb *CloudIntegrationsTestBed) UpdateAccountConfigWithQS( + cloudProvider string, accountId string, newConfig cloudintegrations.AccountConfig, +) *cloudintegrations.AccountRecord { + respDataJson := tb.RequestQS( + fmt.Sprintf( + "/api/v1/cloud-integrations/%s/accounts/%s/config", + cloudProvider, accountId, + ), cloudintegrations.UpdateAccountConfigRequest{ + Config: newConfig, + }, + ) + + var resp cloudintegrations.AccountRecord + err := json.Unmarshal(respDataJson, &resp) + if err != nil { + tb.t.Fatalf("could not unmarshal apiResponse.Data json into Account") + } + + return &resp +} + +func (tb *CloudIntegrationsTestBed) DisconnectAccountWithQS( + cloudProvider string, accountId string, +) *cloudintegrations.AccountRecord { + respDataJson := tb.RequestQS( + fmt.Sprintf( + "/api/v1/cloud-integrations/%s/accounts/%s/disconnect", + cloudProvider, accountId, + ), map[string]any{}, + ) + + var resp cloudintegrations.AccountRecord + err := json.Unmarshal(respDataJson, &resp) + if err != nil { + tb.t.Fatalf("could not unmarshal apiResponse.Data json into Account") + } + + return &resp +} + +func (tb *CloudIntegrationsTestBed) RequestQS( + path string, + postData interface{}, +) (responseDataJson []byte) { + req, err := AuthenticatedRequestForTest( + tb.testUser, path, postData, + ) + if err != nil { + tb.t.Fatalf("couldn't create authenticated test request: %v", err) + } + + result, err := HandleTestRequest(tb.qsHttpHandler, req, 200) + if err != nil { + tb.t.Fatalf("test request failed: %v", err) + } + + dataJson, err := json.Marshal(result.Data) + if err != nil { + tb.t.Fatalf("could not marshal apiResponse.Data: %v", err) + } + return dataJson +} diff --git a/pkg/query-service/utils/testutils.go b/pkg/query-service/utils/testutils.go index d8989d9323..69c7a9bf24 100644 --- a/pkg/query-service/utils/testutils.go +++ b/pkg/query-service/utils/testutils.go @@ -5,24 +5,31 @@ import ( "testing" "github.com/jmoiron/sqlx" + _ "github.com/mattn/go-sqlite3" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/dao" ) -func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB { +func NewTestSqliteDB(t *testing.T) (testDB *sqlx.DB, testDBFilePath string) { testDBFile, err := os.CreateTemp("", "test-signoz-db-*") if err != nil { t.Fatalf("could not create temp file for test db: %v", err) } - testDBFilePath := testDBFile.Name() + testDBFilePath = testDBFile.Name() t.Cleanup(func() { os.Remove(testDBFilePath) }) testDBFile.Close() - testDB, err := sqlx.Open("sqlite3", testDBFilePath) + testDB, err = sqlx.Open("sqlite3", testDBFilePath) if err != nil { t.Fatalf("could not open test db sqlite file: %v", err) } + return testDB, testDBFilePath +} + +func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB { + testDB, testDBFilePath := NewTestSqliteDB(t) + // TODO(Raj): This should not require passing in the DB file path dao.InitDao("sqlite", testDBFilePath) dashboards.InitDB(testDBFilePath)