mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 20:45:54 +08:00
Feat: cloud integrations: agent check-in api (#7004)
* chore: cloudintegrations: rename rds def * feat: cloudintegrations: shape of agent check in response for v0 release * chore: cloudintegrations: add validation for response expected after agent check in * chore: cloudintegrations: accumulate teletry collection strategies for enabled services * chore: cloudintegrations: use map struct to parse from map to struct with json tags * chore: cloudintegrations: telemetry collection strategy for services * chore: cloudintegrations: wrap up test for agent check in resp * chore: some cleanup * chore: some cleanup * chore: some minor renaming * chore: address review comment * chore: some cleanup --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
parent
784dccf298
commit
3b550c485d
@ -86,23 +86,25 @@ func readAllServiceDefinitions() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
cloudProviderDirPath := path.Join(rootDirName, d.Name())
|
cloudProvider := d.Name()
|
||||||
cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath)
|
|
||||||
|
cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
|
||||||
|
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't read %s service definitions", d.Name())
|
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(cloudServices) < 1 {
|
if len(cloudServices) < 1 {
|
||||||
return fmt.Errorf("no %s services could be read", d.Name())
|
return fmt.Errorf("no %s services could be read", cloudProvider)
|
||||||
}
|
}
|
||||||
|
|
||||||
availableServices[d.Name()] = cloudServices
|
availableServices[cloudProvider] = cloudServices
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
|
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
|
||||||
map[string]CloudServiceDetails, error,
|
map[string]CloudServiceDetails, error,
|
||||||
) {
|
) {
|
||||||
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
|
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
|
||||||
@ -118,7 +120,7 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
svcDirPath := path.Join(cloudProviderDirPath, d.Name())
|
svcDirPath := path.Join(cloudProviderDirPath, d.Name())
|
||||||
s, err := readServiceDefinition(svcDirPath)
|
s, err := readServiceDefinition(cloudProvider, svcDirPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
|
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
|
||||||
}
|
}
|
||||||
@ -135,14 +137,14 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
|
|||||||
return svcDefs, nil
|
return svcDefs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
|
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
|
||||||
integrationJsonPath := path.Join(dirpath, "integration.json")
|
integrationJsonPath := path.Join(svcDirpath, "integration.json")
|
||||||
|
|
||||||
serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
|
serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
"couldn't find integration.json in %s: %w",
|
"couldn't find integration.json in %s: %w",
|
||||||
dirpath, err,
|
svcDirpath, err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +157,7 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
hydrated, err := integrations.HydrateFileUris(
|
hydrated, err := integrations.HydrateFileUris(
|
||||||
integrationSpec, serviceDefinitionFiles, dirpath,
|
integrationSpec, serviceDefinitionFiles, svcDirpath,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
@ -163,20 +165,9 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
|
|||||||
integrationJsonPath, err,
|
integrationJsonPath, err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
hydratedSpec := hydrated.(map[string]any)
|
||||||
|
|
||||||
hydratedSpec := hydrated.(map[string]interface{})
|
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
|
||||||
hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"couldn't serialize hydrated integration spec back to JSON %s: %w",
|
|
||||||
integrationJsonPath, err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
var serviceDef CloudServiceDetails
|
|
||||||
decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson))
|
|
||||||
decoder.DisallowUnknownFields()
|
|
||||||
err = decoder.Decode(&serviceDef)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
"couldn't parse hydrated JSON spec read from %s: %w",
|
"couldn't parse hydrated JSON spec read from %s: %w",
|
||||||
@ -189,11 +180,13 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
|
|||||||
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
|
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &serviceDef, nil
|
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider
|
||||||
|
|
||||||
|
return serviceDef, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateServiceDefinition(s CloudServiceDetails) error {
|
func validateServiceDefinition(s *CloudServiceDetails) error {
|
||||||
// Validate dashboard data
|
// Validate dashboard data
|
||||||
seenDashboardIds := map[string]interface{}{}
|
seenDashboardIds := map[string]interface{}{}
|
||||||
for _, dd := range s.Assets.Dashboards {
|
for _, dd := range s.Assets.Dashboards {
|
||||||
@ -211,7 +204,29 @@ func validateServiceDefinition(s CloudServiceDetails) error {
|
|||||||
seenDashboardIds[dashboardId] = nil
|
seenDashboardIds[dashboardId] = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.TelemetryCollectionStrategy == nil {
|
||||||
|
return fmt.Errorf("telemetry_collection_strategy is required")
|
||||||
|
}
|
||||||
|
|
||||||
// potentially more to follow
|
// potentially more to follow
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) (
|
||||||
|
*StructType, error,
|
||||||
|
) {
|
||||||
|
mapJson, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't marshal map to json: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var res StructType
|
||||||
|
decoder := json.NewDecoder(bytes.NewReader(mapJson))
|
||||||
|
decoder.DisallowUnknownFields()
|
||||||
|
err = decoder.Decode(&res)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", err)
|
||||||
|
}
|
||||||
|
return &res, nil
|
||||||
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
"go.signoz.io/signoz/pkg/query-service/model"
|
"go.signoz.io/signoz/pkg/query-service/model"
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
)
|
)
|
||||||
|
|
||||||
var SupportedCloudProviders = []string{
|
var SupportedCloudProviders = []string{
|
||||||
@ -92,6 +93,11 @@ type GenerateConnectionUrlRequest struct {
|
|||||||
type SigNozAgentConfig struct {
|
type SigNozAgentConfig struct {
|
||||||
// The region in which SigNoz agent should be installed.
|
// The region in which SigNoz agent should be installed.
|
||||||
Region string `json:"region"`
|
Region string `json:"region"`
|
||||||
|
|
||||||
|
IngestionUrl string `json:"ingestion_url"`
|
||||||
|
IngestionKey string `json:"ingestion_key"`
|
||||||
|
SigNozAPIUrl string `json:"signoz_api_url"`
|
||||||
|
SigNozAPIKey string `json:"signoz_api_key"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type GenerateConnectionUrlResponse struct {
|
type GenerateConnectionUrlResponse struct {
|
||||||
@ -163,7 +169,17 @@ type AgentCheckInRequest struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type AgentCheckInResponse struct {
|
type AgentCheckInResponse struct {
|
||||||
Account AccountRecord `json:"account"`
|
AccountId string `json:"account_id"`
|
||||||
|
CloudAccountId string `json:"cloud_account_id"`
|
||||||
|
RemovedAt *time.Time `json:"removed_at"`
|
||||||
|
|
||||||
|
IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type IntegrationConfigForAgent struct {
|
||||||
|
EnabledRegions []string `json:"enabled_regions"`
|
||||||
|
|
||||||
|
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) CheckInAsAgent(
|
func (c *Controller) CheckInAsAgent(
|
||||||
@ -201,8 +217,70 @@ func (c *Controller) CheckInAsAgent(
|
|||||||
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
|
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prepare and return integration config to be consumed by agent
|
||||||
|
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider)
|
||||||
|
if err != nil {
|
||||||
|
return nil, model.InternalError(fmt.Errorf(
|
||||||
|
"couldn't init telemetry collection strategy: %w", err,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
agentConfig := IntegrationConfigForAgent{
|
||||||
|
EnabledRegions: []string{},
|
||||||
|
TelemetryCollectionStrategy: telemetryCollectionStrategy,
|
||||||
|
}
|
||||||
|
|
||||||
|
if account.Config != nil && account.Config.EnabledRegions != nil {
|
||||||
|
agentConfig.EnabledRegions = account.Config.EnabledRegions
|
||||||
|
}
|
||||||
|
|
||||||
|
services, apiErr := listCloudProviderServices(cloudProvider)
|
||||||
|
if apiErr != nil {
|
||||||
|
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
|
||||||
|
}
|
||||||
|
svcDetailsById := map[string]*CloudServiceDetails{}
|
||||||
|
for _, svcDetails := range services {
|
||||||
|
svcDetailsById[svcDetails.Id] = &svcDetails
|
||||||
|
}
|
||||||
|
|
||||||
|
svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
|
||||||
|
ctx, cloudProvider, *account.CloudAccountId,
|
||||||
|
)
|
||||||
|
if apiErr != nil {
|
||||||
|
return nil, model.WrapApiError(
|
||||||
|
apiErr, "couldn't get service configs for cloud account",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// accumulate config in a fixed order to ensure same config generated across runs
|
||||||
|
configuredSvcIds := maps.Keys(svcConfigs)
|
||||||
|
slices.Sort(configuredSvcIds)
|
||||||
|
|
||||||
|
for _, svcId := range configuredSvcIds {
|
||||||
|
svcDetails := svcDetailsById[svcId]
|
||||||
|
svcConfig := svcConfigs[svcId]
|
||||||
|
|
||||||
|
if svcDetails != nil {
|
||||||
|
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled
|
||||||
|
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled
|
||||||
|
if logsEnabled || metricsEnabled {
|
||||||
|
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
|
||||||
|
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, model.InternalError(fmt.Errorf(
|
||||||
|
"couldn't add service telemetry collection strategy: %w", err,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &AgentCheckInResponse{
|
return &AgentCheckInResponse{
|
||||||
Account: *account,
|
AccountId: account.Id,
|
||||||
|
CloudAccountId: *account.CloudAccountId,
|
||||||
|
RemovedAt: account.RemovedAt,
|
||||||
|
IntegrationConfig: agentConfig,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,8 +71,8 @@ func TestAgentCheckIns(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
require.Nil(apiErr)
|
require.Nil(apiErr)
|
||||||
require.Equal(testAccountId1, resp1.Account.Id)
|
require.Equal(testAccountId1, resp1.AccountId)
|
||||||
require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId)
|
require.Equal(testCloudAccountId1, resp1.CloudAccountId)
|
||||||
|
|
||||||
// The agent should not be able to check in with a different
|
// The agent should not be able to check in with a different
|
||||||
// cloud account id for the same account.
|
// cloud account id for the same account.
|
||||||
@ -262,9 +262,10 @@ func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccount
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
require.Nil(apiErr)
|
require.Nil(apiErr)
|
||||||
require.Equal(testAccountId, resp.Account.Id)
|
require.Equal(testAccountId, resp.AccountId)
|
||||||
require.Equal(cloudAccountId, *resp.Account.CloudAccountId)
|
require.Equal(cloudAccountId, resp.CloudAccountId)
|
||||||
|
|
||||||
return &resp.Account
|
|
||||||
|
|
||||||
|
acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId)
|
||||||
|
require.Nil(err)
|
||||||
|
return acc
|
||||||
}
|
}
|
||||||
|
@ -140,6 +140,8 @@ type CloudServiceDetails struct {
|
|||||||
DataCollected DataCollectedForService `json:"data_collected"`
|
DataCollected DataCollectedForService `json:"data_collected"`
|
||||||
|
|
||||||
ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`
|
ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`
|
||||||
|
|
||||||
|
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CloudServiceConfig struct {
|
type CloudServiceConfig struct {
|
||||||
@ -216,3 +218,107 @@ type SignalConnectionStatus struct {
|
|||||||
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
|
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
|
||||||
LastReceivedFrom string `json:"last_received_from"` // resource identifier
|
LastReceivedFrom string `json:"last_received_from"` // resource identifier
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CloudTelemetryCollectionStrategy struct {
|
||||||
|
Provider string `json:"provider"`
|
||||||
|
|
||||||
|
AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"`
|
||||||
|
AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) {
|
||||||
|
if provider == "aws" {
|
||||||
|
return &CloudTelemetryCollectionStrategy{
|
||||||
|
Provider: "aws",
|
||||||
|
AWSMetrics: &AWSMetricsCollectionStrategy{
|
||||||
|
CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{},
|
||||||
|
},
|
||||||
|
AWSLogs: &AWSLogsCollectionStrategy{
|
||||||
|
CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("unsupported cloud provider: %s", provider)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper for accumulating strategies for enabled services.
|
||||||
|
func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy(
|
||||||
|
svcStrategy *CloudTelemetryCollectionStrategy,
|
||||||
|
logsEnabled bool,
|
||||||
|
metricsEnabled bool,
|
||||||
|
) error {
|
||||||
|
if svcStrategy.Provider != cs.Provider {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"can't add %s service strategy to strategy for %s",
|
||||||
|
svcStrategy.Provider, cs.Provider,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cs.Provider == "aws" {
|
||||||
|
if logsEnabled {
|
||||||
|
cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs)
|
||||||
|
}
|
||||||
|
if metricsEnabled {
|
||||||
|
cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("unsupported cloud provider: %s", cs.Provider)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
type AWSMetricsCollectionStrategy struct {
|
||||||
|
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
|
||||||
|
CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloudwatchMetricStreamFilter struct {
|
||||||
|
// json tags here are in the shape expected by AWS API as detailed at
|
||||||
|
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
|
||||||
|
Namespace string `json:"Namespace"`
|
||||||
|
MetricNames []string `json:"MetricNames,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy(
|
||||||
|
svcStrategy *AWSMetricsCollectionStrategy,
|
||||||
|
) error {
|
||||||
|
if svcStrategy == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
amc.CloudwatchMetricsStreamFilters = append(
|
||||||
|
amc.CloudwatchMetricsStreamFilters,
|
||||||
|
svcStrategy.CloudwatchMetricsStreamFilters...,
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AWSLogsCollectionStrategy struct {
|
||||||
|
CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloudwatchLogsSubscriptionConfig struct {
|
||||||
|
// subscribe to all logs groups with specified prefix.
|
||||||
|
// eg: `/aws/rds/`
|
||||||
|
LogGroupNamePrefix string `json:"log_group_name_prefix"`
|
||||||
|
|
||||||
|
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
|
||||||
|
// "" implies no filtering is required.
|
||||||
|
FilterPattern string `json:"filter_pattern"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (alc *AWSLogsCollectionStrategy) AddServiceStrategy(
|
||||||
|
svcStrategy *AWSLogsCollectionStrategy,
|
||||||
|
) error {
|
||||||
|
if svcStrategy == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
alc.CloudwatchLogsSubscriptions = append(
|
||||||
|
alc.CloudwatchLogsSubscriptions,
|
||||||
|
svcStrategy.CloudwatchLogsSubscriptions...,
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -26,5 +26,14 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"logs": []
|
"logs": []
|
||||||
|
},
|
||||||
|
"telemetry_collection_strategy": {
|
||||||
|
"aws_metrics": {
|
||||||
|
"cloudwatch_metric_stream_filters": [
|
||||||
|
{
|
||||||
|
"Namespace": "AWS/EC2"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Before Width: | Height: | Size: 2.7 KiB After Width: | Height: | Size: 2.7 KiB |
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"id": "rds-postgres",
|
"id": "rds",
|
||||||
"title": "RDS Postgres",
|
"title": "Amazon RDS",
|
||||||
"icon": "file://icon.svg",
|
"icon": "file://icon.svg",
|
||||||
"overview": "file://overview.md",
|
"overview": "file://overview.md",
|
||||||
"assets": {
|
"assets": {
|
||||||
@ -26,5 +26,22 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"logs": []
|
"logs": []
|
||||||
|
},
|
||||||
|
"telemetry_collection_strategy": {
|
||||||
|
"aws_metrics": {
|
||||||
|
"cloudwatch_metric_stream_filters": [
|
||||||
|
{
|
||||||
|
"Namespace": "AWS/RDS"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"aws_logs": {
|
||||||
|
"cloudwatch_logs_subscriptions": [
|
||||||
|
{
|
||||||
|
"log_group_name_prefix": "/aws/rds",
|
||||||
|
"filter_pattern": ""
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -23,7 +24,6 @@ import (
|
|||||||
func TestAWSIntegrationAccountLifecycle(t *testing.T) {
|
func TestAWSIntegrationAccountLifecycle(t *testing.T) {
|
||||||
// Test for happy path of connecting and managing AWS integration accounts
|
// Test for happy path of connecting and managing AWS integration accounts
|
||||||
|
|
||||||
t0 := time.Now()
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
testbed := NewCloudIntegrationsTestBed(t, nil)
|
testbed := NewCloudIntegrationsTestBed(t, nil)
|
||||||
|
|
||||||
@ -67,11 +67,9 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) {
|
|||||||
CloudAccountId: testAWSAccountId,
|
CloudAccountId: testAWSAccountId,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
require.Equal(testAccountId, agentCheckInResp.Account.Id)
|
require.Equal(testAccountId, agentCheckInResp.AccountId)
|
||||||
require.Equal(testAccountConfig, *agentCheckInResp.Account.Config)
|
require.Equal(testAWSAccountId, agentCheckInResp.CloudAccountId)
|
||||||
require.Equal(testAWSAccountId, *agentCheckInResp.Account.CloudAccountId)
|
require.Nil(agentCheckInResp.RemovedAt)
|
||||||
require.LessOrEqual(t0.Unix(), agentCheckInResp.Account.CreatedAt.Unix())
|
|
||||||
require.Nil(agentCheckInResp.Account.RemovedAt)
|
|
||||||
|
|
||||||
// Polling for connection status from UI should now return latest status
|
// Polling for connection status from UI should now return latest status
|
||||||
accountStatusResp1 := testbed.GetAccountStatusFromQS("aws", testAccountId)
|
accountStatusResp1 := testbed.GetAccountStatusFromQS("aws", testAccountId)
|
||||||
@ -107,10 +105,9 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) {
|
|||||||
CloudAccountId: testAWSAccountId,
|
CloudAccountId: testAWSAccountId,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
require.Equal(testAccountId, agentCheckInResp1.Account.Id)
|
require.Equal(testAccountId, agentCheckInResp1.AccountId)
|
||||||
require.Equal(testAccountConfig2, *agentCheckInResp1.Account.Config)
|
require.Equal(testAWSAccountId, agentCheckInResp1.CloudAccountId)
|
||||||
require.Equal(testAWSAccountId, *agentCheckInResp1.Account.CloudAccountId)
|
require.Nil(agentCheckInResp1.RemovedAt)
|
||||||
require.Nil(agentCheckInResp1.Account.RemovedAt)
|
|
||||||
|
|
||||||
// Should be able to disconnect/remove account from UI.
|
// Should be able to disconnect/remove account from UI.
|
||||||
tsBeforeDisconnect := time.Now()
|
tsBeforeDisconnect := time.Now()
|
||||||
@ -125,9 +122,9 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) {
|
|||||||
CloudAccountId: testAWSAccountId,
|
CloudAccountId: testAWSAccountId,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
require.Equal(testAccountId, agentCheckInResp2.Account.Id)
|
require.Equal(testAccountId, agentCheckInResp2.AccountId)
|
||||||
require.Equal(testAWSAccountId, *agentCheckInResp2.Account.CloudAccountId)
|
require.Equal(testAWSAccountId, agentCheckInResp2.CloudAccountId)
|
||||||
require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.Account.RemovedAt)
|
require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.RemovedAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAWSIntegrationServices(t *testing.T) {
|
func TestAWSIntegrationServices(t *testing.T) {
|
||||||
@ -194,6 +191,149 @@ func TestAWSIntegrationServices(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConfigReturnedWhenAgentChecksIn(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
testbed := NewCloudIntegrationsTestBed(t, nil)
|
||||||
|
|
||||||
|
// configure a connected account
|
||||||
|
testAccountConfig := cloudintegrations.AccountConfig{
|
||||||
|
EnabledRegions: []string{"us-east-1", "us-east-2"},
|
||||||
|
}
|
||||||
|
connectionUrlResp := testbed.GenerateConnectionUrlFromQS(
|
||||||
|
"aws", cloudintegrations.GenerateConnectionUrlRequest{
|
||||||
|
AgentConfig: cloudintegrations.SigNozAgentConfig{
|
||||||
|
Region: "us-east-1",
|
||||||
|
SigNozAPIKey: "test-api-key",
|
||||||
|
},
|
||||||
|
AccountConfig: testAccountConfig,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
testAccountId := connectionUrlResp.AccountId
|
||||||
|
require.NotEmpty(testAccountId)
|
||||||
|
require.NotEmpty(connectionUrlResp.ConnectionUrl)
|
||||||
|
|
||||||
|
testAWSAccountId := "389389489489"
|
||||||
|
checkinResp := testbed.CheckInAsAgentWithQS(
|
||||||
|
"aws", cloudintegrations.AgentCheckInRequest{
|
||||||
|
AccountId: testAccountId,
|
||||||
|
CloudAccountId: testAWSAccountId,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
require.Equal(testAccountId, checkinResp.AccountId)
|
||||||
|
require.Equal(testAWSAccountId, checkinResp.CloudAccountId)
|
||||||
|
require.Nil(checkinResp.RemovedAt)
|
||||||
|
require.Equal(testAccountConfig.EnabledRegions, checkinResp.IntegrationConfig.EnabledRegions)
|
||||||
|
|
||||||
|
telemetryCollectionStrategy := checkinResp.IntegrationConfig.TelemetryCollectionStrategy
|
||||||
|
require.Equal("aws", telemetryCollectionStrategy.Provider)
|
||||||
|
require.NotNil(telemetryCollectionStrategy.AWSMetrics)
|
||||||
|
require.Empty(telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters)
|
||||||
|
require.NotNil(telemetryCollectionStrategy.AWSLogs)
|
||||||
|
require.Empty(telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions)
|
||||||
|
|
||||||
|
// helper
|
||||||
|
setServiceConfig := func(svcId string, metricsEnabled bool, logsEnabled bool) {
|
||||||
|
testSvcConfig := cloudintegrations.CloudServiceConfig{}
|
||||||
|
if metricsEnabled {
|
||||||
|
testSvcConfig.Metrics = &cloudintegrations.CloudServiceMetricsConfig{
|
||||||
|
Enabled: metricsEnabled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if logsEnabled {
|
||||||
|
testSvcConfig.Logs = &cloudintegrations.CloudServiceLogsConfig{
|
||||||
|
Enabled: logsEnabled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
updateSvcConfigResp := testbed.UpdateServiceConfigWithQS("aws", svcId, cloudintegrations.UpdateServiceConfigRequest{
|
||||||
|
CloudAccountId: testAWSAccountId,
|
||||||
|
Config: testSvcConfig,
|
||||||
|
})
|
||||||
|
require.Equal(svcId, updateSvcConfigResp.Id)
|
||||||
|
require.Equal(testSvcConfig, updateSvcConfigResp.Config)
|
||||||
|
}
|
||||||
|
|
||||||
|
setServiceConfig("ec2", true, false)
|
||||||
|
setServiceConfig("rds", true, true)
|
||||||
|
|
||||||
|
checkinResp = testbed.CheckInAsAgentWithQS(
|
||||||
|
"aws", cloudintegrations.AgentCheckInRequest{
|
||||||
|
AccountId: testAccountId,
|
||||||
|
CloudAccountId: testAWSAccountId,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
require.Equal(testAccountId, checkinResp.AccountId)
|
||||||
|
require.Equal(testAWSAccountId, checkinResp.CloudAccountId)
|
||||||
|
require.Nil(checkinResp.RemovedAt)
|
||||||
|
|
||||||
|
integrationConf := checkinResp.IntegrationConfig
|
||||||
|
require.Equal(testAccountConfig.EnabledRegions, integrationConf.EnabledRegions)
|
||||||
|
|
||||||
|
telemetryCollectionStrategy = integrationConf.TelemetryCollectionStrategy
|
||||||
|
require.Equal("aws", telemetryCollectionStrategy.Provider)
|
||||||
|
require.NotNil(telemetryCollectionStrategy.AWSMetrics)
|
||||||
|
metricStreamNamespaces := []string{}
|
||||||
|
for _, f := range telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters {
|
||||||
|
metricStreamNamespaces = append(metricStreamNamespaces, f.Namespace)
|
||||||
|
}
|
||||||
|
require.Equal([]string{"AWS/EC2", "AWS/RDS"}, metricStreamNamespaces)
|
||||||
|
|
||||||
|
require.NotNil(telemetryCollectionStrategy.AWSLogs)
|
||||||
|
logGroupPrefixes := []string{}
|
||||||
|
for _, f := range telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions {
|
||||||
|
logGroupPrefixes = append(logGroupPrefixes, f.LogGroupNamePrefix)
|
||||||
|
}
|
||||||
|
require.Equal(1, len(logGroupPrefixes))
|
||||||
|
require.True(strings.HasPrefix(logGroupPrefixes[0], "/aws/rds"))
|
||||||
|
|
||||||
|
// change regions and update service configs and validate config changes for agent
|
||||||
|
testAccountConfig2 := cloudintegrations.AccountConfig{
|
||||||
|
EnabledRegions: []string{"us-east-2", "us-west-1"},
|
||||||
|
}
|
||||||
|
latestAccount := testbed.UpdateAccountConfigWithQS(
|
||||||
|
"aws", testAccountId, testAccountConfig2,
|
||||||
|
)
|
||||||
|
require.Equal(testAccountId, latestAccount.Id)
|
||||||
|
require.Equal(testAccountConfig2, *latestAccount.Config)
|
||||||
|
|
||||||
|
// disable metrics for one and logs for the other.
|
||||||
|
// config should be as expected.
|
||||||
|
setServiceConfig("ec2", false, false)
|
||||||
|
setServiceConfig("rds", true, false)
|
||||||
|
|
||||||
|
checkinResp = testbed.CheckInAsAgentWithQS(
|
||||||
|
"aws", cloudintegrations.AgentCheckInRequest{
|
||||||
|
AccountId: testAccountId,
|
||||||
|
CloudAccountId: testAWSAccountId,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
require.Equal(testAccountId, checkinResp.AccountId)
|
||||||
|
require.Equal(testAWSAccountId, checkinResp.CloudAccountId)
|
||||||
|
require.Nil(checkinResp.RemovedAt)
|
||||||
|
integrationConf = checkinResp.IntegrationConfig
|
||||||
|
require.Equal(testAccountConfig2.EnabledRegions, integrationConf.EnabledRegions)
|
||||||
|
|
||||||
|
telemetryCollectionStrategy = integrationConf.TelemetryCollectionStrategy
|
||||||
|
require.Equal("aws", telemetryCollectionStrategy.Provider)
|
||||||
|
require.NotNil(telemetryCollectionStrategy.AWSMetrics)
|
||||||
|
metricStreamNamespaces = []string{}
|
||||||
|
for _, f := range telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters {
|
||||||
|
metricStreamNamespaces = append(metricStreamNamespaces, f.Namespace)
|
||||||
|
}
|
||||||
|
require.Equal([]string{"AWS/RDS"}, metricStreamNamespaces)
|
||||||
|
|
||||||
|
require.NotNil(telemetryCollectionStrategy.AWSLogs)
|
||||||
|
logGroupPrefixes = []string{}
|
||||||
|
for _, f := range telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions {
|
||||||
|
logGroupPrefixes = append(logGroupPrefixes, f.LogGroupNamePrefix)
|
||||||
|
}
|
||||||
|
require.Equal(0, len(logGroupPrefixes))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
type CloudIntegrationsTestBed struct {
|
type CloudIntegrationsTestBed struct {
|
||||||
t *testing.T
|
t *testing.T
|
||||||
testUser *model.User
|
testUser *model.User
|
||||||
@ -412,7 +552,7 @@ func RequestQSAndParseResp[ResponseType any](
|
|||||||
|
|
||||||
err := json.Unmarshal(respDataJson, &resp)
|
err := json.Unmarshal(respDataJson, &resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tb.t.Fatalf("could not unmarshal apiResponse.Data json into %T", resp)
|
tb.t.Fatalf("could not unmarshal apiResponse.Data json into %T: %v", resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &resp
|
return &resp
|
||||||
|
Loading…
x
Reference in New Issue
Block a user