mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-27 16:11:59 +08:00

* chore: multitenancy in integrations * chore: multitenancy in cloud integration accounts * chore: changes to cloudintegrationservice * chore: rename migration * chore: update scan function * chore: update scan function * chore: fix migration * chore: fix struct * chore: remove unwanted code * chore: update scan function * chore: migrate user and pat for integrations * fix: changes to the user for integrations * fix: address comments * fix: copy created_at * fix: update non revoked token * chore: don't allow deleting pat and user for integrations * fix: address comments * chore: address comments * chore: add checks for fk in dialect * fix: service migration * fix: don't update user if user is already migrated * fix: update correct service config * fix: remove unwanted code * fix: remove migration for multiple same services which is not required * fix: fix migration and disable dashboard if metrics disabled * fix: don't use ee types --------- Co-authored-by: Vikrant Gupta <vikrant@signoz.io>
218 lines
5.4 KiB
Go
218 lines
5.4 KiB
Go
package cloudintegrations
|
|
|
|
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/SigNoz/signoz/pkg/query-service/model"
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/types"
	"github.com/SigNoz/signoz/pkg/valuer"
)
|
|
|
|
type cloudProviderAccountsRepository interface {
|
|
listConnected(ctx context.Context, orgId string, provider string) ([]types.CloudIntegration, *model.ApiError)
|
|
|
|
get(ctx context.Context, orgId string, provider string, id string) (*types.CloudIntegration, *model.ApiError)
|
|
|
|
getConnectedCloudAccount(ctx context.Context, orgId string, provider string, accountID string) (*types.CloudIntegration, *model.ApiError)
|
|
|
|
// Insert an account or update it by (cloudProvider, id)
|
|
// for specified non-empty fields
|
|
upsert(
|
|
ctx context.Context,
|
|
orgId string,
|
|
provider string,
|
|
id *string,
|
|
config *types.AccountConfig,
|
|
accountId *string,
|
|
agentReport *types.AgentReport,
|
|
removedAt *time.Time,
|
|
) (*types.CloudIntegration, *model.ApiError)
|
|
}
|
|
|
|
func newCloudProviderAccountsRepository(store sqlstore.SQLStore) (
|
|
*cloudProviderAccountsSQLRepository, error,
|
|
) {
|
|
return &cloudProviderAccountsSQLRepository{
|
|
store: store,
|
|
}, nil
|
|
}
|
|
|
|
type cloudProviderAccountsSQLRepository struct {
|
|
store sqlstore.SQLStore
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) listConnected(
|
|
ctx context.Context, orgId string, cloudProvider string,
|
|
) ([]types.CloudIntegration, *model.ApiError) {
|
|
accounts := []types.CloudIntegration{}
|
|
|
|
err := r.store.BunDB().NewSelect().
|
|
Model(&accounts).
|
|
Where("org_id = ?", orgId).
|
|
Where("provider = ?", cloudProvider).
|
|
Where("removed_at is NULL").
|
|
Where("account_id is not NULL").
|
|
Where("last_agent_report is not NULL").
|
|
Order("created_at").
|
|
Scan(ctx)
|
|
|
|
if err != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"could not query connected cloud accounts: %w", err,
|
|
))
|
|
}
|
|
|
|
return accounts, nil
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) get(
|
|
ctx context.Context, orgId string, provider string, id string,
|
|
) (*types.CloudIntegration, *model.ApiError) {
|
|
var result types.CloudIntegration
|
|
|
|
err := r.store.BunDB().NewSelect().
|
|
Model(&result).
|
|
Where("org_id = ?", orgId).
|
|
Where("provider = ?", provider).
|
|
Where("id = ?", id).
|
|
Scan(ctx)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, model.NotFoundError(fmt.Errorf(
|
|
"couldn't find account with Id %s", id,
|
|
))
|
|
} else if err != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"couldn't query cloud provider accounts: %w", err,
|
|
))
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount(
|
|
ctx context.Context, orgId string, provider string, accountId string,
|
|
) (*types.CloudIntegration, *model.ApiError) {
|
|
var result types.CloudIntegration
|
|
|
|
err := r.store.BunDB().NewSelect().
|
|
Model(&result).
|
|
Where("org_id = ?", orgId).
|
|
Where("provider = ?", provider).
|
|
Where("account_id = ?", accountId).
|
|
Where("last_agent_report is not NULL").
|
|
Where("removed_at is NULL").
|
|
Scan(ctx)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, model.NotFoundError(fmt.Errorf(
|
|
"couldn't find connected cloud account %s", accountId,
|
|
))
|
|
} else if err != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"couldn't query cloud provider accounts: %w", err,
|
|
))
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) upsert(
|
|
ctx context.Context,
|
|
orgId string,
|
|
provider string,
|
|
id *string,
|
|
config *types.AccountConfig,
|
|
accountId *string,
|
|
agentReport *types.AgentReport,
|
|
removedAt *time.Time,
|
|
) (*types.CloudIntegration, *model.ApiError) {
|
|
// Insert
|
|
if id == nil {
|
|
temp := valuer.GenerateUUID().StringValue()
|
|
id = &temp
|
|
}
|
|
|
|
// Prepare clause for setting values in `on conflict do update`
|
|
onConflictSetStmts := []string{}
|
|
setColStatement := func(col string) string {
|
|
return fmt.Sprintf("%s=excluded.%s", col, col)
|
|
}
|
|
|
|
if config != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("config"),
|
|
)
|
|
}
|
|
|
|
if accountId != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("account_id"),
|
|
)
|
|
}
|
|
|
|
if agentReport != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("last_agent_report"),
|
|
)
|
|
}
|
|
|
|
if removedAt != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("removed_at"),
|
|
)
|
|
}
|
|
|
|
// set updated_at to current timestamp if it's an upsert
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("updated_at"),
|
|
)
|
|
|
|
onConflictClause := ""
|
|
if len(onConflictSetStmts) > 0 {
|
|
onConflictClause = fmt.Sprintf(
|
|
"conflict(id, provider, org_id) do update SET\n%s",
|
|
strings.Join(onConflictSetStmts, ",\n"),
|
|
)
|
|
}
|
|
|
|
integration := types.CloudIntegration{
|
|
OrgID: orgId,
|
|
Provider: provider,
|
|
Identifiable: types.Identifiable{ID: valuer.MustNewUUID(*id)},
|
|
TimeAuditable: types.TimeAuditable{
|
|
CreatedAt: time.Now(),
|
|
UpdatedAt: time.Now(),
|
|
},
|
|
Config: config,
|
|
AccountID: accountId,
|
|
LastAgentReport: agentReport,
|
|
RemovedAt: removedAt,
|
|
}
|
|
|
|
_, dbErr := r.store.BunDB().NewInsert().
|
|
Model(&integration).
|
|
On(onConflictClause).
|
|
Exec(ctx)
|
|
|
|
if dbErr != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"could not upsert cloud account record: %w", dbErr,
|
|
))
|
|
}
|
|
|
|
upsertedAccount, apiErr := r.get(ctx, orgId, provider, *id)
|
|
if apiErr != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"couldn't fetch upserted account by id: %w", apiErr.ToError(),
|
|
))
|
|
}
|
|
|
|
return upsertedAccount, nil
|
|
}
|