mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-28 05:51:58 +08:00

* feat: cloud service integrations: get model and repo interface started * feat: cloud service integrations: flesh out more of cloud services model * feat: cloud integrations: reorganize things a little * feat: cloud integrations: get svc controller started * feat: cloud integrations: add stubs for EC2 and RDS postgres services * feat: cloud integrations: add validation for listing and getting available svcs and some cleanup * feat: cloud integrations: refactor helpers in existing integrations code for reuse * feat: cloud integrations: parsing of cloud service definitions * feat: cloud integrations: impl for getCloudProviderService * feat: cloud integrations: some reorganization * feat: cloud integrations: some more cleanup * feat: cloud integrations: add validation for listing available cloud provider services * feat: cloud integrations: API endpoint for listing available cloud provider services * feat: cloud integrations: add validation for getting details of a particular service * feat: cloud integrations: API endpoint for getting details of a service * feat: cloud integrations: add controller validation for configuring cloud services * feat: cloud integrations: get serviceConfigRepo started * feat: cloud integrations: service config in service list summaries when queried for cloud account id * feat: cloud integrations: only a supported service for a connected cloud account can be configured * feat: cloud integrations: add validation for configuring services via the API * feat: cloud integrations: API for configuring services * feat: cloud integrations: some cleanup * feat: cloud integrations: fix broken test --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
271 lines
6.0 KiB
Go
271 lines
6.0 KiB
Go
package cloudintegrations
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jmoiron/sqlx"
|
|
"go.signoz.io/signoz/pkg/query-service/model"
|
|
)
|
|
|
|
type cloudProviderAccountsRepository interface {
|
|
listConnected(ctx context.Context, cloudProvider string) ([]AccountRecord, *model.ApiError)
|
|
|
|
get(ctx context.Context, cloudProvider string, id string) (*AccountRecord, *model.ApiError)
|
|
|
|
getConnectedCloudAccount(
|
|
ctx context.Context, cloudProvider string, cloudAccountId string,
|
|
) (*AccountRecord, *model.ApiError)
|
|
|
|
// Insert an account or update it by (cloudProvider, id)
|
|
// for specified non-empty fields
|
|
upsert(
|
|
ctx context.Context,
|
|
cloudProvider string,
|
|
id *string,
|
|
config *AccountConfig,
|
|
cloudAccountId *string,
|
|
agentReport *AgentReport,
|
|
removedAt *time.Time,
|
|
) (*AccountRecord, *model.ApiError)
|
|
}
|
|
|
|
func newCloudProviderAccountsRepository(db *sqlx.DB) (
|
|
*cloudProviderAccountsSQLRepository, error,
|
|
) {
|
|
if err := initAccountsSqliteDBIfNeeded(db); err != nil {
|
|
return nil, fmt.Errorf("could not init sqlite DB for cloudintegrations accounts: %w", err)
|
|
}
|
|
|
|
return &cloudProviderAccountsSQLRepository{
|
|
db: db,
|
|
}, nil
|
|
}
|
|
|
|
func initAccountsSqliteDBIfNeeded(db *sqlx.DB) error {
|
|
if db == nil {
|
|
return fmt.Errorf("db is required")
|
|
}
|
|
|
|
createTablesStatements := `
|
|
CREATE TABLE IF NOT EXISTS cloud_integrations_accounts(
|
|
cloud_provider TEXT NOT NULL,
|
|
id TEXT NOT NULL,
|
|
config_json TEXT,
|
|
cloud_account_id TEXT,
|
|
last_agent_report_json TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
|
|
removed_at TIMESTAMP,
|
|
UNIQUE(cloud_provider, id)
|
|
)
|
|
`
|
|
_, err := db.Exec(createTablesStatements)
|
|
if err != nil {
|
|
return fmt.Errorf(
|
|
"could not ensure cloud provider accounts schema in sqlite DB: %w", err,
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type cloudProviderAccountsSQLRepository struct {
|
|
db *sqlx.DB
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) listConnected(
|
|
ctx context.Context, cloudProvider string,
|
|
) ([]AccountRecord, *model.ApiError) {
|
|
accounts := []AccountRecord{}
|
|
|
|
err := r.db.SelectContext(
|
|
ctx, &accounts, `
|
|
select
|
|
cloud_provider,
|
|
id,
|
|
config_json,
|
|
cloud_account_id,
|
|
last_agent_report_json,
|
|
created_at,
|
|
removed_at
|
|
from cloud_integrations_accounts
|
|
where
|
|
cloud_provider=$1
|
|
and removed_at is NULL
|
|
and cloud_account_id is not NULL
|
|
and last_agent_report_json is not NULL
|
|
order by created_at
|
|
`, cloudProvider,
|
|
)
|
|
if err != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"could not query connected cloud accounts: %w", err,
|
|
))
|
|
}
|
|
|
|
return accounts, nil
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) get(
|
|
ctx context.Context, cloudProvider string, id string,
|
|
) (*AccountRecord, *model.ApiError) {
|
|
var result AccountRecord
|
|
|
|
err := r.db.GetContext(
|
|
ctx, &result, `
|
|
select
|
|
cloud_provider,
|
|
id,
|
|
config_json,
|
|
cloud_account_id,
|
|
last_agent_report_json,
|
|
created_at,
|
|
removed_at
|
|
from cloud_integrations_accounts
|
|
where
|
|
cloud_provider=$1
|
|
and id=$2
|
|
`,
|
|
cloudProvider, id,
|
|
)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, model.NotFoundError(fmt.Errorf(
|
|
"couldn't find account with Id %s", id,
|
|
))
|
|
} else if err != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"couldn't query cloud provider accounts: %w", err,
|
|
))
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) getConnectedCloudAccount(
|
|
ctx context.Context, cloudProvider string, cloudAccountId string,
|
|
) (*AccountRecord, *model.ApiError) {
|
|
var result AccountRecord
|
|
|
|
err := r.db.GetContext(
|
|
ctx, &result, `
|
|
select
|
|
cloud_provider,
|
|
id,
|
|
config_json,
|
|
cloud_account_id,
|
|
last_agent_report_json,
|
|
created_at,
|
|
removed_at
|
|
from cloud_integrations_accounts
|
|
where
|
|
cloud_provider=$1
|
|
and cloud_account_id=$2
|
|
and last_agent_report_json is not NULL
|
|
and removed_at is NULL
|
|
`,
|
|
cloudProvider, cloudAccountId,
|
|
)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, model.NotFoundError(fmt.Errorf(
|
|
"couldn't find connected cloud account %s", cloudAccountId,
|
|
))
|
|
} else if err != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"couldn't query cloud provider accounts: %w", err,
|
|
))
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func (r *cloudProviderAccountsSQLRepository) upsert(
|
|
ctx context.Context,
|
|
cloudProvider string,
|
|
id *string,
|
|
config *AccountConfig,
|
|
cloudAccountId *string,
|
|
agentReport *AgentReport,
|
|
removedAt *time.Time,
|
|
) (*AccountRecord, *model.ApiError) {
|
|
// Insert
|
|
if id == nil {
|
|
newId := uuid.NewString()
|
|
id = &newId
|
|
}
|
|
|
|
// Prepare clause for setting values in `on conflict do update`
|
|
onConflictSetStmts := []string{}
|
|
setColStatement := func(col string) string {
|
|
return fmt.Sprintf("%s=excluded.%s", col, col)
|
|
}
|
|
|
|
if config != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("config_json"),
|
|
)
|
|
}
|
|
|
|
if cloudAccountId != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("cloud_account_id"),
|
|
)
|
|
}
|
|
|
|
if agentReport != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("last_agent_report_json"),
|
|
)
|
|
}
|
|
|
|
if removedAt != nil {
|
|
onConflictSetStmts = append(
|
|
onConflictSetStmts, setColStatement("removed_at"),
|
|
)
|
|
}
|
|
|
|
onConflictClause := ""
|
|
if len(onConflictSetStmts) > 0 {
|
|
onConflictClause = fmt.Sprintf(
|
|
"on conflict(cloud_provider, id) do update SET\n%s",
|
|
strings.Join(onConflictSetStmts, ",\n"),
|
|
)
|
|
}
|
|
|
|
insertQuery := fmt.Sprintf(`
|
|
INSERT INTO cloud_integrations_accounts (
|
|
cloud_provider,
|
|
id,
|
|
config_json,
|
|
cloud_account_id,
|
|
last_agent_report_json,
|
|
removed_at
|
|
) values ($1, $2, $3, $4, $5, $6)
|
|
%s`, onConflictClause,
|
|
)
|
|
|
|
_, dbErr := r.db.ExecContext(
|
|
ctx, insertQuery,
|
|
cloudProvider, id, config, cloudAccountId, agentReport, removedAt,
|
|
)
|
|
if dbErr != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"could not upsert cloud account record: %w", dbErr,
|
|
))
|
|
}
|
|
|
|
upsertedAccount, apiErr := r.get(ctx, cloudProvider, *id)
|
|
if apiErr != nil {
|
|
return nil, model.InternalError(fmt.Errorf(
|
|
"couldn't fetch upserted account by id: %w", apiErr.ToError(),
|
|
))
|
|
}
|
|
|
|
return upsertedAccount, nil
|
|
}
|