diff --git a/ee/query-service/app/api/cloudIntegrations.go b/ee/query-service/app/api/cloudIntegrations.go index 41e0d58f85..90714beff3 100644 --- a/ee/query-service/app/api/cloudIntegrations.go +++ b/ee/query-service/app/api/cloudIntegrations.go @@ -153,9 +153,11 @@ func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId func (ah *APIHandler) getOrCreateCloudIntegrationUser( ctx context.Context, orgId string, cloudProvider string, ) (*types.User, *basemodel.ApiError) { - cloudIntegrationUserId := fmt.Sprintf("%s-integration", cloudProvider) + cloudIntegrationUser := fmt.Sprintf("%s-integration", cloudProvider) + email := fmt.Sprintf("%s@signoz.io", cloudIntegrationUser) - integrationUserResult, apiErr := ah.AppDao().GetUser(ctx, cloudIntegrationUserId) + // TODO(nitya): there should be orgId here + integrationUserResult, apiErr := ah.AppDao().GetUserByEmail(ctx, email) if apiErr != nil { return nil, basemodel.WrapApiError(apiErr, "couldn't look for integration user") } @@ -170,9 +172,9 @@ func (ah *APIHandler) getOrCreateCloudIntegrationUser( ) newUser := &types.User{ - ID: cloudIntegrationUserId, - Name: fmt.Sprintf("%s integration", cloudProvider), - Email: fmt.Sprintf("%s@signoz.io", cloudIntegrationUserId), + ID: uuid.New().String(), + Name: cloudIntegrationUser, + Email: email, TimeAuditable: types.TimeAuditable{ CreatedAt: time.Now(), }, diff --git a/ee/query-service/app/api/pat.go b/ee/query-service/app/api/pat.go index 95a4ae0788..b852a3be4e 100644 --- a/ee/query-service/app/api/pat.go +++ b/ee/query-service/app/api/pat.go @@ -5,16 +5,18 @@ import ( "encoding/json" "fmt" "net/http" + "slices" "time" "github.com/SigNoz/signoz/ee/query-service/model" - "github.com/SigNoz/signoz/ee/types" eeTypes "github.com/SigNoz/signoz/ee/types" "github.com/SigNoz/signoz/pkg/errors" + errorsV2 "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/http/render" "github.com/SigNoz/signoz/pkg/query-service/auth" baseconstants "github.com/SigNoz/signoz/pkg/query-service/constants" basemodel "github.com/SigNoz/signoz/pkg/query-service/model" + "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/valuer" "github.com/gorilla/mux" "go.uber.org/zap" @@ -58,7 +60,7 @@ func (ah *APIHandler) createPAT(w http.ResponseWriter, r *http.Request) { ah.Respond(w, &pat) } -func validatePATRequest(req types.GettablePAT) error { +func validatePATRequest(req eeTypes.GettablePAT) error { if req.Role == "" || (req.Role != baseconstants.ViewerGroup && req.Role != baseconstants.EditorGroup && req.Role != baseconstants.AdminGroup) { return fmt.Errorf("valid role is required") } @@ -74,12 +76,19 @@ func validatePATRequest(req types.GettablePAT) error { func (ah *APIHandler) updatePAT(w http.ResponseWriter, r *http.Request) { ctx := context.Background() - req := types.GettablePAT{} + req := eeTypes.GettablePAT{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { RespondError(w, model.BadRequest(err), nil) return } + idStr := mux.Vars(r)["id"] + id, err := valuer.NewUUID(idStr) + if err != nil { + render.Error(w, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7")) + return + } + user, err := auth.GetUserFromReqContext(r.Context()) if err != nil { RespondError(w, &model.ApiError{ @@ -89,6 +98,25 @@ func (ah *APIHandler) updatePAT(w http.ResponseWriter, r *http.Request) { return } + //get the pat + existingPAT, paterr := ah.AppDao().GetPATByID(ctx, user.OrgID, id) + if paterr != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, paterr.Error())) + return + } + + // get the user + createdByUser, usererr := ah.AppDao().GetUser(ctx, existingPAT.UserID) + if usererr != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, usererr.Error())) + return + } + + if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(createdByUser.Email)) { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "integration user pat cannot be updated")) + return + } + err = validatePATRequest(req) if err != nil { RespondError(w, model.BadRequest(err), nil) @@ -96,12 +124,6 @@ func (ah *APIHandler) updatePAT(w http.ResponseWriter, r *http.Request) { } req.UpdatedByUserID = user.ID - idStr := mux.Vars(r)["id"] - id, err := valuer.NewUUID(idStr) - if err != nil { - render.Error(w, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7")) - return - } req.UpdatedAt = time.Now() zap.L().Info("Got Update PAT request", zap.Any("pat", req)) var apierr basemodel.BaseApiError @@ -149,6 +171,25 @@ func (ah *APIHandler) revokePAT(w http.ResponseWriter, r *http.Request) { return } + //get the pat + existingPAT, paterr := ah.AppDao().GetPATByID(ctx, user.OrgID, id) + if paterr != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, paterr.Error())) + return + } + + // get the user + createdByUser, usererr := ah.AppDao().GetUser(ctx, existingPAT.UserID) + if usererr != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, usererr.Error())) + return + } + + if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(createdByUser.Email)) { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "integration user pat cannot be updated")) + return + } + zap.L().Info("Revoke PAT with id", zap.String("id", id.StringValue())) if apierr := ah.AppDao().RevokePAT(ctx, user.OrgID, id, user.ID); apierr != nil { RespondError(w, apierr, nil) diff --git a/ee/query-service/dao/interface.go b/ee/query-service/dao/interface.go index 6c5bc1e612..32e2513245 100644 --- a/ee/query-service/dao/interface.go +++ b/ee/query-service/dao/interface.go @@ -8,7 +8,6 @@ import ( basedao "github.com/SigNoz/signoz/pkg/query-service/dao" baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces" basemodel "github.com/SigNoz/signoz/pkg/query-service/model" - ossTypes "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/valuer" "github.com/google/uuid" @@ -40,7 +39,6 @@ type ModelDao interface { UpdatePAT(ctx context.Context, orgID string, p types.GettablePAT, id valuer.UUID) basemodel.BaseApiError GetPAT(ctx context.Context, pat string) (*types.GettablePAT, basemodel.BaseApiError) GetPATByID(ctx context.Context, orgID string, id valuer.UUID) (*types.GettablePAT, basemodel.BaseApiError) - GetUserByPAT(ctx context.Context, orgID string, token string) (*ossTypes.GettableUser, basemodel.BaseApiError) ListPATs(ctx context.Context, orgID string) ([]types.GettablePAT, basemodel.BaseApiError) RevokePAT(ctx context.Context, orgID string, id valuer.UUID, userID string) basemodel.BaseApiError } diff --git a/ee/query-service/dao/sqlite/auth.go b/ee/query-service/dao/sqlite/auth.go index 0e8115d04a..a651fabfea 100644 --- a/ee/query-service/dao/sqlite/auth.go +++ b/ee/query-service/dao/sqlite/auth.go @@ -43,7 +43,7 @@ func (m *modelDao) createUserForSAMLRequest(ctx context.Context, email string) ( } user := &types.User{ - ID: uuid.NewString(), + ID: uuid.New().String(), Name: "", Email: email, Password: hash, diff --git a/ee/query-service/dao/sqlite/pat.go b/ee/query-service/dao/sqlite/pat.go index be51b716f5..f60f05d14f 100644 --- a/ee/query-service/dao/sqlite/pat.go +++ b/ee/query-service/dao/sqlite/pat.go @@ -196,27 +196,3 @@ func (m *modelDao) GetPATByID(ctx context.Context, orgID string, id valuer.UUID) return &patWithUser, nil } - -// deprecated -func (m *modelDao) GetUserByPAT(ctx context.Context, orgID string, token string) (*ossTypes.GettableUser, basemodel.BaseApiError) { - users := []ossTypes.GettableUser{} - - if err := m.DB().NewSelect(). - Model(&users). - Column("u.id", "u.name", "u.email", "u.password", "u.created_at", "u.profile_picture_url", "u.org_id", "u.group_id"). - Join("JOIN personal_access_tokens p ON u.id = p.user_id"). - Where("p.token = ?", token). - Where("p.expires_at >= strftime('%s', 'now')"). - Where("p.org_id = ?", orgID). - Scan(ctx); err != nil { - return nil, model.InternalError(fmt.Errorf("failed to fetch user from PAT, err: %v", err)) - } - - if len(users) != 1 { - return nil, &model.ApiError{ - Typ: model.ErrorInternal, - Err: fmt.Errorf("found zero or multiple users with same PAT token"), - } - } - return &users[0], nil -} diff --git a/ee/sqlstore/postgressqlstore/dialect.go b/ee/sqlstore/postgressqlstore/dialect.go index c15fbe8d56..e80469ae52 100644 --- a/ee/sqlstore/postgressqlstore/dialect.go +++ b/ee/sqlstore/postgressqlstore/dialect.go @@ -17,13 +17,15 @@ var ( ) var ( - Org = "org" - User = "user" + Org = "org" + User = "user" + CloudIntegration = "cloud_integration" ) var ( - OrgReference = `("org_id") REFERENCES "organizations" ("id")` - UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE` + OrgReference = `("org_id") REFERENCES "organizations" ("id")` + UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE` + CloudIntegrationReference = `("cloud_integration_id") REFERENCES "cloud_integration" ("id") ON DELETE CASCADE` ) type dialect struct { @@ -211,6 +213,8 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I fkReferences = append(fkReferences, OrgReference) } else if reference == User && !slices.Contains(fkReferences, UserReference) { fkReferences = append(fkReferences, UserReference) + } else if reference == CloudIntegration && !slices.Contains(fkReferences, CloudIntegrationReference) { + fkReferences = append(fkReferences, CloudIntegrationReference) } } diff --git a/pkg/query-service/app/cloudintegrations/accountsRepo.go b/pkg/query-service/app/cloudintegrations/accountsRepo.go index 905254abf0..7bf3f61de0 100644 --- a/pkg/query-service/app/cloudintegrations/accountsRepo.go +++ b/pkg/query-service/app/cloudintegrations/accountsRepo.go @@ -8,68 +8,59 @@ import ( "time" "github.com/SigNoz/signoz/pkg/query-service/model" - "github.com/google/uuid" - "github.com/jmoiron/sqlx" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" ) type cloudProviderAccountsRepository interface { - listConnected(ctx context.Context, cloudProvider string) ([]AccountRecord, *model.ApiError) + listConnected(ctx context.Context, orgId string, provider string) ([]types.CloudIntegration, *model.ApiError) - get(ctx context.Context, cloudProvider string, id string) (*AccountRecord, *model.ApiError) + get(ctx context.Context, orgId string, provider string, id string) (*types.CloudIntegration, *model.ApiError) - getConnectedCloudAccount( - ctx context.Context, cloudProvider string, cloudAccountId string, - ) (*AccountRecord, *model.ApiError) + getConnectedCloudAccount(ctx context.Context, orgId string, provider string, accountID string) (*types.CloudIntegration, *model.ApiError) // Insert an account or update it by (cloudProvider, id) // for specified non-empty fields upsert( ctx context.Context, - cloudProvider string, + orgId string, + provider string, id *string, - config *AccountConfig, - cloudAccountId *string, - agentReport *AgentReport, + config *types.AccountConfig, + accountId *string, + agentReport *types.AgentReport, removedAt *time.Time, - ) (*AccountRecord, *model.ApiError) + ) (*types.CloudIntegration, *model.ApiError) } -func newCloudProviderAccountsRepository(db *sqlx.DB) ( +func newCloudProviderAccountsRepository(store sqlstore.SQLStore) ( *cloudProviderAccountsSQLRepository, error, ) { return &cloudProviderAccountsSQLRepository{ - db: db, + store: store, }, nil } type cloudProviderAccountsSQLRepository struct { - db *sqlx.DB + store sqlstore.SQLStore } func (r *cloudProviderAccountsSQLRepository) listConnected( - ctx context.Context, cloudProvider string, -) ([]AccountRecord, *model.ApiError) { - accounts := []AccountRecord{} + ctx context.Context, orgId string, cloudProvider string, +) ([]types.CloudIntegration, *model.ApiError) { + accounts := []types.CloudIntegration{} + + err := r.store.BunDB().NewSelect(). + Model(&accounts). + Where("org_id = ?", orgId). + Where("provider = ?", cloudProvider). + Where("removed_at is NULL"). + Where("account_id is not NULL"). + Where("last_agent_report is not NULL"). + Order("created_at"). + Scan(ctx) - err := r.db.SelectContext( - ctx, &accounts, ` - select - cloud_provider, - id, - config_json, - cloud_account_id, - last_agent_report_json, - created_at, - removed_at - from cloud_integrations_accounts - where - cloud_provider=$1 - and removed_at is NULL - and cloud_account_id is not NULL - and last_agent_report_json is not NULL - order by created_at - `, cloudProvider, - ) if err != nil { return nil, model.InternalError(fmt.Errorf( "could not query connected cloud accounts: %w", err, @@ -80,27 +71,16 @@ func (r *cloudProviderAccountsSQLRepository) listConnected( } func (r *cloudProviderAccountsSQLRepository) get( - ctx context.Context, cloudProvider string, id string, -) (*AccountRecord, *model.ApiError) { - var result AccountRecord + ctx context.Context, orgId string, provider string, id string, +) (*types.CloudIntegration, *model.ApiError) { + var result types.CloudIntegration - err := r.db.GetContext( - ctx, &result, ` - select - cloud_provider, - id, - config_json, - cloud_account_id, - last_agent_report_json, - created_at, - removed_at - from cloud_integrations_accounts - where - cloud_provider=$1 - and id=$2 - `, - cloudProvider, id, - ) + err := r.store.BunDB().NewSelect(). + Model(&result). + Where("org_id = ?", orgId). + Where("provider = ?", provider). + Where("id = ?", id). + Scan(ctx) if err == sql.ErrNoRows { return nil, model.NotFoundError(fmt.Errorf( @@ -116,33 +96,22 @@ func (r *cloudProviderAccountsSQLRepository) get( } func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount( - ctx context.Context, cloudProvider string, cloudAccountId string, -) (*AccountRecord, *model.ApiError) { - var result AccountRecord + ctx context.Context, orgId string, provider string, accountId string, +) (*types.CloudIntegration, *model.ApiError) { + var result types.CloudIntegration - err := r.db.GetContext( - ctx, &result, ` - select - cloud_provider, - id, - config_json, - cloud_account_id, - last_agent_report_json, - created_at, - removed_at - from cloud_integrations_accounts - where - cloud_provider=$1 - and cloud_account_id=$2 - and last_agent_report_json is not NULL - and removed_at is NULL - `, - cloudProvider, cloudAccountId, - ) + err := r.store.BunDB().NewSelect(). + Model(&result). + Where("org_id = ?", orgId). + Where("provider = ?", provider). + Where("account_id = ?", accountId). + Where("last_agent_report is not NULL"). + Where("removed_at is NULL"). + Scan(ctx) if err == sql.ErrNoRows { return nil, model.NotFoundError(fmt.Errorf( - "couldn't find connected cloud account %s", cloudAccountId, + "couldn't find connected cloud account %s", accountId, )) } else if err != nil { return nil, model.InternalError(fmt.Errorf( @@ -155,17 +124,18 @@ func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount( func (r *cloudProviderAccountsSQLRepository) upsert( ctx context.Context, - cloudProvider string, + orgId string, + provider string, id *string, - config *AccountConfig, - cloudAccountId *string, - agentReport *AgentReport, + config *types.AccountConfig, + accountId *string, + agentReport *types.AgentReport, removedAt *time.Time, -) (*AccountRecord, *model.ApiError) { +) (*types.CloudIntegration, *model.ApiError) { // Insert if id == nil { - newId := uuid.NewString() - id = &newId + temp := valuer.GenerateUUID().StringValue() + id = &temp } // Prepare clause for setting values in `on conflict do update` @@ -176,19 +146,19 @@ func (r *cloudProviderAccountsSQLRepository) upsert( if config != nil { onConflictSetStmts = append( - onConflictSetStmts, setColStatement("config_json"), + onConflictSetStmts, setColStatement("config"), ) } - if cloudAccountId != nil { + if accountId != nil { onConflictSetStmts = append( - onConflictSetStmts, setColStatement("cloud_account_id"), + onConflictSetStmts, setColStatement("account_id"), ) } if agentReport != nil { onConflictSetStmts = append( - onConflictSetStmts, setColStatement("last_agent_report_json"), + onConflictSetStmts, setColStatement("last_agent_report"), ) } @@ -198,37 +168,45 @@ func (r *cloudProviderAccountsSQLRepository) upsert( ) } + // set updated_at to current timestamp if it's an upsert + onConflictSetStmts = append( + onConflictSetStmts, setColStatement("updated_at"), + ) + onConflictClause := "" if len(onConflictSetStmts) > 0 { onConflictClause = fmt.Sprintf( - "on conflict(cloud_provider, id) do update SET\n%s", + "conflict(id, provider, org_id) do update SET\n%s", strings.Join(onConflictSetStmts, ",\n"), ) } - insertQuery := fmt.Sprintf(` - INSERT INTO cloud_integrations_accounts ( - cloud_provider, - id, - config_json, - cloud_account_id, - last_agent_report_json, - removed_at - ) values ($1, $2, $3, $4, $5, $6) - %s`, onConflictClause, - ) + integration := types.CloudIntegration{ + OrgID: orgId, + Provider: provider, + Identifiable: types.Identifiable{ID: valuer.MustNewUUID(*id)}, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + Config: config, + AccountID: accountId, + LastAgentReport: agentReport, + RemovedAt: removedAt, + } + + _, dbErr := r.store.BunDB().NewInsert(). + Model(&integration). + On(onConflictClause). + Exec(ctx) - _, dbErr := r.db.ExecContext( - ctx, insertQuery, - cloudProvider, id, config, cloudAccountId, agentReport, removedAt, - ) if dbErr != nil { return nil, model.InternalError(fmt.Errorf( "could not upsert cloud account record: %w", dbErr, )) } - upsertedAccount, apiErr := r.get(ctx, cloudProvider, *id) + upsertedAccount, apiErr := r.get(ctx, orgId, provider, *id) if apiErr != nil { return nil, model.InternalError(fmt.Errorf( "couldn't fetch upserted account by id: %w", apiErr.ToError(), diff --git a/pkg/query-service/app/cloudintegrations/controller.go b/pkg/query-service/app/cloudintegrations/controller.go index bf1144eec2..39770d0edd 100644 --- a/pkg/query-service/app/cloudintegrations/controller.go +++ b/pkg/query-service/app/cloudintegrations/controller.go @@ -33,12 +33,12 @@ type Controller struct { func NewController(sqlStore sqlstore.SQLStore) ( *Controller, error, ) { - accountsRepo, err := newCloudProviderAccountsRepository(sqlStore.SQLxDB()) + accountsRepo, err := newCloudProviderAccountsRepository(sqlStore) if err != nil { return nil, fmt.Errorf("couldn't create cloud provider accounts repo: %w", err) } - serviceConfigRepo, err := newServiceConfigRepository(sqlStore.SQLxDB()) + serviceConfigRepo, err := newServiceConfigRepository(sqlStore) if err != nil { return nil, fmt.Errorf("couldn't create cloud provider service config repo: %w", err) } @@ -49,19 +49,12 @@ func NewController(sqlStore sqlstore.SQLStore) ( }, nil } -type Account struct { - Id string `json:"id"` - CloudAccountId string `json:"cloud_account_id"` - Config AccountConfig `json:"config"` - Status AccountStatus `json:"status"` -} - type ConnectedAccountsListResponse struct { - Accounts []Account `json:"accounts"` + Accounts []types.Account `json:"accounts"` } func (c *Controller) ListConnectedAccounts( - ctx context.Context, cloudProvider string, + ctx context.Context, orgId string, cloudProvider string, ) ( *ConnectedAccountsListResponse, *model.ApiError, ) { @@ -69,14 +62,14 @@ func (c *Controller) ListConnectedAccounts( return nil, apiErr } - accountRecords, apiErr := c.accountsRepo.listConnected(ctx, cloudProvider) + accountRecords, apiErr := c.accountsRepo.listConnected(ctx, orgId, cloudProvider) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't list cloud accounts") } - connectedAccounts := []Account{} + connectedAccounts := []types.Account{} for _, a := range accountRecords { - connectedAccounts = append(connectedAccounts, a.account()) + connectedAccounts = append(connectedAccounts, a.Account()) } return &ConnectedAccountsListResponse{ @@ -88,7 +81,7 @@ type GenerateConnectionUrlRequest struct { // Optional. To be specified for updates. AccountId *string `json:"account_id,omitempty"` - AccountConfig AccountConfig `json:"account_config"` + AccountConfig types.AccountConfig `json:"account_config"` AgentConfig SigNozAgentConfig `json:"agent_config"` } @@ -109,7 +102,7 @@ type GenerateConnectionUrlResponse struct { } func (c *Controller) GenerateConnectionUrl( - ctx context.Context, cloudProvider string, req GenerateConnectionUrlRequest, + ctx context.Context, orgId string, cloudProvider string, req GenerateConnectionUrlRequest, ) (*GenerateConnectionUrlResponse, *model.ApiError) { // Account connection with a simple connection URL may not be available for all providers. if cloudProvider != "aws" { @@ -117,7 +110,7 @@ func (c *Controller) GenerateConnectionUrl( } account, apiErr := c.accountsRepo.upsert( - ctx, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil, + ctx, orgId, cloudProvider, req.AccountId, &req.AccountConfig, nil, nil, nil, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") @@ -135,7 +128,7 @@ func (c *Controller) GenerateConnectionUrl( "param_SigNozIntegrationAgentVersion": agentVersion, "param_SigNozApiUrl": req.AgentConfig.SigNozAPIUrl, "param_SigNozApiKey": req.AgentConfig.SigNozAPIKey, - "param_SigNozAccountId": account.Id, + "param_SigNozAccountId": account.ID.StringValue(), "param_IngestionUrl": req.AgentConfig.IngestionUrl, "param_IngestionKey": req.AgentConfig.IngestionKey, "stackName": "signoz-integration", @@ -148,19 +141,19 @@ func (c *Controller) GenerateConnectionUrl( } return &GenerateConnectionUrlResponse{ - AccountId: account.Id, + AccountId: account.ID.StringValue(), ConnectionUrl: connectionUrl, }, nil } type AccountStatusResponse struct { - Id string `json:"id"` - CloudAccountId *string `json:"cloud_account_id,omitempty"` - Status AccountStatus `json:"status"` + Id string `json:"id"` + CloudAccountId *string `json:"cloud_account_id,omitempty"` + Status types.AccountStatus `json:"status"` } func (c *Controller) GetAccountStatus( - ctx context.Context, cloudProvider string, accountId string, + ctx context.Context, orgId string, cloudProvider string, accountId string, ) ( *AccountStatusResponse, *model.ApiError, ) { @@ -168,23 +161,23 @@ func (c *Controller) GetAccountStatus( return nil, apiErr } - account, apiErr := c.accountsRepo.get(ctx, cloudProvider, accountId) + account, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, accountId) if apiErr != nil { return nil, apiErr } resp := AccountStatusResponse{ - Id: account.Id, - CloudAccountId: account.CloudAccountId, - Status: account.status(), + Id: account.ID.StringValue(), + CloudAccountId: account.AccountID, + Status: account.Status(), } return &resp, nil } type AgentCheckInRequest struct { - AccountId string `json:"account_id"` - CloudAccountId string `json:"cloud_account_id"` + ID string `json:"account_id"` + AccountID string `json:"cloud_account_id"` // Arbitrary cloud specific Agent data Data map[string]any `json:"data,omitempty"` } @@ -204,35 +197,35 @@ type IntegrationConfigForAgent struct { } func (c *Controller) CheckInAsAgent( - ctx context.Context, cloudProvider string, req AgentCheckInRequest, + ctx context.Context, orgId string, cloudProvider string, req AgentCheckInRequest, ) (*AgentCheckInResponse, *model.ApiError) { if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { return nil, apiErr } - existingAccount, apiErr := c.accountsRepo.get(ctx, cloudProvider, req.AccountId) - if existingAccount != nil && existingAccount.CloudAccountId != nil && *existingAccount.CloudAccountId != req.CloudAccountId { + existingAccount, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, req.ID) + if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID { return nil, model.BadRequest(fmt.Errorf( "can't check in with new %s account id %s for account %s with existing %s id %s", - cloudProvider, req.CloudAccountId, existingAccount.Id, cloudProvider, *existingAccount.CloudAccountId, + cloudProvider, req.AccountID, existingAccount.ID.StringValue(), cloudProvider, *existingAccount.AccountID, )) } - existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, cloudProvider, req.CloudAccountId) - if existingAccount != nil && existingAccount.Id != req.AccountId { + existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, orgId, cloudProvider, req.AccountID) + if existingAccount != nil && existingAccount.ID.StringValue() != req.ID { return nil, model.BadRequest(fmt.Errorf( "can't check in to %s account %s with id %s. already connected with id %s", - cloudProvider, req.CloudAccountId, req.AccountId, existingAccount.Id, + cloudProvider, req.AccountID, req.ID, existingAccount.ID.StringValue(), )) } - agentReport := AgentReport{ + agentReport := types.AgentReport{ TimestampMillis: time.Now().UnixMilli(), Data: req.Data, } account, apiErr := c.accountsRepo.upsert( - ctx, cloudProvider, &req.AccountId, nil, &req.CloudAccountId, &agentReport, nil, + ctx, orgId, cloudProvider, &req.ID, nil, &req.AccountID, &agentReport, nil, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") @@ -265,7 +258,7 @@ func (c *Controller) CheckInAsAgent( } svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount( - ctx, cloudProvider, *account.CloudAccountId, + ctx, orgId, account.ID.StringValue(), ) if apiErr != nil { return nil, model.WrapApiError( @@ -298,54 +291,55 @@ func (c *Controller) CheckInAsAgent( } return &AgentCheckInResponse{ - AccountId: account.Id, - CloudAccountId: *account.CloudAccountId, + AccountId: account.ID.StringValue(), + CloudAccountId: *account.AccountID, RemovedAt: account.RemovedAt, IntegrationConfig: agentConfig, }, nil } type UpdateAccountConfigRequest struct { - Config AccountConfig `json:"config"` + Config types.AccountConfig `json:"config"` } func (c *Controller) UpdateAccountConfig( ctx context.Context, + orgId string, cloudProvider string, accountId string, req UpdateAccountConfigRequest, -) (*Account, *model.ApiError) { +) (*types.Account, *model.ApiError) { if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { return nil, apiErr } accountRecord, apiErr := c.accountsRepo.upsert( - ctx, cloudProvider, &accountId, &req.Config, nil, nil, nil, + ctx, orgId, cloudProvider, &accountId, &req.Config, nil, nil, nil, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") } - account := accountRecord.account() + account := accountRecord.Account() return &account, nil } func (c *Controller) DisconnectAccount( - ctx context.Context, cloudProvider string, accountId string, -) (*AccountRecord, *model.ApiError) { + ctx context.Context, orgId string, cloudProvider string, accountId string, +) (*types.CloudIntegration, *model.ApiError) { if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil { return nil, apiErr } - account, apiErr := c.accountsRepo.get(ctx, cloudProvider, accountId) + account, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, accountId) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't disconnect account") } tsNow := time.Now() account, apiErr = c.accountsRepo.upsert( - ctx, cloudProvider, &accountId, nil, nil, nil, &tsNow, + ctx, orgId, cloudProvider, &accountId, nil, nil, nil, &tsNow, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't disconnect account") @@ -360,6 +354,7 @@ type ListServicesResponse struct { func (c *Controller) ListServices( ctx context.Context, + orgID string, cloudProvider string, cloudAccountId *string, ) (*ListServicesResponse, *model.ApiError) { @@ -373,10 +368,16 @@ func (c *Controller) ListServices( return nil, model.WrapApiError(apiErr, "couldn't list cloud services") } - svcConfigs := map[string]*CloudServiceConfig{} + svcConfigs := map[string]*types.CloudServiceConfig{} if cloudAccountId != nil { + activeAccount, apiErr := c.accountsRepo.getConnectedCloudAccount( + ctx, orgID, cloudProvider, *cloudAccountId, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't get active account") + } svcConfigs, apiErr = c.serviceConfigRepo.getAllForAccount( - ctx, cloudProvider, *cloudAccountId, + ctx, orgID, activeAccount.ID.StringValue(), ) if apiErr != nil { return nil, model.WrapApiError( @@ -400,6 +401,7 @@ func (c *Controller) ListServices( func (c *Controller) GetServiceDetails( ctx context.Context, + orgID string, cloudProvider string, serviceId string, cloudAccountId *string, @@ -415,8 +417,16 @@ func (c *Controller) GetServiceDetails( } if cloudAccountId != nil { + + activeAccount, apiErr := c.accountsRepo.getConnectedCloudAccount( + ctx, orgID, cloudProvider, *cloudAccountId, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't get active account") + } + config, apiErr := c.serviceConfigRepo.get( - ctx, cloudProvider, *cloudAccountId, serviceId, + ctx, orgID, activeAccount.ID.StringValue(), serviceId, ) if apiErr != nil && apiErr.Type() != model.ErrorNotFound { return nil, model.WrapApiError(apiErr, "couldn't fetch service config") @@ -425,15 +435,22 @@ func (c *Controller) GetServiceDetails( if config != nil { service.Config = config + enabled := false if config.Metrics != nil && config.Metrics.Enabled { - // add links to service dashboards, making them clickable. - for i, d := range service.Assets.Dashboards { - dashboardUuid := c.dashboardUuid( - cloudProvider, serviceId, d.Id, - ) + enabled = true + } + + // add links to service dashboards, making them clickable. + for i, d := range service.Assets.Dashboards { + dashboardUuid := c.dashboardUuid( + cloudProvider, serviceId, d.Id, + ) + if enabled { service.Assets.Dashboards[i].Url = fmt.Sprintf( "/dashboard/%s", dashboardUuid, ) + } else { + service.Assets.Dashboards[i].Url = "" } } } @@ -443,17 +460,18 @@ func (c *Controller) GetServiceDetails( } type UpdateServiceConfigRequest struct { - CloudAccountId string `json:"cloud_account_id"` - Config CloudServiceConfig `json:"config"` + CloudAccountId string `json:"cloud_account_id"` + Config types.CloudServiceConfig `json:"config"` } type UpdateServiceConfigResponse struct { - Id string `json:"id"` - Config CloudServiceConfig `json:"config"` + Id string `json:"id"` + Config types.CloudServiceConfig `json:"config"` } func (c *Controller) UpdateServiceConfig( ctx context.Context, + orgID string, cloudProvider string, serviceId string, req UpdateServiceConfigRequest, @@ -465,7 +483,7 @@ func (c *Controller) UpdateServiceConfig( // can only update config for a connected cloud account id _, apiErr := c.accountsRepo.getConnectedCloudAccount( - ctx, cloudProvider, req.CloudAccountId, + ctx, orgID, cloudProvider, req.CloudAccountId, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't find connected cloud account") @@ -478,7 +496,7 @@ func (c *Controller) UpdateServiceConfig( } updatedConfig, apiErr := c.serviceConfigRepo.upsert( - ctx, cloudProvider, req.CloudAccountId, serviceId, req.Config, + ctx, orgID, cloudProvider, req.CloudAccountId, serviceId, req.Config, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't update service config") @@ -492,13 +510,13 @@ func (c *Controller) UpdateServiceConfig( // All dashboards that are available based on cloud integrations configuration // across all cloud providers -func (c *Controller) AvailableDashboards(ctx context.Context) ( +func (c *Controller) AvailableDashboards(ctx context.Context, orgId string) ( []types.Dashboard, *model.ApiError, ) { allDashboards := []types.Dashboard{} for _, provider := range []string{"aws"} { - providerDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, provider) + providerDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, orgId, provider) if apiErr != nil { return nil, model.WrapApiError( apiErr, fmt.Sprintf("couldn't get available dashboards for %s", provider), @@ -512,10 +530,10 @@ func (c *Controller) AvailableDashboards(ctx context.Context) ( } func (c *Controller) AvailableDashboardsForCloudProvider( - ctx context.Context, cloudProvider string, + ctx context.Context, orgID string, cloudProvider string, ) ([]types.Dashboard, *model.ApiError) { - accountRecords, apiErr := c.accountsRepo.listConnected(ctx, cloudProvider) + accountRecords, apiErr := c.accountsRepo.listConnected(ctx, orgID, cloudProvider) if apiErr != nil { return nil, model.WrapApiError(apiErr, "couldn't list connected cloud accounts") } @@ -524,9 +542,9 @@ func (c *Controller) AvailableDashboardsForCloudProvider( servicesWithAvailableMetrics := map[string]*time.Time{} for _, ar := range accountRecords { - if ar.CloudAccountId != nil { + if ar.AccountID != nil { configsBySvcId, apiErr := c.serviceConfigRepo.getAllForAccount( - ctx, cloudProvider, *ar.CloudAccountId, + ctx, orgID, ar.ID.StringValue(), ) if apiErr != nil { return nil, apiErr @@ -574,6 +592,7 @@ func (c *Controller) AvailableDashboardsForCloudProvider( } func (c *Controller) GetDashboardById( ctx context.Context, + orgId string, dashboardUuid string, ) (*types.Dashboard, *model.ApiError) { cloudProvider, _, _, apiErr := c.parseDashboardUuid(dashboardUuid) @@ -581,7 +600,7 @@ func (c *Controller) GetDashboardById( return nil, apiErr } - allDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, cloudProvider) + allDashboards, apiErr := c.AvailableDashboardsForCloudProvider(ctx, orgId, cloudProvider) if apiErr != nil { return nil, model.WrapApiError( apiErr, fmt.Sprintf("couldn't list available dashboards"), diff --git a/pkg/query-service/app/cloudintegrations/controller_test.go b/pkg/query-service/app/cloudintegrations/controller_test.go index 05c6dac287..b34ab9b83b 100644 --- a/pkg/query-service/app/cloudintegrations/controller_test.go +++ b/pkg/query-service/app/cloudintegrations/controller_test.go @@ -4,23 +4,30 @@ import ( "context" "testing" + "github.com/SigNoz/signoz/pkg/query-service/auth" + "github.com/SigNoz/signoz/pkg/query-service/constants" + "github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/utils" + "github.com/SigNoz/signoz/pkg/types" "github.com/google/uuid" "github.com/stretchr/testify/require" ) func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) { require := require.New(t) - sqlStore, _ := utils.NewTestSqliteDB(t) + sqlStore := utils.NewQueryServiceDBForTests(t) controller, err := NewController(sqlStore) require.NoError(err) + user, apiErr := createTestUser() + require.Nil(apiErr) + // should be able to generate connection url for // same account id again with updated config - testAccountConfig1 := AccountConfig{EnabledRegions: []string{"us-east-1", "us-west-1"}} + testAccountConfig1 := types.AccountConfig{EnabledRegions: []string{"us-east-1", "us-west-1"}} resp1, apiErr := controller.GenerateConnectionUrl( - context.TODO(), "aws", GenerateConnectionUrlRequest{ + context.TODO(), user.OrgID, "aws", GenerateConnectionUrlRequest{ AccountConfig: testAccountConfig1, AgentConfig: SigNozAgentConfig{Region: "us-east-2"}, }, @@ -31,14 +38,14 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) { testAccountId := resp1.AccountId account, apiErr := controller.accountsRepo.get( - context.TODO(), "aws", testAccountId, + context.TODO(), user.OrgID, "aws", testAccountId, ) require.Nil(apiErr) require.Equal(testAccountConfig1, *account.Config) - testAccountConfig2 := AccountConfig{EnabledRegions: []string{"us-east-2", "us-west-2"}} + testAccountConfig2 := types.AccountConfig{EnabledRegions: []string{"us-east-2", "us-west-2"}} resp2, apiErr := controller.GenerateConnectionUrl( - context.TODO(), "aws", GenerateConnectionUrlRequest{ + context.TODO(), user.OrgID, "aws", GenerateConnectionUrlRequest{ AccountId: &testAccountId, AccountConfig: testAccountConfig2, AgentConfig: SigNozAgentConfig{Region: "us-east-2"}, @@ -48,7 +55,7 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) { require.Equal(testAccountId, resp2.AccountId) account, apiErr = controller.accountsRepo.get( - context.TODO(), "aws", testAccountId, + context.TODO(), user.OrgID, "aws", testAccountId, ) require.Nil(apiErr) require.Equal(testAccountConfig2, *account.Config) @@ -56,18 +63,21 @@ func TestRegenerateConnectionUrlWithUpdatedConfig(t *testing.T) { func TestAgentCheckIns(t *testing.T) { require := require.New(t) - sqlStore, _ := utils.NewTestSqliteDB(t) + sqlStore := utils.NewQueryServiceDBForTests(t) controller, err := NewController(sqlStore) require.NoError(err) + user, apiErr := createTestUser() + require.Nil(apiErr) + // An agent should be able to check in from a cloud account even // if no connection url was requested (no account with agent's account id exists) testAccountId1 := uuid.NewString() testCloudAccountId1 := "546311234" resp1, apiErr := controller.CheckInAsAgent( - context.TODO(), "aws", AgentCheckInRequest{ - AccountId: testAccountId1, - CloudAccountId: testCloudAccountId1, + context.TODO(), user.OrgID, "aws", AgentCheckInRequest{ + ID: testAccountId1, + AccountID: testCloudAccountId1, }, ) require.Nil(apiErr) @@ -78,9 +88,9 @@ func TestAgentCheckIns(t *testing.T) { // cloud account id for the same account. testCloudAccountId2 := "99999999" _, apiErr = controller.CheckInAsAgent( - context.TODO(), "aws", AgentCheckInRequest{ - AccountId: testAccountId1, - CloudAccountId: testCloudAccountId2, + context.TODO(), user.OrgID, "aws", AgentCheckInRequest{ + ID: testAccountId1, + AccountID: testCloudAccountId2, }, ) require.NotNil(apiErr) @@ -90,18 +100,18 @@ func TestAgentCheckIns(t *testing.T) { // i.e. there can't be 2 connected account records for the same cloud account id // at any point in time. existingConnected, apiErr := controller.accountsRepo.getConnectedCloudAccount( - context.TODO(), "aws", testCloudAccountId1, + context.TODO(), user.OrgID, "aws", testCloudAccountId1, ) require.Nil(apiErr) require.NotNil(existingConnected) - require.Equal(testCloudAccountId1, *existingConnected.CloudAccountId) + require.Equal(testCloudAccountId1, *existingConnected.AccountID) require.Nil(existingConnected.RemovedAt) testAccountId2 := uuid.NewString() _, apiErr = controller.CheckInAsAgent( - context.TODO(), "aws", AgentCheckInRequest{ - AccountId: testAccountId2, - CloudAccountId: testCloudAccountId1, + context.TODO(), user.OrgID, "aws", AgentCheckInRequest{ + ID: testAccountId2, + AccountID: testCloudAccountId1, }, ) require.NotNil(apiErr) @@ -109,29 +119,29 @@ func TestAgentCheckIns(t *testing.T) { // After disconnecting existing account record, the agent should be able to // connected for a particular cloud account id _, apiErr = controller.DisconnectAccount( - context.TODO(), "aws", testAccountId1, + context.TODO(), user.OrgID, "aws", testAccountId1, ) existingConnected, apiErr = controller.accountsRepo.getConnectedCloudAccount( - context.TODO(), "aws", testCloudAccountId1, + context.TODO(), user.OrgID, "aws", testCloudAccountId1, ) require.Nil(existingConnected) require.NotNil(apiErr) require.Equal(model.ErrorNotFound, apiErr.Type()) _, apiErr = controller.CheckInAsAgent( - context.TODO(), "aws", AgentCheckInRequest{ - AccountId: testAccountId2, - CloudAccountId: testCloudAccountId1, + context.TODO(), user.OrgID, "aws", AgentCheckInRequest{ + ID: testAccountId2, + AccountID: testCloudAccountId1, }, ) require.Nil(apiErr) // should be able to keep checking in _, apiErr = controller.CheckInAsAgent( - context.TODO(), "aws", AgentCheckInRequest{ - AccountId: testAccountId2, - CloudAccountId: testCloudAccountId1, + context.TODO(), user.OrgID, "aws", AgentCheckInRequest{ + ID: testAccountId2, + AccountID: testCloudAccountId1, }, ) require.Nil(apiErr) @@ -139,13 +149,16 @@ func TestAgentCheckIns(t *testing.T) { func TestCantDisconnectNonExistentAccount(t *testing.T) { require := require.New(t) - sqlStore, _ := utils.NewTestSqliteDB(t) + sqlStore := utils.NewQueryServiceDBForTests(t) controller, err := NewController(sqlStore) require.NoError(err) + user, apiErr := createTestUser() + require.Nil(apiErr) + // Attempting to disconnect a non-existent account should return error account, apiErr := controller.DisconnectAccount( - context.TODO(), "aws", uuid.NewString(), + context.TODO(), user.OrgID, "aws", uuid.NewString(), ) require.NotNil(apiErr) require.Equal(model.ErrorNotFound, apiErr.Type()) @@ -154,15 +167,23 @@ func TestCantDisconnectNonExistentAccount(t *testing.T) { func TestConfigureService(t *testing.T) { require := require.New(t) - sqlStore, _ := utils.NewTestSqliteDB(t) + sqlStore := utils.NewQueryServiceDBForTests(t) controller, err := NewController(sqlStore) require.NoError(err) + user, apiErr := createTestUser() + require.Nil(apiErr) + + // create a connected account testCloudAccountId := "546311234" + testConnectedAccount := makeTestConnectedAccount(t, user.OrgID, controller, testCloudAccountId) + require.Nil(testConnectedAccount.RemovedAt) + require.NotEmpty(testConnectedAccount.AccountID) + require.Equal(testCloudAccountId, *testConnectedAccount.AccountID) // should start out without any service config svcListResp, apiErr := controller.ListServices( - context.TODO(), "aws", &testCloudAccountId, + context.TODO(), user.OrgID, "aws", &testCloudAccountId, ) require.Nil(apiErr) @@ -170,25 +191,20 @@ func TestConfigureService(t *testing.T) { require.Nil(svcListResp.Services[0].Config) svcDetails, apiErr := controller.GetServiceDetails( - context.TODO(), "aws", testSvcId, &testCloudAccountId, + context.TODO(), user.OrgID, "aws", testSvcId, &testCloudAccountId, ) require.Nil(apiErr) require.Equal(testSvcId, svcDetails.Id) require.Nil(svcDetails.Config) // should be able to configure a service for a connected account - testConnectedAccount := makeTestConnectedAccount(t, controller, testCloudAccountId) - require.Nil(testConnectedAccount.RemovedAt) - require.NotNil(testConnectedAccount.CloudAccountId) - require.Equal(testCloudAccountId, *testConnectedAccount.CloudAccountId) - - testSvcConfig := CloudServiceConfig{ - Metrics: &CloudServiceMetricsConfig{ + testSvcConfig := types.CloudServiceConfig{ + Metrics: &types.CloudServiceMetricsConfig{ Enabled: true, }, } updateSvcConfigResp, apiErr := controller.UpdateServiceConfig( - context.TODO(), "aws", testSvcId, UpdateServiceConfigRequest{ + context.TODO(), user.OrgID, "aws", testSvcId, UpdateServiceConfigRequest{ CloudAccountId: testCloudAccountId, Config: testSvcConfig, }, @@ -198,14 +214,14 @@ func TestConfigureService(t *testing.T) { require.Equal(testSvcConfig, updateSvcConfigResp.Config) svcDetails, apiErr = controller.GetServiceDetails( - context.TODO(), "aws", testSvcId, &testCloudAccountId, + context.TODO(), user.OrgID, "aws", testSvcId, &testCloudAccountId, ) require.Nil(apiErr) require.Equal(testSvcId, svcDetails.Id) require.Equal(testSvcConfig, *svcDetails.Config) svcListResp, apiErr = controller.ListServices( - context.TODO(), "aws", &testCloudAccountId, + context.TODO(), user.OrgID, "aws", &testCloudAccountId, ) require.Nil(apiErr) for _, svc := range svcListResp.Services { @@ -216,12 +232,12 @@ func TestConfigureService(t *testing.T) { // should not be able to configure service after cloud account has been disconnected _, apiErr = controller.DisconnectAccount( - context.TODO(), "aws", testConnectedAccount.Id, + context.TODO(), user.OrgID, "aws", testConnectedAccount.ID.StringValue(), ) require.Nil(apiErr) _, apiErr = controller.UpdateServiceConfig( - context.TODO(), "aws", testSvcId, + context.TODO(), user.OrgID, "aws", testSvcId, UpdateServiceConfigRequest{ CloudAccountId: testCloudAccountId, Config: testSvcConfig, @@ -231,7 +247,7 @@ func TestConfigureService(t *testing.T) { // should not be able to configure a service for a cloud account id that is not connected yet _, apiErr = controller.UpdateServiceConfig( - context.TODO(), "aws", testSvcId, + context.TODO(), user.OrgID, "aws", testSvcId, UpdateServiceConfigRequest{ CloudAccountId: "9999999999", Config: testSvcConfig, @@ -241,7 +257,7 @@ func TestConfigureService(t *testing.T) { // should not be able to set config for an unsupported service _, apiErr = controller.UpdateServiceConfig( - context.TODO(), "aws", "bad-service", UpdateServiceConfigRequest{ + context.TODO(), user.OrgID, "aws", "bad-service", UpdateServiceConfigRequest{ CloudAccountId: testCloudAccountId, Config: testSvcConfig, }, @@ -250,22 +266,54 @@ func TestConfigureService(t *testing.T) { } -func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccountId string) *AccountRecord { +func makeTestConnectedAccount(t *testing.T, orgId string, controller *Controller, cloudAccountId string) *types.CloudIntegration { require := require.New(t) // a check in from SigNoz agent creates or updates a connected account. testAccountId := uuid.NewString() resp, apiErr := controller.CheckInAsAgent( - context.TODO(), "aws", AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: cloudAccountId, + context.TODO(), orgId, "aws", AgentCheckInRequest{ + ID: testAccountId, + AccountID: cloudAccountId, }, ) require.Nil(apiErr) require.Equal(testAccountId, resp.AccountId) require.Equal(cloudAccountId, resp.CloudAccountId) - acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId) + acc, err := controller.accountsRepo.get(context.TODO(), orgId, "aws", resp.AccountId) require.Nil(err) return acc } + +func createTestUser() (*types.User, *model.ApiError) { + // Create a test user for auth + ctx := context.Background() + org, apiErr := dao.DB().CreateOrg(ctx, &types.Organization{ + Name: "test", + }) + if apiErr != nil { + return nil, apiErr + } + + group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup) + if apiErr != nil { + return nil, apiErr + } + + auth.InitAuthCache(ctx) + + userId := uuid.NewString() + return dao.DB().CreateUser( + ctx, + &types.User{ + ID: userId, + Name: "test", + Email: userId[:8] + "test@test.com", + Password: "test", + OrgID: org.ID, + GroupID: group.ID, + }, + true, + ) +} diff --git a/pkg/query-service/app/cloudintegrations/model.go b/pkg/query-service/app/cloudintegrations/model.go index 8dc83f2e9f..6cc2060663 100644 --- a/pkg/query-service/app/cloudintegrations/model.go +++ b/pkg/query-service/app/cloudintegrations/model.go @@ -1,123 +1,11 @@ package cloudintegrations import ( - "database/sql/driver" - "encoding/json" "fmt" - "time" "github.com/SigNoz/signoz/pkg/types" ) -// Represents a cloud provider account for cloud integrations -type AccountRecord struct { - CloudProvider string `json:"cloud_provider" db:"cloud_provider"` - Id string `json:"id" db:"id"` - Config *AccountConfig `json:"config" db:"config_json"` - CloudAccountId *string `json:"cloud_account_id" db:"cloud_account_id"` - LastAgentReport *AgentReport `json:"last_agent_report" db:"last_agent_report_json"` - CreatedAt time.Time `json:"created_at" db:"created_at"` - RemovedAt *time.Time `json:"removed_at" db:"removed_at"` -} - -type AccountConfig struct { - EnabledRegions []string `json:"regions"` -} - -func DefaultAccountConfig() AccountConfig { - return AccountConfig{ - EnabledRegions: []string{}, - } -} - -// For serializing from db -func (c *AccountConfig) Scan(src any) error { - data, ok := src.([]byte) - if !ok { - return fmt.Errorf("tried to scan from %T instead of bytes", src) - } - - return json.Unmarshal(data, &c) -} - -// For serializing to db -func (c *AccountConfig) Value() (driver.Value, error) { - if c == nil { - return nil, nil - } - - serialized, err := json.Marshal(c) - if err != nil { - return nil, fmt.Errorf( - "couldn't serialize cloud account config to JSON: %w", err, - ) - } - return serialized, nil -} - -type AgentReport struct { - TimestampMillis int64 `json:"timestamp_millis"` - Data map[string]any `json:"data"` -} - -// For serializing from db -func (r *AgentReport) Scan(src any) error { - data, ok := src.([]byte) - if !ok { - return fmt.Errorf("tried to scan from %T instead of bytes", src) - } - - return json.Unmarshal(data, &r) -} - -// For serializing to db -func (r *AgentReport) Value() (driver.Value, error) { - if r == nil { - return nil, nil - } - - serialized, err := json.Marshal(r) - if err != nil { - return nil, fmt.Errorf( - "couldn't serialize agent report to JSON: %w", err, - ) - } - return serialized, nil -} - -type AccountStatus struct { - Integration AccountIntegrationStatus `json:"integration"` -} - -type AccountIntegrationStatus struct { - LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"` -} - -func (a *AccountRecord) status() AccountStatus { - status := AccountStatus{} - if a.LastAgentReport != nil { - lastHeartbeat := a.LastAgentReport.TimestampMillis - status.Integration.LastHeartbeatTsMillis = &lastHeartbeat - } - return status -} - -func (a *AccountRecord) account() Account { - ca := Account{Id: a.Id, Status: a.status()} - - if a.CloudAccountId != nil { - ca.CloudAccountId = *a.CloudAccountId - } - - if a.Config != nil { - ca.Config = *a.Config - } else { - ca.Config = DefaultAccountConfig() - } - - return ca -} - type CloudServiceSummary struct { Id string `json:"id"` Title string `json:"title"` @@ -125,7 +13,7 @@ type CloudServiceSummary struct { // Present only if the service has been configured in the // context of a cloud provider account. - Config *CloudServiceConfig `json:"config,omitempty"` + Config *types.CloudServiceConfig `json:"config,omitempty"` } type CloudServiceDetails struct { @@ -144,44 +32,6 @@ type CloudServiceDetails struct { TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"` } -type CloudServiceConfig struct { - Logs *CloudServiceLogsConfig `json:"logs,omitempty"` - Metrics *CloudServiceMetricsConfig `json:"metrics,omitempty"` -} - -// For serializing from db -func (c *CloudServiceConfig) Scan(src any) error { - data, ok := src.([]byte) - if !ok { - return fmt.Errorf("tried to scan from %T instead of bytes", src) - } - - return json.Unmarshal(data, &c) -} - -// For serializing to db -func (c *CloudServiceConfig) Value() (driver.Value, error) { - if c == nil { - return nil, nil - } - - serialized, err := json.Marshal(c) - if err != nil { - return nil, fmt.Errorf( - "couldn't serialize cloud service config to JSON: %w", err, - ) - } - return serialized, nil -} - -type CloudServiceLogsConfig struct { - Enabled bool `json:"enabled"` -} - -type CloudServiceMetricsConfig struct { - Enabled bool `json:"enabled"` -} - type CloudServiceAssets struct { Dashboards []CloudServiceDashboard `json:"dashboards"` } diff --git a/pkg/query-service/app/cloudintegrations/serviceConfigRepo.go b/pkg/query-service/app/cloudintegrations/serviceConfigRepo.go index e3d4ac634a..853f324dda 100644 --- a/pkg/query-service/app/cloudintegrations/serviceConfigRepo.go +++ b/pkg/query-service/app/cloudintegrations/serviceConfigRepo.go @@ -4,161 +4,161 @@ import ( "context" "database/sql" "fmt" + "time" "github.com/SigNoz/signoz/pkg/query-service/model" - "github.com/jmoiron/sqlx" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" ) type serviceConfigRepository interface { get( ctx context.Context, - cloudProvider string, + orgID string, cloudAccountId string, - serviceId string, - ) (*CloudServiceConfig, *model.ApiError) + serviceType string, + ) (*types.CloudServiceConfig, *model.ApiError) upsert( ctx context.Context, + orgID string, cloudProvider string, cloudAccountId string, serviceId string, - config CloudServiceConfig, - ) (*CloudServiceConfig, *model.ApiError) + config types.CloudServiceConfig, + ) (*types.CloudServiceConfig, *model.ApiError) getAllForAccount( ctx context.Context, - cloudProvider string, + orgID string, cloudAccountId string, ) ( - configsBySvcId map[string]*CloudServiceConfig, + configsBySvcId map[string]*types.CloudServiceConfig, apiErr *model.ApiError, ) } -func newServiceConfigRepository(db *sqlx.DB) ( +func newServiceConfigRepository(store sqlstore.SQLStore) ( *serviceConfigSQLRepository, error, ) { return &serviceConfigSQLRepository{ - db: db, + store: store, }, nil } type serviceConfigSQLRepository struct { - db *sqlx.DB + store sqlstore.SQLStore } func (r *serviceConfigSQLRepository) get( ctx context.Context, - cloudProvider string, + orgID string, cloudAccountId string, - serviceId string, -) (*CloudServiceConfig, *model.ApiError) { + serviceType string, +) (*types.CloudServiceConfig, *model.ApiError) { - var result CloudServiceConfig + var result types.CloudIntegrationService - err := r.db.GetContext( - ctx, &result, ` - select - config_json - from cloud_integrations_service_configs - where - cloud_provider=$1 - and cloud_account_id=$2 - and service_id=$3 - `, - cloudProvider, cloudAccountId, serviceId, - ) + err := r.store.BunDB().NewSelect(). + Model(&result). + Join("JOIN cloud_integration ci ON ci.id = cis.cloud_integration_id"). + Where("ci.org_id = ?", orgID). + Where("ci.id = ?", cloudAccountId). + Where("cis.type = ?", serviceType). + Scan(ctx) if err == sql.ErrNoRows { return nil, model.NotFoundError(fmt.Errorf( - "couldn't find %s %s config for %s", - cloudProvider, serviceId, cloudAccountId, + "couldn't find config for cloud account %s", + cloudAccountId, )) - } else if err != nil { return nil, model.InternalError(fmt.Errorf( "couldn't query cloud service config: %w", err, )) } - return &result, nil + return &result.Config, nil } func (r *serviceConfigSQLRepository) upsert( ctx context.Context, + orgID string, cloudProvider string, cloudAccountId string, serviceId string, - config CloudServiceConfig, -) (*CloudServiceConfig, *model.ApiError) { + config types.CloudServiceConfig, +) (*types.CloudServiceConfig, *model.ApiError) { - query := ` - INSERT INTO cloud_integrations_service_configs ( - cloud_provider, - cloud_account_id, - service_id, - config_json - ) values ($1, $2, $3, $4) - on conflict(cloud_provider, cloud_account_id, service_id) - do update set config_json=excluded.config_json - ` - _, dbErr := r.db.ExecContext( - ctx, query, - cloudProvider, cloudAccountId, serviceId, &config, - ) - if dbErr != nil { + // get cloud integration id from account id + // if the account is not connected, we don't need to upsert the config + var cloudIntegrationId string + err := r.store.BunDB().NewSelect(). + Model((*types.CloudIntegration)(nil)). + Column("id"). + Where("provider = ?", cloudProvider). + Where("account_id = ?", cloudAccountId). + Where("org_id = ?", orgID). + Where("removed_at is NULL"). + Where("last_agent_report is not NULL"). + Scan(ctx, &cloudIntegrationId) + + if err != nil { return nil, model.InternalError(fmt.Errorf( - "could not upsert cloud service config: %w", dbErr, + "couldn't query cloud integration id: %w", err, )) } - upsertedConfig, apiErr := r.get(ctx, cloudProvider, cloudAccountId, serviceId) - if apiErr != nil { + serviceConfig := types.CloudIntegrationService{ + Identifiable: types.Identifiable{ID: valuer.GenerateUUID()}, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + Config: config, + Type: serviceId, + CloudIntegrationID: cloudIntegrationId, + } + _, err = r.store.BunDB().NewInsert(). + Model(&serviceConfig). + On("conflict(cloud_integration_id, type) do update set config=excluded.config, updated_at=excluded.updated_at"). + Exec(ctx) + if err != nil { return nil, model.InternalError(fmt.Errorf( - "couldn't fetch upserted service config: %w", apiErr.ToError(), + "could not upsert cloud service config: %w", err, )) } - return upsertedConfig, nil + return &serviceConfig.Config, nil } func (r *serviceConfigSQLRepository) getAllForAccount( ctx context.Context, - cloudProvider string, + orgID string, cloudAccountId string, -) (map[string]*CloudServiceConfig, *model.ApiError) { +) (map[string]*types.CloudServiceConfig, *model.ApiError) { - type ScannedServiceConfigRecord struct { - ServiceId string `db:"service_id"` - Config CloudServiceConfig `db:"config_json"` - } + serviceConfigs := []types.CloudIntegrationService{} - records := []ScannedServiceConfigRecord{} - - err := r.db.SelectContext( - ctx, &records, ` - select - service_id, - config_json - from cloud_integrations_service_configs - where - cloud_provider=$1 - and cloud_account_id=$2 - `, - cloudProvider, cloudAccountId, - ) + err := r.store.BunDB().NewSelect(). + Model(&serviceConfigs). + Join("JOIN cloud_integration ci ON ci.id = cis.cloud_integration_id"). + Where("ci.id = ?", cloudAccountId). + Where("ci.org_id = ?", orgID). + Scan(ctx) if err != nil { return nil, model.InternalError(fmt.Errorf( "could not query service configs from db: %w", err, )) } - result := map[string]*CloudServiceConfig{} + result := map[string]*types.CloudServiceConfig{} - for _, r := range records { - result[r.ServiceId] = &r.Config + for _, r := range serviceConfigs { + result[r.Type] = &r.Config } return result, nil diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 2401e32828..b15e240a41 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -22,6 +22,7 @@ import ( errorsV2 "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/http/render" "github.com/SigNoz/signoz/pkg/modules/preference" + "github.com/SigNoz/signoz/pkg/query-service/app/integrations" "github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer" "github.com/SigNoz/signoz/pkg/signoz" "github.com/SigNoz/signoz/pkg/valuer" @@ -37,7 +38,6 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/app/dashboards" "github.com/SigNoz/signoz/pkg/query-service/app/explorer" "github.com/SigNoz/signoz/pkg/query-service/app/inframetrics" - "github.com/SigNoz/signoz/pkg/query-service/app/integrations" queues2 "github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/queues" "github.com/SigNoz/signoz/pkg/query-service/app/integrations/thirdPartyApi" "github.com/SigNoz/signoz/pkg/query-service/app/logs" @@ -1082,14 +1082,14 @@ func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) { } ic := aH.IntegrationsController - installedIntegrationDashboards, err := ic.GetDashboardsForInstalledIntegrations(r.Context()) + installedIntegrationDashboards, err := ic.GetDashboardsForInstalledIntegrations(r.Context(), claims.OrgID) if err != nil { zap.L().Error("failed to get dashboards for installed integrations", zap.Error(err)) } else { allDashboards = append(allDashboards, installedIntegrationDashboards...) } - cloudIntegrationDashboards, err := aH.CloudIntegrationsController.AvailableDashboards(r.Context()) + cloudIntegrationDashboards, err := aH.CloudIntegrationsController.AvailableDashboards(r.Context(), claims.OrgID) if err != nil { zap.L().Error("failed to get cloud dashboards", zap.Error(err)) } else { @@ -1267,7 +1267,7 @@ func (aH *APIHandler) getDashboard(w http.ResponseWriter, r *http.Request) { if aH.CloudIntegrationsController.IsCloudIntegrationDashboardUuid(uuid) { dashboard, apiError = aH.CloudIntegrationsController.GetDashboardById( - r.Context(), uuid, + r.Context(), claims.OrgID, uuid, ) if apiError != nil { RespondError(w, apiError, nil) @@ -1276,7 +1276,7 @@ func (aH *APIHandler) getDashboard(w http.ResponseWriter, r *http.Request) { } else { dashboard, apiError = aH.IntegrationsController.GetInstalledIntegrationDashboardById( - r.Context(), uuid, + r.Context(), claims.OrgID, uuid, ) if apiError != nil { RespondError(w, apiError, nil) @@ -2207,6 +2207,11 @@ func (aH *APIHandler) editUser(w http.ResponseWriter, r *http.Request) { old.ProfilePictureURL = update.ProfilePictureURL } + if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(old.Email)) { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "integration user cannot be updated")) + return + } + _, apiErr = dao.DB().EditUser(ctx, &types.User{ ID: old.ID, Name: old.Name, @@ -2238,6 +2243,11 @@ func (aH *APIHandler) deleteUser(w http.ResponseWriter, r *http.Request) { return } + if slices.Contains(types.AllIntegrationUserEmails, types.IntegrationUserEmail(user.Email)) { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "integration user cannot be updated")) + return + } + if user == nil { RespondError(w, &model.ApiError{ Typ: model.ErrorNotFound, @@ -3497,9 +3507,14 @@ func (aH *APIHandler) ListIntegrations( for k, values := range r.URL.Query() { params[k] = values[0] } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } resp, apiErr := aH.IntegrationsController.ListIntegrations( - r.Context(), params, + r.Context(), claims.OrgID, params, ) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch integrations") @@ -3512,8 +3527,13 @@ func (aH *APIHandler) GetIntegration( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } integration, apiErr := aH.IntegrationsController.GetIntegration( - r.Context(), integrationId, + r.Context(), claims.OrgID, integrationId, ) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch integration details") @@ -3527,8 +3547,13 @@ func (aH *APIHandler) GetIntegrationConnectionStatus( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } isInstalled, apiErr := aH.IntegrationsController.IsIntegrationInstalled( - r.Context(), integrationId, + r.Context(), claims.OrgID, integrationId, ) if apiErr != nil { RespondError(w, apiErr, "failed to check if integration is installed") @@ -3542,7 +3567,7 @@ func (aH *APIHandler) GetIntegrationConnectionStatus( } connectionTests, apiErr := aH.IntegrationsController.GetIntegrationConnectionTests( - r.Context(), integrationId, + r.Context(), claims.OrgID, integrationId, ) if apiErr != nil { RespondError(w, apiErr, "failed to fetch integration connection tests") @@ -3741,8 +3766,14 @@ func (aH *APIHandler) InstallIntegration( return } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + integration, apiErr := aH.IntegrationsController.Install( - r.Context(), &req, + r.Context(), claims.OrgID, &req, ) if apiErr != nil { RespondError(w, apiErr, nil) @@ -3763,7 +3794,13 @@ func (aH *APIHandler) UninstallIntegration( return } - apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req) + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + + apiErr := aH.IntegrationsController.Uninstall(r.Context(), claims.OrgID, &req) if apiErr != nil { RespondError(w, apiErr, nil) return @@ -3819,8 +3856,14 @@ func (aH *APIHandler) CloudIntegrationsListConnectedAccounts( ) { cloudProvider := mux.Vars(r)["cloudProvider"] + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + resp, apiErr := aH.CloudIntegrationsController.ListConnectedAccounts( - r.Context(), cloudProvider, + r.Context(), claims.OrgID, cloudProvider, ) if apiErr != nil { @@ -3841,8 +3884,14 @@ func (aH *APIHandler) CloudIntegrationsGenerateConnectionUrl( return } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + result, apiErr := aH.CloudIntegrationsController.GenerateConnectionUrl( - r.Context(), cloudProvider, req, + r.Context(), claims.OrgID, cloudProvider, req, ) if apiErr != nil { @@ -3859,8 +3908,14 @@ func (aH *APIHandler) CloudIntegrationsGetAccountStatus( cloudProvider := mux.Vars(r)["cloudProvider"] accountId := mux.Vars(r)["accountId"] + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + resp, apiErr := aH.CloudIntegrationsController.GetAccountStatus( - r.Context(), cloudProvider, accountId, + r.Context(), claims.OrgID, cloudProvider, accountId, ) if apiErr != nil { @@ -3881,8 +3936,14 @@ func (aH *APIHandler) CloudIntegrationsAgentCheckIn( return } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + result, apiErr := aH.CloudIntegrationsController.CheckInAsAgent( - r.Context(), cloudProvider, req, + r.Context(), claims.OrgID, cloudProvider, req, ) if apiErr != nil { @@ -3905,8 +3966,14 @@ func (aH *APIHandler) CloudIntegrationsUpdateAccountConfig( return } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + result, apiErr := aH.CloudIntegrationsController.UpdateAccountConfig( - r.Context(), cloudProvider, accountId, req, + r.Context(), claims.OrgID, cloudProvider, accountId, req, ) if apiErr != nil { @@ -3923,8 +3990,14 @@ func (aH *APIHandler) CloudIntegrationsDisconnectAccount( cloudProvider := mux.Vars(r)["cloudProvider"] accountId := mux.Vars(r)["accountId"] + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + result, apiErr := aH.CloudIntegrationsController.DisconnectAccount( - r.Context(), cloudProvider, accountId, + r.Context(), claims.OrgID, cloudProvider, accountId, ) if apiErr != nil { @@ -3947,8 +4020,14 @@ func (aH *APIHandler) CloudIntegrationsListServices( cloudAccountId = &cloudAccountIdQP } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + resp, apiErr := aH.CloudIntegrationsController.ListServices( - r.Context(), cloudProvider, cloudAccountId, + r.Context(), claims.OrgID, cloudProvider, cloudAccountId, ) if apiErr != nil { @@ -3971,8 +4050,14 @@ func (aH *APIHandler) CloudIntegrationsGetServiceDetails( cloudAccountId = &cloudAccountIdQP } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + resp, apiErr := aH.CloudIntegrationsController.GetServiceDetails( - r.Context(), cloudProvider, serviceId, cloudAccountId, + r.Context(), claims.OrgID, cloudProvider, serviceId, cloudAccountId, ) if apiErr != nil { RespondError(w, apiErr, nil) @@ -4211,8 +4296,14 @@ func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig( return } + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated")) + return + } + result, apiErr := aH.CloudIntegrationsController.UpdateServiceConfig( - r.Context(), cloudProvider, serviceId, req, + r.Context(), claims.OrgID, cloudProvider, serviceId, req, ) if apiErr != nil { diff --git a/pkg/query-service/app/integrations/controller.go b/pkg/query-service/app/integrations/controller.go index d2fbd7c552..b62e1c7319 100644 --- a/pkg/query-service/app/integrations/controller.go +++ b/pkg/query-service/app/integrations/controller.go @@ -18,7 +18,7 @@ type Controller struct { func NewController(sqlStore sqlstore.SQLStore) ( *Controller, error, ) { - mgr, err := NewManager(sqlStore.SQLxDB()) + mgr, err := NewManager(sqlStore) if err != nil { return nil, fmt.Errorf("couldn't create integrations manager: %w", err) } @@ -35,7 +35,7 @@ type IntegrationsListResponse struct { } func (c *Controller) ListIntegrations( - ctx context.Context, params map[string]string, + ctx context.Context, orgId string, params map[string]string, ) ( *IntegrationsListResponse, *model.ApiError, ) { @@ -47,7 +47,7 @@ func (c *Controller) ListIntegrations( } } - integrations, apiErr := c.mgr.ListIntegrations(ctx, filters) + integrations, apiErr := c.mgr.ListIntegrations(ctx, orgId, filters) if apiErr != nil { return nil, apiErr } @@ -58,16 +58,15 @@ func (c *Controller) ListIntegrations( } func (c *Controller) GetIntegration( - ctx context.Context, integrationId string, + ctx context.Context, orgId string, integrationId string, ) (*Integration, *model.ApiError) { - return c.mgr.GetIntegration(ctx, integrationId) + return c.mgr.GetIntegration(ctx, orgId, integrationId) } func (c *Controller) IsIntegrationInstalled( - ctx context.Context, - integrationId string, + ctx context.Context, orgId string, integrationId string, ) (bool, *model.ApiError) { - installation, apiErr := c.mgr.getInstalledIntegration(ctx, integrationId) + installation, apiErr := c.mgr.getInstalledIntegration(ctx, orgId, integrationId) if apiErr != nil { return false, apiErr } @@ -76,9 +75,9 @@ func (c *Controller) IsIntegrationInstalled( } func (c *Controller) GetIntegrationConnectionTests( - ctx context.Context, integrationId string, + ctx context.Context, orgId string, integrationId string, ) (*IntegrationConnectionTests, *model.ApiError) { - return c.mgr.GetIntegrationConnectionTests(ctx, integrationId) + return c.mgr.GetIntegrationConnectionTests(ctx, orgId, integrationId) } type InstallIntegrationRequest struct { @@ -87,10 +86,10 @@ type InstallIntegrationRequest struct { } func (c *Controller) Install( - ctx context.Context, req *InstallIntegrationRequest, + ctx context.Context, orgId string, req *InstallIntegrationRequest, ) (*IntegrationsListItem, *model.ApiError) { res, apiErr := c.mgr.InstallIntegration( - ctx, req.IntegrationId, req.Config, + ctx, orgId, req.IntegrationId, req.Config, ) if apiErr != nil { return nil, apiErr @@ -104,7 +103,7 @@ type UninstallIntegrationRequest struct { } func (c *Controller) Uninstall( - ctx context.Context, req *UninstallIntegrationRequest, + ctx context.Context, orgId string, req *UninstallIntegrationRequest, ) *model.ApiError { if len(req.IntegrationId) < 1 { return model.BadRequest(fmt.Errorf( @@ -113,7 +112,7 @@ func (c *Controller) Uninstall( } apiErr := c.mgr.UninstallIntegration( - ctx, req.IntegrationId, + ctx, orgId, req.IntegrationId, ) if apiErr != nil { return apiErr @@ -123,19 +122,19 @@ func (c *Controller) Uninstall( } func (c *Controller) GetPipelinesForInstalledIntegrations( - ctx context.Context, + ctx context.Context, orgId string, ) ([]pipelinetypes.GettablePipeline, *model.ApiError) { - return c.mgr.GetPipelinesForInstalledIntegrations(ctx) + return c.mgr.GetPipelinesForInstalledIntegrations(ctx, orgId) } func (c *Controller) GetDashboardsForInstalledIntegrations( - ctx context.Context, + ctx context.Context, orgId string, ) ([]types.Dashboard, *model.ApiError) { - return c.mgr.GetDashboardsForInstalledIntegrations(ctx) + return c.mgr.GetDashboardsForInstalledIntegrations(ctx, orgId) } func (c *Controller) GetInstalledIntegrationDashboardById( - ctx context.Context, dashboardUuid string, + ctx context.Context, orgId string, dashboardUuid string, ) (*types.Dashboard, *model.ApiError) { - return c.mgr.GetInstalledIntegrationDashboardById(ctx, dashboardUuid) + return c.mgr.GetInstalledIntegrationDashboardById(ctx, orgId, dashboardUuid) } diff --git a/pkg/query-service/app/integrations/manager.go b/pkg/query-service/app/integrations/manager.go index e9db5c7c3b..e4f5128539 100644 --- a/pkg/query-service/app/integrations/manager.go +++ b/pkg/query-service/app/integrations/manager.go @@ -5,15 +5,14 @@ import ( "fmt" "slices" "strings" - "time" "github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/rules" "github.com/SigNoz/signoz/pkg/query-service/utils" + "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/SigNoz/signoz/pkg/valuer" - "github.com/jmoiron/sqlx" ) type IntegrationAuthor struct { @@ -105,16 +104,9 @@ type IntegrationsListItem struct { IsInstalled bool `json:"is_installed"` } -type InstalledIntegration struct { - IntegrationId string `json:"integration_id" db:"integration_id"` - Config InstalledIntegrationConfig `json:"config_json" db:"config_json"` - InstalledAt time.Time `json:"installed_at" db:"installed_at"` -} -type InstalledIntegrationConfig map[string]interface{} - type Integration struct { IntegrationDetails - Installation *InstalledIntegration `json:"installation"` + Installation *types.InstalledIntegration `json:"installation"` } type Manager struct { @@ -122,8 +114,8 @@ type Manager struct { installedIntegrationsRepo InstalledIntegrationsRepo } -func NewManager(db *sqlx.DB) (*Manager, error) { - iiRepo, err := NewInstalledIntegrationsSqliteRepo(db) +func NewManager(store sqlstore.SQLStore) (*Manager, error) { + iiRepo, err := NewInstalledIntegrationsSqliteRepo(store) if err != nil { return nil, fmt.Errorf( "could not init sqlite DB for installed integrations: %w", err, @@ -142,6 +134,7 @@ type IntegrationsFilter struct { func (m *Manager) ListIntegrations( ctx context.Context, + orgId string, filter *IntegrationsFilter, // Expected to have pagination over time. ) ([]IntegrationsListItem, *model.ApiError) { @@ -152,22 +145,22 @@ func (m *Manager) ListIntegrations( ) } - installed, apiErr := m.installedIntegrationsRepo.list(ctx) + installed, apiErr := m.installedIntegrationsRepo.list(ctx, orgId) if apiErr != nil { return nil, model.WrapApiError( apiErr, "could not fetch installed integrations", ) } - installedIds := []string{} + installedTypes := []string{} for _, ii := range installed { - installedIds = append(installedIds, ii.IntegrationId) + installedTypes = append(installedTypes, ii.Type) } result := []IntegrationsListItem{} for _, ai := range available { result = append(result, IntegrationsListItem{ IntegrationSummary: ai.IntegrationSummary, - IsInstalled: slices.Contains(installedIds, ai.Id), + IsInstalled: slices.Contains(installedTypes, ai.Id), }) } @@ -188,6 +181,7 @@ func (m *Manager) ListIntegrations( func (m *Manager) GetIntegration( ctx context.Context, + orgId string, integrationId string, ) (*Integration, *model.ApiError) { integrationDetails, apiErr := m.getIntegrationDetails( @@ -198,7 +192,7 @@ func (m *Manager) GetIntegration( } installation, apiErr := m.getInstalledIntegration( - ctx, integrationId, + ctx, orgId, integrationId, ) if apiErr != nil { return nil, apiErr @@ -212,6 +206,7 @@ func (m *Manager) GetIntegration( func (m *Manager) GetIntegrationConnectionTests( ctx context.Context, + orgId string, integrationId string, ) (*IntegrationConnectionTests, *model.ApiError) { integrationDetails, apiErr := m.getIntegrationDetails( @@ -225,8 +220,9 @@ func (m *Manager) GetIntegrationConnectionTests( func (m *Manager) InstallIntegration( ctx context.Context, + orgId string, integrationId string, - config InstalledIntegrationConfig, + config types.InstalledIntegrationConfig, ) (*IntegrationsListItem, *model.ApiError) { integrationDetails, apiErr := m.getIntegrationDetails(ctx, integrationId) if apiErr != nil { @@ -234,7 +230,7 @@ func (m *Manager) InstallIntegration( } _, apiErr = m.installedIntegrationsRepo.upsert( - ctx, integrationId, config, + ctx, orgId, integrationId, config, ) if apiErr != nil { return nil, model.WrapApiError( @@ -250,15 +246,17 @@ func (m *Manager) InstallIntegration( func (m *Manager) UninstallIntegration( ctx context.Context, + orgId string, integrationId string, ) *model.ApiError { - return m.installedIntegrationsRepo.delete(ctx, integrationId) + return m.installedIntegrationsRepo.delete(ctx, orgId, integrationId) } func (m *Manager) GetPipelinesForInstalledIntegrations( ctx context.Context, + orgId string, ) ([]pipelinetypes.GettablePipeline, *model.ApiError) { - installedIntegrations, apiErr := m.getInstalledIntegrations(ctx) + installedIntegrations, apiErr := m.getInstalledIntegrations(ctx, orgId) if apiErr != nil { return nil, apiErr } @@ -308,6 +306,7 @@ func (m *Manager) parseDashboardUuid(dashboardUuid string) ( func (m *Manager) GetInstalledIntegrationDashboardById( ctx context.Context, + orgId string, dashboardUuid string, ) (*types.Dashboard, *model.ApiError) { integrationId, dashboardId, apiErr := m.parseDashboardUuid(dashboardUuid) @@ -315,7 +314,7 @@ func (m *Manager) GetInstalledIntegrationDashboardById( return nil, apiErr } - integration, apiErr := m.GetIntegration(ctx, integrationId) + integration, apiErr := m.GetIntegration(ctx, orgId, integrationId) if apiErr != nil { return nil, apiErr } @@ -355,8 +354,9 @@ func (m *Manager) GetInstalledIntegrationDashboardById( func (m *Manager) GetDashboardsForInstalledIntegrations( ctx context.Context, + orgId string, ) ([]types.Dashboard, *model.ApiError) { - installedIntegrations, apiErr := m.getInstalledIntegrations(ctx) + installedIntegrations, apiErr := m.getInstalledIntegrations(ctx, orgId) if apiErr != nil { return nil, apiErr } @@ -421,10 +421,11 @@ func (m *Manager) getIntegrationDetails( func (m *Manager) getInstalledIntegration( ctx context.Context, + orgId string, integrationId string, -) (*InstalledIntegration, *model.ApiError) { +) (*types.InstalledIntegration, *model.ApiError) { iis, apiErr := m.installedIntegrationsRepo.get( - ctx, []string{integrationId}, + ctx, orgId, []string{integrationId}, ) if apiErr != nil { return nil, model.WrapApiError(apiErr, fmt.Sprintf( @@ -441,32 +442,33 @@ func (m *Manager) getInstalledIntegration( func (m *Manager) getInstalledIntegrations( ctx context.Context, + orgId string, ) ( map[string]Integration, *model.ApiError, ) { - installations, apiErr := m.installedIntegrationsRepo.list(ctx) + installations, apiErr := m.installedIntegrationsRepo.list(ctx, orgId) if apiErr != nil { return nil, apiErr } - installedIds := utils.MapSlice(installations, func(i InstalledIntegration) string { - return i.IntegrationId + installedTypes := utils.MapSlice(installations, func(i types.InstalledIntegration) string { + return i.Type }) - integrationDetails, apiErr := m.availableIntegrationsRepo.get(ctx, installedIds) + integrationDetails, apiErr := m.availableIntegrationsRepo.get(ctx, installedTypes) if apiErr != nil { return nil, apiErr } result := map[string]Integration{} for _, ii := range installations { - iDetails, exists := integrationDetails[ii.IntegrationId] + iDetails, exists := integrationDetails[ii.Type] if !exists { return nil, model.InternalError(fmt.Errorf( - "couldn't find integration details for %s", ii.IntegrationId, + "couldn't find integration details for %s", ii.Type, )) } - result[ii.IntegrationId] = Integration{ + result[ii.Type] = Integration{ Installation: &ii, IntegrationDetails: iDetails, } diff --git a/pkg/query-service/app/integrations/manager_test.go b/pkg/query-service/app/integrations/manager_test.go index 08dd50b255..a69c68d11a 100644 --- a/pkg/query-service/app/integrations/manager_test.go +++ b/pkg/query-service/app/integrations/manager_test.go @@ -14,18 +14,23 @@ func TestIntegrationLifecycle(t *testing.T) { mgr := NewTestIntegrationsManager(t) ctx := context.Background() + user, apiErr := createTestUser() + if apiErr != nil { + t.Fatalf("could not create test user: %v", apiErr) + } + ii := true installedIntegrationsFilter := &IntegrationsFilter{ IsInstalled: &ii, } installedIntegrations, apiErr := mgr.ListIntegrations( - ctx, installedIntegrationsFilter, + ctx, user.OrgID, installedIntegrationsFilter, ) require.Nil(apiErr) require.Equal([]IntegrationsListItem{}, installedIntegrations) - availableIntegrations, apiErr := mgr.ListIntegrations(ctx, nil) + availableIntegrations, apiErr := mgr.ListIntegrations(ctx, user.OrgID, nil) require.Nil(apiErr) require.Equal(2, len(availableIntegrations)) require.False(availableIntegrations[0].IsInstalled) @@ -33,44 +38,44 @@ func TestIntegrationLifecycle(t *testing.T) { testIntegrationConfig := map[string]interface{}{} installed, apiErr := mgr.InstallIntegration( - ctx, availableIntegrations[1].Id, testIntegrationConfig, + ctx, user.OrgID, availableIntegrations[1].Id, testIntegrationConfig, ) require.Nil(apiErr) require.Equal(installed.Id, availableIntegrations[1].Id) - integration, apiErr := mgr.GetIntegration(ctx, availableIntegrations[1].Id) + integration, apiErr := mgr.GetIntegration(ctx, user.OrgID, availableIntegrations[1].Id) require.Nil(apiErr) require.Equal(integration.Id, availableIntegrations[1].Id) require.NotNil(integration.Installation) installedIntegrations, apiErr = mgr.ListIntegrations( - ctx, installedIntegrationsFilter, + ctx, user.OrgID, installedIntegrationsFilter, ) require.Nil(apiErr) require.Equal(1, len(installedIntegrations)) require.Equal(availableIntegrations[1].Id, installedIntegrations[0].Id) - availableIntegrations, apiErr = mgr.ListIntegrations(ctx, nil) + availableIntegrations, apiErr = mgr.ListIntegrations(ctx, user.OrgID, nil) require.Nil(apiErr) require.Equal(2, len(availableIntegrations)) require.False(availableIntegrations[0].IsInstalled) require.True(availableIntegrations[1].IsInstalled) - apiErr = mgr.UninstallIntegration(ctx, installed.Id) + apiErr = mgr.UninstallIntegration(ctx, user.OrgID, installed.Id) require.Nil(apiErr) - integration, apiErr = mgr.GetIntegration(ctx, availableIntegrations[1].Id) + integration, apiErr = mgr.GetIntegration(ctx, user.OrgID, availableIntegrations[1].Id) require.Nil(apiErr) require.Equal(integration.Id, availableIntegrations[1].Id) require.Nil(integration.Installation) installedIntegrations, apiErr = mgr.ListIntegrations( - ctx, installedIntegrationsFilter, + ctx, user.OrgID, installedIntegrationsFilter, ) require.Nil(apiErr) require.Equal(0, len(installedIntegrations)) - availableIntegrations, apiErr = mgr.ListIntegrations(ctx, nil) + availableIntegrations, apiErr = mgr.ListIntegrations(ctx, user.OrgID, nil) require.Nil(apiErr) require.Equal(2, len(availableIntegrations)) require.False(availableIntegrations[0].IsInstalled) diff --git a/pkg/query-service/app/integrations/repo.go b/pkg/query-service/app/integrations/repo.go index 690bb8c0cb..84bf440185 100644 --- a/pkg/query-service/app/integrations/repo.go +++ b/pkg/query-service/app/integrations/repo.go @@ -2,51 +2,33 @@ package integrations import ( "context" - "database/sql/driver" - "encoding/json" "github.com/SigNoz/signoz/pkg/query-service/model" - "github.com/pkg/errors" + "github.com/SigNoz/signoz/pkg/types" ) -// For serializing from db -func (c *InstalledIntegrationConfig) Scan(src interface{}) error { - if data, ok := src.([]byte); ok { - return json.Unmarshal(data, &c) - } - return nil -} - -// For serializing to db -func (c *InstalledIntegrationConfig) Value() (driver.Value, error) { - filterSetJson, err := json.Marshal(c) - if err != nil { - return nil, errors.Wrap(err, "could not serialize integration config to JSON") - } - return filterSetJson, nil -} - type InstalledIntegrationsRepo interface { - list(context.Context) ([]InstalledIntegration, *model.ApiError) + list(ctx context.Context, orgId string) ([]types.InstalledIntegration, *model.ApiError) get( - ctx context.Context, integrationIds []string, - ) (map[string]InstalledIntegration, *model.ApiError) + ctx context.Context, orgId string, integrationTypes []string, + ) (map[string]types.InstalledIntegration, *model.ApiError) upsert( ctx context.Context, - integrationId string, - config InstalledIntegrationConfig, - ) (*InstalledIntegration, *model.ApiError) + orgId string, + integrationType string, + config types.InstalledIntegrationConfig, + ) (*types.InstalledIntegration, *model.ApiError) - delete(ctx context.Context, integrationId string) *model.ApiError + delete(ctx context.Context, orgId string, integrationType string) *model.ApiError } type AvailableIntegrationsRepo interface { list(context.Context) ([]IntegrationDetails, *model.ApiError) get( - ctx context.Context, integrationIds []string, + ctx context.Context, integrationTypes []string, ) (map[string]IntegrationDetails, *model.ApiError) // AvailableIntegrationsRepo implementations are expected to cache diff --git a/pkg/query-service/app/integrations/sqlite_repo.go b/pkg/query-service/app/integrations/sqlite_repo.go index 6e2390023c..694d5eb25c 100644 --- a/pkg/query-service/app/integrations/sqlite_repo.go +++ b/pkg/query-service/app/integrations/sqlite_repo.go @@ -3,39 +3,37 @@ package integrations import ( "context" "fmt" - "strings" "github.com/SigNoz/signoz/pkg/query-service/model" - "github.com/jmoiron/sqlx" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/uptrace/bun" ) type InstalledIntegrationsSqliteRepo struct { - db *sqlx.DB + store sqlstore.SQLStore } -func NewInstalledIntegrationsSqliteRepo(db *sqlx.DB) ( +func NewInstalledIntegrationsSqliteRepo(store sqlstore.SQLStore) ( *InstalledIntegrationsSqliteRepo, error, ) { return &InstalledIntegrationsSqliteRepo{ - db: db, + store: store, }, nil } func (r *InstalledIntegrationsSqliteRepo) list( ctx context.Context, -) ([]InstalledIntegration, *model.ApiError) { - integrations := []InstalledIntegration{} + orgId string, +) ([]types.InstalledIntegration, *model.ApiError) { + integrations := []types.InstalledIntegration{} - err := r.db.SelectContext( - ctx, &integrations, ` - select - integration_id, - config_json, - installed_at - from integrations_installed - order by installed_at - `, - ) + err := r.store.BunDB().NewSelect(). + Model(&integrations). + Where("org_id = ?", orgId). + Order("installed_at"). + Scan(ctx) if err != nil { return nil, model.InternalError(fmt.Errorf( "could not query installed integrations: %w", err, @@ -45,38 +43,28 @@ func (r *InstalledIntegrationsSqliteRepo) list( } func (r *InstalledIntegrationsSqliteRepo) get( - ctx context.Context, integrationIds []string, -) (map[string]InstalledIntegration, *model.ApiError) { - integrations := []InstalledIntegration{} + ctx context.Context, orgId string, integrationTypes []string, +) (map[string]types.InstalledIntegration, *model.ApiError) { + integrations := []types.InstalledIntegration{} - idPlaceholders := []string{} - idValues := []interface{}{} - for _, id := range integrationIds { - idPlaceholders = append(idPlaceholders, "?") - idValues = append(idValues, id) + typeValues := []interface{}{} + for _, integrationType := range integrationTypes { + typeValues = append(typeValues, integrationType) } - err := r.db.SelectContext( - ctx, &integrations, fmt.Sprintf(` - select - integration_id, - config_json, - installed_at - from integrations_installed - where integration_id in (%s)`, - strings.Join(idPlaceholders, ", "), - ), - idValues..., - ) + err := r.store.BunDB().NewSelect().Model(&integrations). + Where("org_id = ?", orgId). + Where("type IN (?)", bun.In(typeValues)). + Scan(ctx) if err != nil { return nil, model.InternalError(fmt.Errorf( "could not query installed integrations: %w", err, )) } - result := map[string]InstalledIntegration{} + result := map[string]types.InstalledIntegration{} for _, ii := range integrations { - result[ii.IntegrationId] = ii + result[ii.Type] = ii } return result, nil @@ -84,55 +72,57 @@ func (r *InstalledIntegrationsSqliteRepo) get( func (r *InstalledIntegrationsSqliteRepo) upsert( ctx context.Context, - integrationId string, - config InstalledIntegrationConfig, -) (*InstalledIntegration, *model.ApiError) { - serializedConfig, err := config.Value() - if err != nil { - return nil, model.BadRequest(fmt.Errorf( - "could not serialize integration config: %w", err, - )) + orgId string, + integrationType string, + config types.InstalledIntegrationConfig, +) (*types.InstalledIntegration, *model.ApiError) { + + integration := types.InstalledIntegration{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + OrgID: orgId, + Type: integrationType, + Config: config, } - _, dbErr := r.db.ExecContext( - ctx, ` - INSERT INTO integrations_installed ( - integration_id, - config_json - ) values ($1, $2) - on conflict(integration_id) do update - set config_json=excluded.config_json - `, integrationId, serializedConfig, - ) + _, dbErr := r.store.BunDB().NewInsert(). + Model(&integration). + On("conflict (type, org_id) DO UPDATE"). + Set("config = EXCLUDED.config"). + Exec(ctx) + if dbErr != nil { return nil, model.InternalError(fmt.Errorf( "could not insert record for integration installation: %w", dbErr, )) } - res, apiErr := r.get(ctx, []string{integrationId}) + res, apiErr := r.get(ctx, orgId, []string{integrationType}) if apiErr != nil || len(res) < 1 { return nil, model.WrapApiError( apiErr, "could not fetch installed integration", ) } - installed := res[integrationId] + installed := res[integrationType] return &installed, nil } func (r *InstalledIntegrationsSqliteRepo) delete( - ctx context.Context, integrationId string, + ctx context.Context, orgId string, integrationType string, ) *model.ApiError { - _, dbErr := r.db.ExecContext(ctx, ` - DELETE FROM integrations_installed where integration_id = ? - `, integrationId) + _, dbErr := r.store.BunDB().NewDelete(). + Model(&types.InstalledIntegration{}). + Where("type = ?", integrationType). + Where("org_id = ?", orgId). + Exec(ctx) if dbErr != nil { return model.InternalError(fmt.Errorf( "could not delete installed integration record for %s: %w", - integrationId, dbErr, + integrationType, dbErr, )) } diff --git a/pkg/query-service/app/integrations/test_utils.go b/pkg/query-service/app/integrations/test_utils.go index 03b0a536bc..178ad75f45 100644 --- a/pkg/query-service/app/integrations/test_utils.go +++ b/pkg/query-service/app/integrations/test_utils.go @@ -5,18 +5,22 @@ import ( "slices" "testing" + "github.com/SigNoz/signoz/pkg/query-service/auth" + "github.com/SigNoz/signoz/pkg/query-service/constants" + "github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/rules" "github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/pipelinetypes" + "github.com/google/uuid" ) func NewTestIntegrationsManager(t *testing.T) *Manager { testDB := utils.NewQueryServiceDBForTests(t) - installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB.SQLxDB()) + installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB) if err != nil { t.Fatalf("could not init sqlite DB for installed integrations: %v", err) } @@ -27,6 +31,38 @@ func NewTestIntegrationsManager(t *testing.T) *Manager { } } +func createTestUser() (*types.User, *model.ApiError) { + // Create a test user for auth + ctx := context.Background() + org, apiErr := dao.DB().CreateOrg(ctx, &types.Organization{ + Name: "test", + }) + if apiErr != nil { + return nil, apiErr + } + + group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup) + if apiErr != nil { + return nil, apiErr + } + + auth.InitAuthCache(ctx) + + userId := uuid.NewString() + return dao.DB().CreateUser( + ctx, + &types.User{ + ID: userId, + Name: "test", + Email: userId[:8] + "test@test.com", + Password: "test", + OrgID: org.ID, + GroupID: group.ID, + }, + true, + ) +} + type TestAvailableIntegrationsRepo struct{} func (t *TestAvailableIntegrationsRepo) list( diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 1431fbb0cf..4b66fbb61c 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -25,12 +25,12 @@ import ( type LogParsingPipelineController struct { Repo - GetIntegrationPipelines func(context.Context) ([]pipelinetypes.GettablePipeline, *model.ApiError) + GetIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, *model.ApiError) } func NewLogParsingPipelinesController( sqlStore sqlstore.SQLStore, - getIntegrationPipelines func(context.Context) ([]pipelinetypes.GettablePipeline, *model.ApiError), + getIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, *model.ApiError), ) (*LogParsingPipelineController, error) { repo := NewRepo(sqlStore) return &LogParsingPipelineController{ @@ -164,7 +164,7 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion( result = savedPipelines } - integrationPipelines, apiErr := ic.GetIntegrationPipelines(ctx) + integrationPipelines, apiErr := ic.GetIntegrationPipelines(ctx, defaultOrgID) if apiErr != nil { return nil, model.WrapApiError( apiErr, "could not get pipelines for installed integrations", diff --git a/pkg/query-service/auth/auth.go b/pkg/query-service/auth/auth.go index 2b4e349d5a..d090c21403 100644 --- a/pkg/query-service/auth/auth.go +++ b/pkg/query-service/auth/auth.go @@ -439,7 +439,7 @@ func RegisterFirstUser(ctx context.Context, req *RegisterRequest) (*types.User, } user := &types.User{ - ID: uuid.NewString(), + ID: uuid.New().String(), Name: req.Name, Email: req.Email, Password: hash, @@ -519,7 +519,7 @@ func RegisterInvitedUser(ctx context.Context, req *RegisterRequest, nopassword b } user := &types.User{ - ID: uuid.NewString(), + ID: uuid.New().String(), Name: req.Name, Email: req.Email, Password: hash, diff --git a/pkg/query-service/auth/rbac.go b/pkg/query-service/auth/rbac.go index caaf3ce41e..9ffc02276c 100644 --- a/pkg/query-service/auth/rbac.go +++ b/pkg/query-service/auth/rbac.go @@ -3,6 +3,7 @@ package auth import ( "context" + errorsV2 "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/types" @@ -51,7 +52,7 @@ func InitAuthCache(ctx context.Context) error { func GetUserFromReqContext(ctx context.Context) (*types.GettableUser, error) { claims, ok := authtypes.ClaimsFromContext(ctx) if !ok { - return nil, errors.New("no claims found in context") + return nil, errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "no claims found in context") } user := &types.GettableUser{ diff --git a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go index b5932de8cd..1a4268c90b 100644 --- a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go @@ -35,7 +35,7 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { ) // Should be able to generate a connection url from UI - initializing an integration account - testAccountConfig := cloudintegrations.AccountConfig{ + testAccountConfig := types.AccountConfig{ EnabledRegions: []string{"us-east-1", "us-east-2"}, } connectionUrlResp := testbed.GenerateConnectionUrlFromQS( @@ -65,8 +65,8 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { testAWSAccountId := "4563215233" agentCheckInResp := testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) require.Equal(testAccountId, agentCheckInResp.AccountId) @@ -91,20 +91,20 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { require.Equal(testAWSAccountId, accountsListResp2.Accounts[0].CloudAccountId) // Should be able to update account config from UI - testAccountConfig2 := cloudintegrations.AccountConfig{ + testAccountConfig2 := types.AccountConfig{ EnabledRegions: []string{"us-east-2", "us-west-1"}, } latestAccount := testbed.UpdateAccountConfigWithQS( "aws", testAccountId, testAccountConfig2, ) - require.Equal(testAccountId, latestAccount.Id) + require.Equal(testAccountId, latestAccount.ID.StringValue()) require.Equal(testAccountConfig2, *latestAccount.Config) // The agent should now receive latest account config. agentCheckInResp1 := testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) require.Equal(testAccountId, agentCheckInResp1.AccountId) @@ -114,14 +114,14 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { // Should be able to disconnect/remove account from UI. tsBeforeDisconnect := time.Now() latestAccount = testbed.DisconnectAccountWithQS("aws", testAccountId) - require.Equal(testAccountId, latestAccount.Id) + require.Equal(testAccountId, latestAccount.ID.StringValue()) require.LessOrEqual(tsBeforeDisconnect, *latestAccount.RemovedAt) // The agent should receive the disconnected status in account config post disconnection agentCheckInResp2 := testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) require.Equal(testAccountId, agentCheckInResp2.AccountId) @@ -157,13 +157,13 @@ func TestAWSIntegrationServices(t *testing.T) { testAWSAccountId := "389389489489" testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) - testSvcConfig := cloudintegrations.CloudServiceConfig{ - Metrics: &cloudintegrations.CloudServiceMetricsConfig{ + testSvcConfig := types.CloudServiceConfig{ + Metrics: &types.CloudServiceMetricsConfig{ Enabled: true, }, } @@ -199,7 +199,7 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { testbed := NewCloudIntegrationsTestBed(t, nil) // configure a connected account - testAccountConfig := cloudintegrations.AccountConfig{ + testAccountConfig := types.AccountConfig{ EnabledRegions: []string{"us-east-1", "us-east-2"}, } connectionUrlResp := testbed.GenerateConnectionUrlFromQS( @@ -218,8 +218,8 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { testAWSAccountId := "389389489489" checkinResp := testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) @@ -237,14 +237,14 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { // helper setServiceConfig := func(svcId string, metricsEnabled bool, logsEnabled bool) { - testSvcConfig := cloudintegrations.CloudServiceConfig{} + testSvcConfig := types.CloudServiceConfig{} if metricsEnabled { - testSvcConfig.Metrics = &cloudintegrations.CloudServiceMetricsConfig{ + testSvcConfig.Metrics = &types.CloudServiceMetricsConfig{ Enabled: metricsEnabled, } } if logsEnabled { - testSvcConfig.Logs = &cloudintegrations.CloudServiceLogsConfig{ + testSvcConfig.Logs = &types.CloudServiceLogsConfig{ Enabled: logsEnabled, } } @@ -262,8 +262,8 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { checkinResp = testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) @@ -292,13 +292,13 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { require.True(strings.HasPrefix(logGroupPrefixes[0], "/aws/rds")) // change regions and update service configs and validate config changes for agent - testAccountConfig2 := cloudintegrations.AccountConfig{ + testAccountConfig2 := types.AccountConfig{ EnabledRegions: []string{"us-east-2", "us-west-1"}, } latestAccount := testbed.UpdateAccountConfigWithQS( "aws", testAccountId, testAccountConfig2, ) - require.Equal(testAccountId, latestAccount.Id) + require.Equal(testAccountId, latestAccount.ID.StringValue()) require.Equal(testAccountConfig2, *latestAccount.Config) // disable metrics for one and logs for the other. @@ -308,8 +308,8 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { checkinResp = testbed.CheckInAsAgentWithQS( "aws", cloudintegrations.AgentCheckInRequest{ - AccountId: testAccountId, - CloudAccountId: testAWSAccountId, + ID: testAccountId, + AccountID: testAWSAccountId, }, ) require.Equal(testAccountId, checkinResp.AccountId) @@ -453,8 +453,8 @@ func (tb *CloudIntegrationsTestBed) CheckInAsAgentWithQS( } func (tb *CloudIntegrationsTestBed) UpdateAccountConfigWithQS( - cloudProvider string, accountId string, newConfig cloudintegrations.AccountConfig, -) *cloudintegrations.AccountRecord { + cloudProvider string, accountId string, newConfig types.AccountConfig, +) *types.CloudIntegration { respDataJson := tb.RequestQS( fmt.Sprintf( "/api/v1/cloud-integrations/%s/accounts/%s/config", @@ -464,7 +464,7 @@ func (tb *CloudIntegrationsTestBed) UpdateAccountConfigWithQS( }, ) - var resp cloudintegrations.AccountRecord + var resp types.CloudIntegration err := json.Unmarshal(respDataJson, &resp) if err != nil { tb.t.Fatalf("could not unmarshal apiResponse.Data json into Account") @@ -475,7 +475,7 @@ func (tb *CloudIntegrationsTestBed) UpdateAccountConfigWithQS( func (tb *CloudIntegrationsTestBed) DisconnectAccountWithQS( cloudProvider string, accountId string, -) *cloudintegrations.AccountRecord { +) *types.CloudIntegration { respDataJson := tb.RequestQS( fmt.Sprintf( "/api/v1/cloud-integrations/%s/accounts/%s/disconnect", @@ -483,7 +483,7 @@ func (tb *CloudIntegrationsTestBed) DisconnectAccountWithQS( ), map[string]any{}, ) - var resp cloudintegrations.AccountRecord + var resp types.CloudIntegration err := json.Unmarshal(respDataJson, &resp) if err != nil { tb.t.Fatalf("could not unmarshal apiResponse.Data json into Account") diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index 61b549848c..dc550a33fd 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -166,6 +166,7 @@ func createTestUser() (*types.User, *model.ApiError) { auth.InitAuthCache(ctx) userId := uuid.NewString() + return dao.DB().CreateUser( ctx, &types.User{ diff --git a/pkg/query-service/utils/testutils.go b/pkg/query-service/utils/testutils.go index 5342e0ee38..e7fbd4a759 100644 --- a/pkg/query-service/utils/testutils.go +++ b/pkg/query-service/utils/testutils.go @@ -48,10 +48,15 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s sqlmigration.NewModifyDatetimeFactory(), sqlmigration.NewModifyOrgDomainFactory(), sqlmigration.NewUpdateOrganizationFactory(sqlStore), + sqlmigration.NewAddAlertmanagerFactory(sqlStore), sqlmigration.NewUpdateDashboardAndSavedViewsFactory(sqlStore), sqlmigration.NewUpdatePatAndOrgDomainsFactory(sqlStore), sqlmigration.NewUpdatePipelines(sqlStore), + sqlmigration.NewDropLicensesSitesFactory(sqlStore), + sqlmigration.NewUpdateInvitesFactory(sqlStore), + sqlmigration.NewUpdatePatFactory(sqlStore), sqlmigration.NewAddVirtualFieldsFactory(), + sqlmigration.NewUpdateIntegrationsFactory(sqlStore), ), ) if err != nil { diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index 99d524331f..6b3bee93e8 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -70,6 +70,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM sqlmigration.NewUpdateApdexTtlFactory(sqlstore), sqlmigration.NewUpdateResetPasswordFactory(sqlstore), sqlmigration.NewAddVirtualFieldsFactory(), + sqlmigration.NewUpdateIntegrationsFactory(sqlstore), ) } diff --git a/pkg/sqlmigration/026_update_integrations.go b/pkg/sqlmigration/026_update_integrations.go new file mode 100644 index 0000000000..e37e6a17b3 --- /dev/null +++ b/pkg/sqlmigration/026_update_integrations.go @@ -0,0 +1,439 @@ +package sqlmigration + +import ( + "context" + "database/sql" + "time" + + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/google/uuid" + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" + "go.uber.org/zap" +) + +type updateIntegrations struct { + store sqlstore.SQLStore +} + +func NewUpdateIntegrationsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] { + return factory.NewProviderFactory(factory.MustNewName("update_integrations"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) { + return newUpdateIntegrations(ctx, ps, c, sqlstore) + }) +} + +func newUpdateIntegrations(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) { + return &updateIntegrations{ + store: store, + }, nil +} + +func (migration *updateIntegrations) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +type existingInstalledIntegration struct { + bun.BaseModel `bun:"table:integrations_installed"` + + IntegrationID string `bun:"integration_id,pk,type:text"` + ConfigJSON string `bun:"config_json,type:text"` + InstalledAt time.Time `bun:"installed_at,default:current_timestamp"` +} + +type newInstalledIntegration struct { + bun.BaseModel `bun:"table:installed_integration"` + + types.Identifiable + Type string `json:"type" bun:"type,type:text,unique:org_id_type"` + Config string `json:"config" bun:"config,type:text"` + InstalledAt time.Time `json:"installed_at" bun:"installed_at,default:current_timestamp"` + OrgID string `json:"org_id" bun:"org_id,type:text,unique:org_id_type"` +} + +type existingCloudIntegration struct { + bun.BaseModel `bun:"table:cloud_integrations_accounts"` + + CloudProvider string `bun:"cloud_provider,type:text,unique:cloud_provider_id"` + ID string `bun:"id,type:text,notnull,unique:cloud_provider_id"` + ConfigJSON string `bun:"config_json,type:text"` + CloudAccountID string `bun:"cloud_account_id,type:text"` + LastAgentReportJSON string `bun:"last_agent_report_json,type:text"` + CreatedAt time.Time `bun:"created_at,notnull,default:current_timestamp"` + RemovedAt *time.Time `bun:"removed_at,type:timestamp"` +} + +type newCloudIntegration struct { + bun.BaseModel `bun:"table:cloud_integration"` + + types.Identifiable + types.TimeAuditable + Provider string `json:"provider" bun:"provider,type:text"` + Config string `json:"config" bun:"config,type:text"` + AccountID string `json:"account_id" bun:"account_id,type:text"` + LastAgentReport string `json:"last_agent_report" bun:"last_agent_report,type:text"` + RemovedAt *time.Time `json:"removed_at" bun:"removed_at,type:timestamp"` + OrgID string `json:"org_id" bun:"org_id,type:text"` +} + +type existingCloudIntegrationService struct { + bun.BaseModel `bun:"table:cloud_integrations_service_configs,alias:c1"` + + CloudProvider string `bun:"cloud_provider,type:text,notnull,unique:service_cloud_provider_account"` + CloudAccountID string `bun:"cloud_account_id,type:text,notnull,unique:service_cloud_provider_account"` + ServiceID string `bun:"service_id,type:text,notnull,unique:service_cloud_provider_account"` + ConfigJSON string `bun:"config_json,type:text"` + CreatedAt time.Time `bun:"created_at,default:current_timestamp"` +} + +type newCloudIntegrationService struct { + bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"` + + types.Identifiable + types.TimeAuditable + Type string `bun:"type,type:text,notnull,unique:cloud_integration_id_type"` + Config string `bun:"config,type:text"` + CloudIntegrationID string `bun:"cloud_integration_id,type:text,notnull,unique:cloud_integration_id_type"` +} + +type StorablePersonalAccessToken struct { + bun.BaseModel `bun:"table:personal_access_token"` + types.Identifiable + types.TimeAuditable + OrgID string `json:"orgId" bun:"org_id,type:text,notnull"` + Role string `json:"role" bun:"role,type:text,notnull,default:'ADMIN'"` + UserID string `json:"userId" bun:"user_id,type:text,notnull"` + Token string `json:"token" bun:"token,type:text,notnull,unique"` + Name string `json:"name" bun:"name,type:text,notnull"` + ExpiresAt int64 `json:"expiresAt" bun:"expires_at,notnull,default:0"` + LastUsed int64 `json:"lastUsed" bun:"last_used,notnull,default:0"` + Revoked bool `json:"revoked" bun:"revoked,notnull,default:false"` + UpdatedByUserID string `json:"updatedByUserId" bun:"updated_by_user_id,type:text,notnull,default:''"` +} + +func (migration *updateIntegrations) Up(ctx context.Context, db *bun.DB) error { + + // begin transaction + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + // don't run the migration if there are multiple org ids + orgIDs := make([]string, 0) + err = migration.store.BunDB().NewSelect().Model((*types.Organization)(nil)).Column("id").Scan(ctx, &orgIDs) + if err != nil { + return err + } + if len(orgIDs) > 1 { + return nil + } + + // --- + // installed integrations + // --- + err = migration. + store. + Dialect(). + RenameTableAndModifyModel(ctx, tx, new(existingInstalledIntegration), new(newInstalledIntegration), []string{OrgReference}, func(ctx context.Context) error { + existingIntegrations := make([]*existingInstalledIntegration, 0) + err = tx. + NewSelect(). + Model(&existingIntegrations). + Scan(ctx) + if err != nil { + if err != sql.ErrNoRows { + return err + } + } + + if err == nil && len(existingIntegrations) > 0 { + newIntegrations := migration. + CopyOldIntegrationsToNewIntegrations(tx, orgIDs[0], existingIntegrations) + _, err = tx. + NewInsert(). + Model(&newIntegrations). + Exec(ctx) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + // --- + // cloud integrations + // --- + err = migration. + store. + Dialect(). + RenameTableAndModifyModel(ctx, tx, new(existingCloudIntegration), new(newCloudIntegration), []string{OrgReference}, func(ctx context.Context) error { + existingIntegrations := make([]*existingCloudIntegration, 0) + err = tx. + NewSelect(). + Model(&existingIntegrations). + Where("removed_at IS NULL"). // we will only copy the accounts that are not removed + Scan(ctx) + if err != nil { + if err != sql.ErrNoRows { + return err + } + } + + if err == nil && len(existingIntegrations) > 0 { + newIntegrations := migration. + CopyOldCloudIntegrationsToNewCloudIntegrations(tx, orgIDs[0], existingIntegrations) + _, err = tx. + NewInsert(). + Model(&newIntegrations). + Exec(ctx) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + // add unique constraint to cloud_integration table + _, err = tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS unique_cloud_integration ON cloud_integration (id, provider, org_id)`) + if err != nil { + return err + } + + // --- + // cloud integration service + // --- + err = migration. + store. + Dialect(). + RenameTableAndModifyModel(ctx, tx, new(existingCloudIntegrationService), new(newCloudIntegrationService), []string{CloudIntegrationReference}, func(ctx context.Context) error { + existingServices := make([]*existingCloudIntegrationService, 0) + + // only one service per provider,account id and type + // so there won't be any duplicates. + // just that these will be enabled as soon as the integration for the account is enabled + err = tx. + NewSelect(). + Model(&existingServices). + Scan(ctx) + if err != nil { + if err != sql.ErrNoRows { + return err + } + } + + if err == nil && len(existingServices) > 0 { + newServices := migration. + CopyOldCloudIntegrationServicesToNewCloudIntegrationServices(tx, orgIDs[0], existingServices) + _, err = tx. + NewInsert(). + Model(&newServices). + Exec(ctx) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + if len(orgIDs) == 0 { + err = tx.Commit() + if err != nil { + return err + } + return nil + } + + // copy the old aws integration user to the new user + err = migration.copyOldAwsIntegrationUser(tx, orgIDs[0]) + if err != nil { + return err + } + + err = tx.Commit() + if err != nil { + return err + } + + return nil +} + +func (migration *updateIntegrations) Down(ctx context.Context, db *bun.DB) error { + return nil +} + +func (migration *updateIntegrations) CopyOldIntegrationsToNewIntegrations(tx bun.IDB, orgID string, existingIntegrations []*existingInstalledIntegration) []*newInstalledIntegration { + newIntegrations := make([]*newInstalledIntegration, 0) + + for _, integration := range existingIntegrations { + newIntegrations = append(newIntegrations, &newInstalledIntegration{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + Type: integration.IntegrationID, + Config: integration.ConfigJSON, + InstalledAt: integration.InstalledAt, + OrgID: orgID, + }) + } + + return newIntegrations +} + +func (migration *updateIntegrations) CopyOldCloudIntegrationsToNewCloudIntegrations(tx bun.IDB, orgID string, existingIntegrations []*existingCloudIntegration) []*newCloudIntegration { + newIntegrations := make([]*newCloudIntegration, 0) + + for _, integration := range existingIntegrations { + newIntegrations = append(newIntegrations, &newCloudIntegration{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: integration.CreatedAt, + UpdatedAt: integration.CreatedAt, + }, + Provider: integration.CloudProvider, + AccountID: integration.CloudAccountID, + Config: integration.ConfigJSON, + LastAgentReport: integration.LastAgentReportJSON, + RemovedAt: integration.RemovedAt, + OrgID: orgID, + }) + } + + return newIntegrations +} + +func (migration *updateIntegrations) CopyOldCloudIntegrationServicesToNewCloudIntegrationServices(tx bun.IDB, orgID string, existingServices []*existingCloudIntegrationService) []*newCloudIntegrationService { + newServices := make([]*newCloudIntegrationService, 0) + + for _, service := range existingServices { + var cloudIntegrationID string + err := tx.NewSelect(). + Model((*newCloudIntegration)(nil)). + Column("id"). + Where("account_id = ?", service.CloudAccountID). + Where("provider = ?", service.CloudProvider). + Where("org_id = ?", orgID). + Scan(context.Background(), &cloudIntegrationID) + if err != nil { + if err == sql.ErrNoRows { + continue + } + zap.L().Error("failed to get cloud integration id", zap.Error(err)) + return nil + } + newServices = append(newServices, &newCloudIntegrationService{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: service.CreatedAt, + UpdatedAt: service.CreatedAt, + }, + Type: service.ServiceID, + Config: service.ConfigJSON, + CloudIntegrationID: cloudIntegrationID, + }) + } + + return newServices +} + +func (migration *updateIntegrations) copyOldAwsIntegrationUser(tx bun.IDB, orgID string) error { + user := &types.User{} + err := tx.NewSelect().Model(user).Where("email = ?", "aws-integration@signoz.io").Scan(context.Background()) + if err != nil { + if err == sql.ErrNoRows { + return nil + } + return err + } + + // check if the id is already an uuid + if _, err := uuid.Parse(user.ID); err == nil { + return nil + } + + // new user + newUser := &types.User{ + ID: uuid.New().String(), + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + OrgID: orgID, + Name: user.Name, + Email: user.Email, + GroupID: user.GroupID, + Password: user.Password, + } + + // get the pat for old user + pat := &StorablePersonalAccessToken{} + err = tx.NewSelect().Model(pat).Where("user_id = ? and revoked = false", "aws-integration").Scan(context.Background()) + if err != nil { + if err == sql.ErrNoRows { + // delete the old user + _, err = tx.ExecContext(context.Background(), `DELETE FROM users WHERE id = ?`, user.ID) + if err != nil { + return err + } + return nil + } + return err + } + + // new pat + newPAT := &StorablePersonalAccessToken{ + Identifiable: types.Identifiable{ID: valuer.GenerateUUID()}, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + OrgID: orgID, + UserID: newUser.ID, + Token: pat.Token, + Name: pat.Name, + ExpiresAt: pat.ExpiresAt, + LastUsed: pat.LastUsed, + Revoked: pat.Revoked, + Role: pat.Role, + } + + // delete old user + _, err = tx.ExecContext(context.Background(), `DELETE FROM users WHERE id = ?`, user.ID) + if err != nil { + return err + } + + // insert the new user + _, err = tx.NewInsert().Model(newUser).Exec(context.Background()) + if err != nil { + return err + } + + // insert the new pat + _, err = tx.NewInsert().Model(newPAT).Exec(context.Background()) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/sqlmigration/sqlmigration.go b/pkg/sqlmigration/sqlmigration.go index cd56777a88..5a527b8974 100644 --- a/pkg/sqlmigration/sqlmigration.go +++ b/pkg/sqlmigration/sqlmigration.go @@ -26,8 +26,9 @@ var ( ) var ( - OrgReference = "org" - UserReference = "user" + OrgReference = "org" + UserReference = "user" + CloudIntegrationReference = "cloud_integration" ) func New( diff --git a/pkg/sqlstore/sqlitesqlstore/dialect.go b/pkg/sqlstore/sqlitesqlstore/dialect.go index e15976f914..964644c522 100644 --- a/pkg/sqlstore/sqlitesqlstore/dialect.go +++ b/pkg/sqlstore/sqlitesqlstore/dialect.go @@ -17,13 +17,15 @@ var ( ) var ( - Org = "org" - User = "user" + Org = "org" + User = "user" + CloudIntegration = "cloud_integration" ) var ( - OrgReference = `("org_id") REFERENCES "organizations" ("id")` - UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE` + OrgReference = `("org_id") REFERENCES "organizations" ("id")` + UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE` + CloudIntegrationReference = `("cloud_integration_id") REFERENCES "cloud_integration" ("id") ON DELETE CASCADE` ) type dialect struct { @@ -202,6 +204,8 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I fkReferences = append(fkReferences, OrgReference) } else if reference == User && !slices.Contains(fkReferences, UserReference) { fkReferences = append(fkReferences, UserReference) + } else if reference == CloudIntegration && !slices.Contains(fkReferences, CloudIntegrationReference) { + fkReferences = append(fkReferences, CloudIntegrationReference) } } diff --git a/pkg/types/integration.go b/pkg/types/integration.go index 9ba9d0e62c..60304730e6 100644 --- a/pkg/types/integration.go +++ b/pkg/types/integration.go @@ -1,37 +1,246 @@ package types import ( + "database/sql/driver" + "encoding/json" + "fmt" "time" + "github.com/pkg/errors" "github.com/uptrace/bun" ) -type Integration struct { - bun.BaseModel `bun:"table:integrations_installed"` +type IntegrationUserEmail string - IntegrationID string `bun:"integration_id,pk,type:text"` - ConfigJSON string `bun:"config_json,type:text"` - InstalledAt time.Time `bun:"installed_at,default:current_timestamp"` +const ( + AWSIntegrationUserEmail IntegrationUserEmail = "aws-integration@signoz.io" +) + +var AllIntegrationUserEmails = []IntegrationUserEmail{ + AWSIntegrationUserEmail, } -type CloudIntegrationAccount struct { - bun.BaseModel `bun:"table:cloud_integrations_accounts"` +// -------------------------------------------------------------------------- +// Normal integration uses just the installed_integration table +// -------------------------------------------------------------------------- - CloudProvider string `bun:"cloud_provider,type:text,unique:cloud_provider_id"` - ID string `bun:"id,type:text,notnull,unique:cloud_provider_id"` - ConfigJSON string `bun:"config_json,type:text"` - CloudAccountID string `bun:"cloud_account_id,type:text"` - LastAgentReportJSON string `bun:"last_agent_report_json,type:text"` - CreatedAt time.Time `bun:"created_at,notnull,default:current_timestamp"` - RemovedAt time.Time `bun:"removed_at,type:timestamp"` +type InstalledIntegration struct { + bun.BaseModel `bun:"table:installed_integration"` + + Identifiable + Type string `json:"type" bun:"type,type:text,unique:org_id_type"` + Config InstalledIntegrationConfig `json:"config" bun:"config,type:text"` + InstalledAt time.Time `json:"installed_at" bun:"installed_at,default:current_timestamp"` + OrgID string `json:"org_id" bun:"org_id,type:text,unique:org_id_type,references:organizations(id),on_delete:cascade"` } -type CloudIntegrationServiceConfig struct { - bun.BaseModel `bun:"table:cloud_integrations_service_configs"` +type InstalledIntegrationConfig map[string]interface{} - CloudProvider string `bun:"cloud_provider,type:text,notnull,unique:service_cloud_provider_account"` - CloudAccountID string `bun:"cloud_account_id,type:text,notnull,unique:service_cloud_provider_account"` - ServiceID string `bun:"service_id,type:text,notnull,unique:service_cloud_provider_account"` - ConfigJSON string `bun:"config_json,type:text"` - CreatedAt time.Time `bun:"created_at,default:current_timestamp"` +// For serializing from db +func (c *InstalledIntegrationConfig) Scan(src interface{}) error { + var data []byte + switch v := src.(type) { + case []byte: + data = v + case string: + data = []byte(v) + default: + return fmt.Errorf("tried to scan from %T instead of string or bytes", src) + } + + return json.Unmarshal(data, c) +} + +// For serializing to db +func (c *InstalledIntegrationConfig) Value() (driver.Value, error) { + filterSetJson, err := json.Marshal(c) + if err != nil { + return nil, errors.Wrap(err, "could not serialize integration config to JSON") + } + return filterSetJson, nil +} + +// -------------------------------------------------------------------------- +// Cloud integration uses the cloud_integration table +// and cloud_integrations_service table +// -------------------------------------------------------------------------- + +type CloudIntegration struct { + bun.BaseModel `bun:"table:cloud_integration"` + + Identifiable + TimeAuditable + Provider string `json:"provider" bun:"provider,type:text,unique:provider_id"` + Config *AccountConfig `json:"config" bun:"config,type:text"` + AccountID *string `json:"account_id" bun:"account_id,type:text"` + LastAgentReport *AgentReport `json:"last_agent_report" bun:"last_agent_report,type:text"` + RemovedAt *time.Time `json:"removed_at" bun:"removed_at,type:timestamp,nullzero"` + OrgID string `bun:"org_id,type:text,unique:provider_id"` +} + +func (a *CloudIntegration) Status() AccountStatus { + status := AccountStatus{} + if a.LastAgentReport != nil { + lastHeartbeat := a.LastAgentReport.TimestampMillis + status.Integration.LastHeartbeatTsMillis = &lastHeartbeat + } + return status +} + +func (a *CloudIntegration) Account() Account { + ca := Account{Id: a.ID.StringValue(), Status: a.Status()} + + if a.AccountID != nil { + ca.CloudAccountId = *a.AccountID + } + + if a.Config != nil { + ca.Config = *a.Config + } else { + ca.Config = DefaultAccountConfig() + } + return ca +} + +type Account struct { + Id string `json:"id"` + CloudAccountId string `json:"cloud_account_id"` + Config AccountConfig `json:"config"` + Status AccountStatus `json:"status"` +} + +type AccountStatus struct { + Integration AccountIntegrationStatus `json:"integration"` +} + +type AccountIntegrationStatus struct { + LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"` +} + +func DefaultAccountConfig() AccountConfig { + return AccountConfig{ + EnabledRegions: []string{}, + } +} + +type AccountConfig struct { + EnabledRegions []string `json:"regions"` +} + +// For serializing from db +func (c *AccountConfig) Scan(src any) error { + var data []byte + switch v := src.(type) { + case []byte: + data = v + case string: + data = []byte(v) + default: + return fmt.Errorf("tried to scan from %T instead of string or bytes", src) + } + + return json.Unmarshal(data, c) +} + +// For serializing to db +func (c *AccountConfig) Value() (driver.Value, error) { + if c == nil { + return nil, nil + } + + serialized, err := json.Marshal(c) + if err != nil { + return nil, fmt.Errorf( + "couldn't serialize cloud account config to JSON: %w", err, + ) + } + return serialized, nil +} + +type AgentReport struct { + TimestampMillis int64 `json:"timestamp_millis"` + Data map[string]any `json:"data"` +} + +// For serializing from db +func (r *AgentReport) Scan(src any) error { + var data []byte + switch v := src.(type) { + case []byte: + data = v + case string: + data = []byte(v) + default: + return fmt.Errorf("tried to scan from %T instead of string or bytes", src) + } + + return json.Unmarshal(data, r) +} + +// For serializing to db +func (r *AgentReport) Value() (driver.Value, error) { + if r == nil { + return nil, nil + } + + serialized, err := json.Marshal(r) + if err != nil { + return nil, fmt.Errorf( + "couldn't serialize agent report to JSON: %w", err, + ) + } + return serialized, nil +} + +type CloudIntegrationService struct { + bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"` + + Identifiable + TimeAuditable + Type string `bun:"type,type:text,notnull,unique:cloud_integration_id_type"` + Config CloudServiceConfig `bun:"config,type:text"` + CloudIntegrationID string `bun:"cloud_integration_id,type:text,notnull,unique:cloud_integration_id_type,references:cloud_integrations(id),on_delete:cascade"` +} + +type CloudServiceLogsConfig struct { + Enabled bool `json:"enabled"` +} + +type CloudServiceMetricsConfig struct { + Enabled bool `json:"enabled"` +} + +type CloudServiceConfig struct { + Logs *CloudServiceLogsConfig `json:"logs,omitempty"` + Metrics *CloudServiceMetricsConfig `json:"metrics,omitempty"` +} + +// For serializing from db +func (c *CloudServiceConfig) Scan(src any) error { + var data []byte + switch src := src.(type) { + case []byte: + data = src + case string: + data = []byte(src) + default: + return fmt.Errorf("tried to scan from %T instead of string or bytes", src) + } + + return json.Unmarshal(data, c) +} + +// For serializing to db +func (c *CloudServiceConfig) Value() (driver.Value, error) { + if c == nil { + return nil, nil + } + + serialized, err := json.Marshal(c) + if err != nil { + return nil, fmt.Errorf( + "couldn't serialize cloud service config to JSON: %w", err, + ) + } + return serialized, nil }