feat: S3 Sync (AWS Integrations) (#7718)

This commit is contained in:
Piyush Singariya 2025-05-14 05:12:41 +05:30 committed by GitHub
parent 9c0134da54
commit 03ab6e704b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 495 additions and 346 deletions

View File

@ -141,3 +141,27 @@ func Join(errs ...error) error {
func As(err error, target any) bool {
return errors.As(err, target)
}
func WrapNotFoundf(cause error, code Code, format string, args ...interface{}) *base {
return Wrapf(cause, TypeNotFound, code, format, args...)
}
func NewNotFoundf(code Code, format string, args ...interface{}) *base {
return Newf(TypeNotFound, code, format, args...)
}
func WrapInternalf(cause error, code Code, format string, args ...interface{}) *base {
return Wrapf(cause, TypeInternal, code, format, args...)
}
func NewInternalf(code Code, format string, args ...interface{}) *base {
return Newf(TypeInternal, code, format, args...)
}
func WrapInvalidInputf(cause error, code Code, format string, args ...interface{}) *base {
return Wrapf(cause, TypeInvalidInput, code, format, args...)
}
func NewInvalidInputf(code Code, format string, args ...interface{}) *base {
return Newf(TypeInvalidInput, code, format, args...)
}

View File

@ -0,0 +1,44 @@
package cloudintegrations
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/aws/aws-sdk-go/aws/endpoints"
)
var (
CodeInvalidCloudRegion = errors.MustNewCode("invalid_cloud_region")
CodeMismatchCloudProvider = errors.MustNewCode("cloud_provider_mismatch")
)
// List of all valid cloud regions on Amazon Web Services
var ValidAWSRegions = map[string]bool{
endpoints.AfSouth1RegionID: true,
endpoints.ApEast1RegionID: true,
endpoints.ApNortheast1RegionID: true,
endpoints.ApNortheast2RegionID: true,
endpoints.ApNortheast3RegionID: true,
endpoints.ApSouth1RegionID: true,
endpoints.ApSouth2RegionID: true,
endpoints.ApSoutheast1RegionID: true,
endpoints.ApSoutheast2RegionID: true,
endpoints.ApSoutheast3RegionID: true,
endpoints.ApSoutheast4RegionID: true,
endpoints.CaCentral1RegionID: true,
endpoints.CaWest1RegionID: true,
endpoints.EuCentral1RegionID: true,
endpoints.EuCentral2RegionID: true,
endpoints.EuNorth1RegionID: true,
endpoints.EuSouth1RegionID: true,
endpoints.EuSouth2RegionID: true,
endpoints.EuWest1RegionID: true,
endpoints.EuWest2RegionID: true,
endpoints.EuWest3RegionID: true,
endpoints.IlCentral1RegionID: true,
endpoints.MeCentral1RegionID: true,
endpoints.MeSouth1RegionID: true,
endpoints.SaEast1RegionID: true,
endpoints.UsEast1RegionID: true,
endpoints.UsEast2RegionID: true,
endpoints.UsWest1RegionID: true,
endpoints.UsWest2RegionID: true,
}

View File

@ -8,6 +8,8 @@ import (
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
@ -27,7 +29,7 @@ func validateCloudProviderName(name string) *model.ApiError {
type Controller struct {
accountsRepo cloudProviderAccountsRepository
serviceConfigRepo serviceConfigRepository
serviceConfigRepo ServiceConfigDatabase
}
func NewController(sqlStore sqlstore.SQLStore) (
@ -117,7 +119,7 @@ func (c *Controller) GenerateConnectionUrl(
}
// TODO(Raj): parameterized this in follow up changes
agentVersion := "0.0.3"
agentVersion := "v0.0.4"
connectionUrl := fmt.Sprintf(
"https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?",
@ -193,12 +195,12 @@ type AgentCheckInResponse struct {
type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"`
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
TelemetryCollectionStrategy *CompiledCollectionStrategy `json:"telemetry,omitempty"`
}
func (c *Controller) CheckInAsAgent(
ctx context.Context, orgId string, cloudProvider string, req AgentCheckInRequest,
) (*AgentCheckInResponse, *model.ApiError) {
) (*AgentCheckInResponse, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
@ -232,7 +234,7 @@ func (c *Controller) CheckInAsAgent(
}
// prepare and return integration config to be consumed by agent
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider)
compiledStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
@ -241,20 +243,16 @@ func (c *Controller) CheckInAsAgent(
agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: telemetryCollectionStrategy,
TelemetryCollectionStrategy: compiledStrategy,
}
if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}
services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
services, err := services.Map(cloudProvider)
if err != nil {
return nil, err
}
svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
@ -267,26 +265,19 @@ func (c *Controller) CheckInAsAgent(
}
// accumulate config in a fixed order to ensure same config generated across runs
configuredSvcIds := maps.Keys(svcConfigs)
slices.Sort(configuredSvcIds)
configuredServices := maps.Keys(svcConfigs)
slices.Sort(configuredServices)
for _, svcId := range configuredSvcIds {
svcDetails := svcDetailsById[svcId]
svcConfig := svcConfigs[svcId]
for _, svcType := range configuredServices {
definition, ok := services[svcType]
if !ok {
continue
}
config := svcConfigs[svcType]
if svcDetails != nil {
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled
if logsEnabled || metricsEnabled {
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't add service telemetry collection strategy: %w", err,
))
}
}
err := AddServiceStrategy(svcType, compiledStrategy, definition.Strategy, config)
if err != nil {
return nil, err
}
}
@ -349,7 +340,7 @@ func (c *Controller) DisconnectAccount(
}
type ListServicesResponse struct {
Services []CloudServiceSummary `json:"services"`
Services []ServiceSummary `json:"services"`
}
func (c *Controller) ListServices(
@ -358,12 +349,11 @@ func (c *Controller) ListServices(
cloudProvider string,
cloudAccountId *string,
) (*ListServicesResponse, *model.ApiError) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
services, apiErr := listCloudProviderServices(cloudProvider)
definitions, apiErr := services.List(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
@ -386,9 +376,11 @@ func (c *Controller) ListServices(
}
}
summaries := []CloudServiceSummary{}
for _, s := range services {
summary := s.CloudServiceSummary
summaries := []ServiceSummary{}
for _, def := range definitions {
summary := ServiceSummary{
Metadata: def.Metadata,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
@ -405,15 +397,18 @@ func (c *Controller) GetServiceDetails(
cloudProvider string,
serviceId string,
cloudAccountId *string,
) (*CloudServiceDetails, *model.ApiError) {
) (*ServiceDetails, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
service, apiErr := getCloudProviderService(cloudProvider, serviceId)
if apiErr != nil {
return nil, apiErr
definition, err := services.GetServiceDefinition(cloudProvider, serviceId)
if err != nil {
return nil, err
}
details := ServiceDetails{
Definition: *definition,
}
if cloudAccountId != nil {
@ -433,7 +428,7 @@ func (c *Controller) GetServiceDetails(
}
if config != nil {
service.Config = config
details.Config = config
enabled := false
if config.Metrics != nil && config.Metrics.Enabled {
@ -441,22 +436,20 @@ func (c *Controller) GetServiceDetails(
}
// add links to service dashboards, making them clickable.
for i, d := range service.Assets.Dashboards {
for i, d := range definition.Assets.Dashboards {
dashboardUuid := c.dashboardUuid(
cloudProvider, serviceId, d.Id,
)
if enabled {
service.Assets.Dashboards[i].Url = fmt.Sprintf(
"/dashboard/%s", dashboardUuid,
)
definition.Assets.Dashboards[i].Url = fmt.Sprintf("/dashboard/%s", dashboardUuid)
} else {
service.Assets.Dashboards[i].Url = ""
definition.Assets.Dashboards[i].Url = "" // to unset the in-memory URL if enabled once and disabled afterwards
}
}
}
}
return service, nil
return &details, nil
}
type UpdateServiceConfigRequest struct {
@ -464,6 +457,20 @@ type UpdateServiceConfigRequest struct {
Config types.CloudServiceConfig `json:"config"`
}
func (u *UpdateServiceConfigRequest) Validate(def *services.Definition) error {
if def.Id != services.S3Sync && u.Config.Logs != nil && u.Config.Logs.S3Buckets != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "s3 buckets can only be added to service-type[%s]", services.S3Sync)
} else if def.Id == services.S3Sync && u.Config.Logs != nil && u.Config.Logs.S3Buckets != nil {
for region := range u.Config.Logs.S3Buckets {
if _, found := ValidAWSRegions[region]; !found {
return errors.NewInvalidInputf(CodeInvalidCloudRegion, "invalid cloud region: %s", region)
}
}
}
return nil
}
type UpdateServiceConfigResponse struct {
Id string `json:"id"`
Config types.CloudServiceConfig `json:"config"`
@ -473,14 +480,23 @@ func (c *Controller) UpdateServiceConfig(
ctx context.Context,
orgID string,
cloudProvider string,
serviceId string,
req UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, *model.ApiError) {
serviceType string,
req *UpdateServiceConfigRequest,
) (*UpdateServiceConfigResponse, error) {
if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
return nil, apiErr
}
// can only update config for a valid service.
definition, err := services.GetServiceDefinition(cloudProvider, serviceType)
if err != nil {
return nil, err
}
if err := req.Validate(definition); err != nil {
return nil, err
}
// can only update config for a connected cloud account id
_, apiErr := c.accountsRepo.getConnectedCloudAccount(
ctx, orgID, cloudProvider, req.CloudAccountId,
@ -489,21 +505,15 @@ func (c *Controller) UpdateServiceConfig(
return nil, model.WrapApiError(apiErr, "couldn't find connected cloud account")
}
// can only update config for a valid service.
_, apiErr = getCloudProviderService(cloudProvider, serviceId)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "unsupported service")
}
updatedConfig, apiErr := c.serviceConfigRepo.upsert(
ctx, orgID, cloudProvider, req.CloudAccountId, serviceId, req.Config,
ctx, orgID, cloudProvider, req.CloudAccountId, serviceType, req.Config,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't update service config")
}
return &UpdateServiceConfigResponse{
Id: serviceId,
Id: serviceType,
Config: *updatedConfig,
}, nil
}
@ -558,7 +568,7 @@ func (c *Controller) AvailableDashboardsForCloudProvider(
}
}
allServices, apiErr := listCloudProviderServices(cloudProvider)
allServices, apiErr := services.List(cloudProvider)
if apiErr != nil {
return nil, apiErr
}

View File

@ -76,26 +76,26 @@ func TestAgentCheckIns(t *testing.T) {
// if no connection url was requested (no account with agent's account id exists)
testAccountId1 := uuid.NewString()
testCloudAccountId1 := "546311234"
resp1, apiErr := controller.CheckInAsAgent(
resp1, err := controller.CheckInAsAgent(
context.TODO(), user.OrgID, "aws", AgentCheckInRequest{
ID: testAccountId1,
AccountID: testCloudAccountId1,
},
)
require.Nil(apiErr)
require.Nil(err)
require.Equal(testAccountId1, resp1.AccountId)
require.Equal(testCloudAccountId1, resp1.CloudAccountId)
// The agent should not be able to check in with a different
// cloud account id for the same account.
testCloudAccountId2 := "99999999"
_, apiErr = controller.CheckInAsAgent(
_, err = controller.CheckInAsAgent(
context.TODO(), user.OrgID, "aws", AgentCheckInRequest{
ID: testAccountId1,
AccountID: testCloudAccountId2,
},
)
require.NotNil(apiErr)
require.NotNil(err)
// The agent should not be able to check-in with a particular cloud account id
// if another connected AccountRecord exists for same cloud account
@ -110,13 +110,13 @@ func TestAgentCheckIns(t *testing.T) {
require.Nil(existingConnected.RemovedAt)
testAccountId2 := uuid.NewString()
_, apiErr = controller.CheckInAsAgent(
_, err = controller.CheckInAsAgent(
context.TODO(), user.OrgID, "aws", AgentCheckInRequest{
ID: testAccountId2,
AccountID: testCloudAccountId1,
},
)
require.NotNil(apiErr)
require.NotNil(err)
// After disconnecting existing account record, the agent should be able to
// connected for a particular cloud account id
@ -131,22 +131,22 @@ func TestAgentCheckIns(t *testing.T) {
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
_, apiErr = controller.CheckInAsAgent(
_, err = controller.CheckInAsAgent(
context.TODO(), user.OrgID, "aws", AgentCheckInRequest{
ID: testAccountId2,
AccountID: testCloudAccountId1,
},
)
require.Nil(apiErr)
require.Nil(err)
// should be able to keep checking in
_, apiErr = controller.CheckInAsAgent(
_, err = controller.CheckInAsAgent(
context.TODO(), user.OrgID, "aws", AgentCheckInRequest{
ID: testAccountId2,
AccountID: testCloudAccountId1,
},
)
require.Nil(apiErr)
require.Nil(err)
}
func TestCantDisconnectNonExistentAccount(t *testing.T) {
@ -194,10 +194,10 @@ func TestConfigureService(t *testing.T) {
testSvcId := svcListResp.Services[0].Id
require.Nil(svcListResp.Services[0].Config)
svcDetails, apiErr := controller.GetServiceDetails(
svcDetails, err := controller.GetServiceDetails(
context.TODO(), user.OrgID, "aws", testSvcId, &testCloudAccountId,
)
require.Nil(apiErr)
require.Nil(err)
require.Equal(testSvcId, svcDetails.Id)
require.Nil(svcDetails.Config)
@ -207,20 +207,20 @@ func TestConfigureService(t *testing.T) {
Enabled: true,
},
}
updateSvcConfigResp, apiErr := controller.UpdateServiceConfig(
context.TODO(), user.OrgID, "aws", testSvcId, UpdateServiceConfigRequest{
updateSvcConfigResp, err := controller.UpdateServiceConfig(
context.TODO(), user.OrgID, "aws", testSvcId, &UpdateServiceConfigRequest{
CloudAccountId: testCloudAccountId,
Config: testSvcConfig,
},
)
require.Nil(apiErr)
require.Nil(err)
require.Equal(testSvcId, updateSvcConfigResp.Id)
require.Equal(testSvcConfig, updateSvcConfigResp.Config)
svcDetails, apiErr = controller.GetServiceDetails(
svcDetails, err = controller.GetServiceDetails(
context.TODO(), user.OrgID, "aws", testSvcId, &testCloudAccountId,
)
require.Nil(apiErr)
require.Nil(err)
require.Equal(testSvcId, svcDetails.Id)
require.Equal(testSvcConfig, *svcDetails.Config)
@ -240,33 +240,33 @@ func TestConfigureService(t *testing.T) {
)
require.Nil(apiErr)
_, apiErr = controller.UpdateServiceConfig(
_, err = controller.UpdateServiceConfig(
context.TODO(), user.OrgID, "aws", testSvcId,
UpdateServiceConfigRequest{
&UpdateServiceConfigRequest{
CloudAccountId: testCloudAccountId,
Config: testSvcConfig,
},
)
require.NotNil(apiErr)
require.NotNil(err)
// should not be able to configure a service for a cloud account id that is not connected yet
_, apiErr = controller.UpdateServiceConfig(
_, err = controller.UpdateServiceConfig(
context.TODO(), user.OrgID, "aws", testSvcId,
UpdateServiceConfigRequest{
&UpdateServiceConfigRequest{
CloudAccountId: "9999999999",
Config: testSvcConfig,
},
)
require.NotNil(apiErr)
require.NotNil(err)
// should not be able to set config for an unsupported service
_, apiErr = controller.UpdateServiceConfig(
context.TODO(), user.OrgID, "aws", "bad-service", UpdateServiceConfigRequest{
_, err = controller.UpdateServiceConfig(
context.TODO(), user.OrgID, "aws", "bad-service", &UpdateServiceConfigRequest{
CloudAccountId: testCloudAccountId,
Config: testSvcConfig,
},
)
require.NotNil(apiErr)
require.NotNil(err)
}

View File

@ -1,183 +0,0 @@
package cloudintegrations
import (
"fmt"
"github.com/SigNoz/signoz/pkg/types"
)
type CloudServiceSummary struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
// Present only if the service has been configured in the
// context of a cloud provider account.
Config *types.CloudServiceConfig `json:"config,omitempty"`
}
type CloudServiceDetails struct {
CloudServiceSummary
Overview string `json:"overview"` // markdown
Assets CloudServiceAssets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollectedForService `json:"data_collected"`
ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"`
}
type CloudServiceAssets struct {
Dashboards []CloudServiceDashboard `json:"dashboards"`
}
type CloudServiceDashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *types.DashboardData `json:"definition,omitempty"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollectedForService struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CloudServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type CloudTelemetryCollectionStrategy struct {
Provider string `json:"provider"`
AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"`
}
func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) {
if provider == "aws" {
return &CloudTelemetryCollectionStrategy{
Provider: "aws",
AWSMetrics: &AWSMetricsCollectionStrategy{
CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{},
},
AWSLogs: &AWSLogsCollectionStrategy{
CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{},
},
}, nil
}
return nil, fmt.Errorf("unsupported cloud provider: %s", provider)
}
// Helper for accumulating strategies for enabled services.
func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy(
svcStrategy *CloudTelemetryCollectionStrategy,
logsEnabled bool,
metricsEnabled bool,
) error {
if svcStrategy.Provider != cs.Provider {
return fmt.Errorf(
"can't add %s service strategy to strategy for %s",
svcStrategy.Provider, cs.Provider,
)
}
if cs.Provider == "aws" {
if logsEnabled {
cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs)
}
if metricsEnabled {
cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics)
}
return nil
}
return fmt.Errorf("unsupported cloud provider: %s", cs.Provider)
}
type AWSMetricsCollectionStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"`
}
type CloudwatchMetricStreamFilter struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
}
func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy(
svcStrategy *AWSMetricsCollectionStrategy,
) error {
if svcStrategy == nil {
return nil
}
amc.CloudwatchMetricsStreamFilters = append(
amc.CloudwatchMetricsStreamFilters,
svcStrategy.CloudwatchMetricsStreamFilters...,
)
return nil
}
type AWSLogsCollectionStrategy struct {
CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"`
}
type CloudwatchLogsSubscriptionConfig struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
}
func (alc *AWSLogsCollectionStrategy) AddServiceStrategy(
svcStrategy *AWSLogsCollectionStrategy,
) error {
if svcStrategy == nil {
return nil
}
alc.CloudwatchLogsSubscriptions = append(
alc.CloudwatchLogsSubscriptions,
svcStrategy.CloudwatchLogsSubscriptions...,
)
return nil
}

View File

@ -0,0 +1,94 @@
package cloudintegrations
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/types"
)
type ServiceSummary struct {
services.Metadata
Config *types.CloudServiceConfig `json:"config"`
}
type ServiceDetails struct {
services.Definition
Config *types.CloudServiceConfig `json:"config"`
ConnectionStatus *ServiceConnectionStatus `json:"status,omitempty"`
}
type AccountStatus struct {
Integration AccountIntegrationStatus `json:"integration"`
}
type AccountIntegrationStatus struct {
LastHeartbeatTsMillis *int64 `json:"last_heartbeat_ts_ms"`
}
type LogsConfig struct {
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type MetricsConfig struct {
Enabled bool `json:"enabled"`
}
type ServiceConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type CompiledCollectionStrategy = services.CollectionStrategy
func NewCompiledCollectionStrategy(provider string) (*CompiledCollectionStrategy, error) {
if provider == "aws" {
return &CompiledCollectionStrategy{
Provider: "aws",
AWSMetrics: &services.AWSMetricsStrategy{},
AWSLogs: &services.AWSLogsStrategy{},
}, nil
}
return nil, errors.NewNotFoundf(services.CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", provider)
}
// Helper for accumulating strategies for enabled services.
func AddServiceStrategy(serviceType string, cs *CompiledCollectionStrategy,
definitionStrat *services.CollectionStrategy, config *types.CloudServiceConfig) error {
if definitionStrat.Provider != cs.Provider {
return errors.NewInternalf(CodeMismatchCloudProvider, "can't add %s service strategy to compiled strategy for %s",
definitionStrat.Provider, cs.Provider)
}
if cs.Provider == "aws" {
if config.Logs != nil && config.Logs.Enabled {
if serviceType == services.S3Sync {
// S3 bucket sync; No cloudwatch logs are appended for this service type;
// Though definition is populated with a custom cloudwatch group that helps in calculating logs connection status
cs.S3Buckets = config.Logs.S3Buckets
} else if definitionStrat.AWSLogs != nil { // services that includes a logs subscription
cs.AWSLogs.Subscriptions = append(
cs.AWSLogs.Subscriptions,
definitionStrat.AWSLogs.Subscriptions...,
)
}
}
if config.Metrics != nil && config.Metrics.Enabled && definitionStrat.AWSMetrics != nil {
cs.AWSMetrics.StreamFilters = append(
cs.AWSMetrics.StreamFilters,
definitionStrat.AWSMetrics.StreamFilters...,
)
}
return nil
}
return errors.NewNotFoundf(services.CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cs.Provider)
}

View File

@ -12,7 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
)
type serviceConfigRepository interface {
type ServiceConfigDatabase interface {
get(
ctx context.Context,
orgID string,
@ -140,7 +140,6 @@ func (r *serviceConfigSQLRepository) getAllForAccount(
orgID string,
cloudAccountId string,
) (map[string]*types.CloudServiceConfig, *model.ApiError) {
serviceConfigs := []types.CloudIntegrationService{}
err := r.store.BunDB().NewSelect().

View File

@ -0,0 +1 @@
<svg width="256" height="256" viewBox="0 0 256 256" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" preserveAspectRatio="xMidYMid"><title>AWS Simple Storage Service (S3)</title><defs><linearGradient x1="0%" y1="100%" x2="100%" y2="0%" id="a"><stop stop-color="#1B660F" offset="0%"/><stop stop-color="#6CAE3E" offset="100%"/></linearGradient></defs><g><rect fill="url(#a)" width="256" height="256"/><path d="M194.67488,137.25632 L195.90368,128.60352 C207.23488,135.39072 207.38208,138.19392 207.378964,138.27072 C207.35968,138.28672 205.42688,139.89952 194.67488,137.25632 L194.67488,137.25632 Z M188.45728,135.52832 C168.87328,129.60192 141.59968,117.08992 130.56288,111.87392 C130.56288,111.82912 130.57568,111.78752 130.57568,111.74272 C130.57568,107.50272 127.12608,104.05312 122.88288,104.05312 C118.64608,104.05312 115.19648,107.50272 115.19648,111.74272 C115.19648,115.98272 118.64608,119.43232 122.88288,119.43232 C124.74528,119.43232 126.43488,118.73792 127.76928,117.63392 C140.75488,123.78112 167.81728,136.11072 187.54528,141.93472 L179.74368,196.99392 C179.72128,197.14432 179.71168,197.29472 179.71168,197.44512 C179.71168,202.29312 158.24928,211.19872 123.18048,211.19872 C87.74048,211.19872 66.05088,202.29312 66.05088,197.44512 C66.05088,197.29792 66.04128,197.15392 66.02208,197.00992 L49.72128,77.94752 C63.83008,87.65952 94.17568,92.79872 123.19968,92.79872 C152.17888,92.79872 182.47328,87.67872 196.61088,77.99552 L188.45728,135.52832 Z M47.99968,65.52832 C48.23008,61.31712 72.42848,44.79872 123.19968,44.79872 C173.96448,44.79872 198.16608,61.31392 198.39968,65.52832 L198.39968,66.96512 C195.61568,76.40832 164.25568,86.39872 123.19968,86.39872 C82.07328,86.39872 50.69728,76.37632 47.99968,66.92032 L47.99968,65.52832 Z M204.79968,65.59872 C204.79968,54.51072 173.01088,38.39872 123.19968,38.39872 C73.38848,38.39872 41.59968,54.51072 41.59968,65.59872 L41.90048,68.01152 L59.65408,197.68832 C60.07968,212.19072 98.75488,217.59872 123.18048,217.59872 C153.49088,217.59872 185.69248,210.62912 186.10848,197.69792 L193.77568,143.62752 C198.04128,144.64832 201.55168,145.16992 204.37088,145.16992 C208.15648,145.16992 210.71648,144.24512 212.26848,142.39552 C213.54208,140.87872 214.02848,139.04192 213.66368,137.08672 C212.83488,132.65792 207.57728,127.88352 196.87008,121.77472 L204.47328,68.13632 L204.79968,65.59872 Z" fill="#FFFFFF"/></g></svg>

After

Width:  |  Height:  |  Size: 2.3 KiB

View File

@ -0,0 +1,52 @@
{
"id": "s3sync",
"title": "S3 Sync",
"icon": "file://icon.svg",
"overview": "file://overview.md",
"supported_signals": {
"metrics": false,
"logs": true
},
"data_collected": {
"logs": [
{
"name": "Account ID",
"path": "resources.cloud.account.id",
"type": "string"
},
{
"name": "Account Region",
"path": "resources.cloud.account.region",
"type": "string"
},
{
"name": "Bucket Name",
"path": "resources.cloud.bucket.name",
"type": "string"
},
{
"name": "Object Key",
"path": "attributes.object.key",
"type": "string"
},
{
"name": "Object S3 Event Time",
"path": "attributes.object.event_time",
"type": "string"
}
]
},
"telemetry_collection_strategy": {
"aws_logs": {
"cloudwatch_logs_subscriptions": [
{
"log_group_name_prefix": "x/signoz/forwarder",
"filter_pattern": ""
}
]
}
},
"assets": {
"dashboards": []
}
}

View File

@ -0,0 +1,3 @@
### Sync logs stored in S3 Object Store with SigNoz
Collect logs stored by AWS Services in S3 Object Store and explore them in SigNoz.

View File

@ -0,0 +1,91 @@
package services
import (
"github.com/SigNoz/signoz/pkg/types"
)
type Metadata struct {
Id string `json:"id"`
Title string `json:"title"`
Icon string `json:"icon"`
}
type Definition struct {
Metadata
Overview string `json:"overview"` // markdown
Assets Assets `json:"assets"`
SupportedSignals SupportedSignals `json:"supported_signals"`
DataCollected DataCollected `json:"data_collected"`
Strategy *CollectionStrategy `json:"telemetry_collection_strategy"`
}
type Assets struct {
Dashboards []Dashboard `json:"dashboards"`
}
type SupportedSignals struct {
Logs bool `json:"logs"`
Metrics bool `json:"metrics"`
}
type DataCollected struct {
Logs []CollectedLogAttribute `json:"logs"`
Metrics []CollectedMetric `json:"metrics"`
}
type CollectedLogAttribute struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
type CollectedMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
Description string `json:"description"`
}
type CollectionStrategy struct {
Provider string `json:"provider"`
AWSMetrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsStrategy `json:"aws_logs,omitempty"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type
}
type AWSMetricsStrategy struct {
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
StreamFilters []struct {
// json tags here are in the shape expected by AWS API as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
MetricNames []string `json:"MetricNames,omitempty"`
} `json:"cloudwatch_metric_stream_filters"`
}
type AWSLogsStrategy struct {
Subscriptions []struct {
// subscribe to all logs groups with specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
} `json:"cloudwatch_logs_subscriptions"`
}
type Dashboard struct {
Id string `json:"id"`
Url string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Image string `json:"image"`
Definition *types.DashboardData `json:"definition,omitempty"`
}

View File

@ -1,4 +1,4 @@
package cloudintegrations
package services
import (
"bytes"
@ -9,17 +9,25 @@ import (
"path"
"sort"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/model"
koanfJson "github.com/knadh/koanf/parsers/json"
"golang.org/x/exp/maps"
)
func listCloudProviderServices(
cloudProvider string,
) ([]CloudServiceDetails, *model.ApiError) {
cloudServices := availableServices[cloudProvider]
if cloudServices == nil {
const (
S3Sync = "s3sync"
)
var (
CodeUnsupportedCloudProvider = errors.MustNewCode("unsupported_cloud_provider")
CodeUnsupportedServiceType = errors.MustNewCode("unsupported_service_type")
)
func List(cloudProvider string) ([]Definition, *model.ApiError) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
@ -33,21 +41,24 @@ func listCloudProviderServices(
return services, nil
}
func getCloudProviderService(
cloudProvider string, serviceId string,
) (*CloudServiceDetails, *model.ApiError) {
cloudServices := availableServices[cloudProvider]
if cloudServices == nil {
return nil, model.NotFoundError(fmt.Errorf(
"unsupported cloud provider: %s", cloudProvider,
))
func Map(cloudProvider string) (map[string]Definition, error) {
cloudServices, found := supportedServices[cloudProvider]
if !found || cloudServices == nil {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cloudProvider)
}
svc, exists := cloudServices[serviceId]
return cloudServices, nil
}
func GetServiceDefinition(cloudProvider, serviceType string) (*Definition, error) {
cloudServices := supportedServices[cloudProvider]
if cloudServices == nil {
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedCloudProvider, "unsupported cloud provider: %s", cloudProvider)
}
svc, exists := cloudServices[serviceType]
if !exists {
return nil, model.NotFoundError(fmt.Errorf(
"%s service not found: %s", cloudProvider, serviceId,
))
return nil, errors.Newf(errors.TypeNotFound, CodeUnsupportedServiceType, "%s service not found: %s", cloudProvider, serviceType)
}
return &svc, nil
@ -57,7 +68,7 @@ func getCloudProviderService(
// Service details read from ./serviceDefinitions
// { "providerName": { "service_id": {...}} }
var availableServices map[string]map[string]CloudServiceDetails
var supportedServices map[string]map[string]Definition
func init() {
err := readAllServiceDefinitions()
@ -68,15 +79,15 @@ func init() {
}
}
//go:embed serviceDefinitions/*
var serviceDefinitionFiles embed.FS
//go:embed definitions/*
var definitionFiles embed.FS
func readAllServiceDefinitions() error {
availableServices = map[string]map[string]CloudServiceDetails{}
supportedServices = map[string]map[string]Definition{}
rootDirName := "serviceDefinitions"
rootDirName := "definitions"
cloudProviderDirs, err := fs.ReadDir(serviceDefinitionFiles, rootDirName)
cloudProviderDirs, err := fs.ReadDir(definitionFiles, rootDirName)
if err != nil {
return fmt.Errorf("couldn't read dirs in %s: %w", rootDirName, err)
}
@ -98,21 +109,21 @@ func readAllServiceDefinitions() error {
return fmt.Errorf("no %s services could be read", cloudProvider)
}
availableServices[cloudProvider] = cloudServices
supportedServices[cloudProvider] = cloudServices
}
return nil
}
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]CloudServiceDetails, error,
map[string]Definition, error,
) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
svcDefDirs, err := fs.ReadDir(definitionFiles, cloudProviderDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't list integrations dirs: %w", err)
}
svcDefs := map[string]CloudServiceDetails{}
svcDefs := map[string]Definition{}
for _, d := range svcDefDirs {
if !d.IsDir() {
@ -137,10 +148,10 @@ func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath st
return svcDefs, nil
}
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
func readServiceDefinition(cloudProvider string, svcDirpath string) (*Definition, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")
serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
serializedSpec, err := definitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
@ -157,7 +168,7 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServi
}
hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, svcDirpath,
integrationSpec, definitionFiles, svcDirpath,
)
if err != nil {
return nil, fmt.Errorf(
@ -167,7 +178,7 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServi
}
hydratedSpec := hydrated.(map[string]any)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
serviceDef, err := ParseStructWithJsonTagsFromMap[Definition](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
@ -180,13 +191,13 @@ func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServi
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider
serviceDef.Strategy.Provider = cloudProvider
return serviceDef, nil
}
func validateServiceDefinition(s *CloudServiceDetails) error {
func validateServiceDefinition(s *Definition) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
@ -196,7 +207,7 @@ func validateServiceDefinition(s *CloudServiceDetails) error {
seenDashboardIds[dd.Id] = nil
}
if s.TelemetryCollectionStrategy == nil {
if s.Strategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}

View File

@ -1,8 +1,9 @@
package cloudintegrations
package services
import (
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/stretchr/testify/require"
)
@ -11,24 +12,24 @@ func TestAvailableServices(t *testing.T) {
require := require.New(t)
// should be able to list available services.
_, apiErr := listCloudProviderServices("bad-cloud-provider")
_, apiErr := List("bad-cloud-provider")
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
awsSvcs, apiErr := listCloudProviderServices("aws")
awsSvcs, apiErr := List("aws")
require.Nil(apiErr)
require.Greater(len(awsSvcs), 0)
// should be able to get details of a service
_, apiErr = getCloudProviderService(
_, err := GetServiceDefinition(
"aws", "bad-service-id",
)
require.NotNil(apiErr)
require.Equal(model.ErrorNotFound, apiErr.Type())
require.NotNil(err)
require.True(errors.Ast(err, errors.TypeNotFound))
svc, apiErr := getCloudProviderService(
svc, err := GetServiceDefinition(
"aws", awsSvcs[0].Id,
)
require.Nil(apiErr)
require.Nil(err)
require.Equal(*svc, awsSvcs[0])
}

View File

@ -24,6 +24,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/quickfilter"
"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/signoz"
@ -3985,12 +3986,12 @@ func (aH *APIHandler) CloudIntegrationsAgentCheckIn(
return
}
result, apiErr := aH.CloudIntegrationsController.CheckInAsAgent(
result, err := aH.CloudIntegrationsController.CheckInAsAgent(
r.Context(), claims.OrgID, cloudProvider, req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
if err != nil {
render.Error(w, err)
return
}
@ -4110,11 +4111,11 @@ func (aH *APIHandler) CloudIntegrationsGetServiceDetails(
return
}
resp, apiErr := aH.CloudIntegrationsController.GetServiceDetails(
resp, err := aH.CloudIntegrationsController.GetServiceDetails(
r.Context(), claims.OrgID, cloudProvider, serviceId, cloudAccountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
if err != nil {
render.Error(w, err)
return
}
@ -4138,8 +4139,8 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
orgID valuer.UUID,
cloudProvider string,
cloudAccountId string,
svcDetails *cloudintegrations.CloudServiceDetails,
) (*cloudintegrations.CloudServiceConnectionStatus, *model.ApiError) {
svcDetails *cloudintegrations.ServiceDetails,
) (*cloudintegrations.ServiceConnectionStatus, *model.ApiError) {
if cloudProvider != "aws" {
// TODO(Raj): Make connection check generic for all providers in a follow up change
return nil, model.BadRequest(
@ -4147,14 +4148,14 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
)
}
telemetryCollectionStrategy := svcDetails.TelemetryCollectionStrategy
telemetryCollectionStrategy := svcDetails.Strategy
if telemetryCollectionStrategy == nil {
return nil, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
}
result := &cloudintegrations.CloudServiceConnectionStatus{}
result := &cloudintegrations.ServiceConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex
@ -4214,10 +4215,10 @@ func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *cloudintegrations.AWSMetricsCollectionStrategy,
metricsCollectedBySvc []cloudintegrations.CollectedMetric,
strategy *services.AWSMetricsStrategy,
metricsCollectedBySvc []services.CollectedMetric,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.CloudwatchMetricsStreamFilters) < 1 {
if strategy == nil || len(strategy.StreamFilters) < 1 {
return nil, nil
}
@ -4226,7 +4227,7 @@ func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
"cloud_account_id": cloudAccountId,
}
metricsNamespace := strategy.CloudwatchMetricsStreamFilters[0].Namespace
metricsNamespace := strategy.StreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) >= 2 {
@ -4264,13 +4265,13 @@ func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context,
orgID valuer.UUID,
cloudAccountId string,
strategy *cloudintegrations.AWSLogsCollectionStrategy,
strategy *services.AWSLogsStrategy,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.CloudwatchLogsSubscriptions) < 1 {
if strategy == nil || len(strategy.Subscriptions) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.CloudwatchLogsSubscriptions[0].LogGroupNamePrefix
logGroupNamePrefix := strategy.Subscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}
@ -4358,12 +4359,12 @@ func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(
return
}
result, apiErr := aH.CloudIntegrationsController.UpdateServiceConfig(
r.Context(), claims.OrgID, cloudProvider, serviceId, req,
result, err := aH.CloudIntegrationsController.UpdateServiceConfig(
r.Context(), claims.OrgID, cloudProvider, serviceId, &req,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
if err != nil {
render.Error(w, err)
return
}

View File

@ -236,9 +236,9 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) {
telemetryCollectionStrategy := checkinResp.IntegrationConfig.TelemetryCollectionStrategy
require.Equal("aws", telemetryCollectionStrategy.Provider)
require.NotNil(telemetryCollectionStrategy.AWSMetrics)
require.Empty(telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters)
require.Empty(telemetryCollectionStrategy.AWSMetrics.StreamFilters)
require.NotNil(telemetryCollectionStrategy.AWSLogs)
require.Empty(telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions)
require.Empty(telemetryCollectionStrategy.AWSLogs.Subscriptions)
// helper
setServiceConfig := func(svcId string, metricsEnabled bool, logsEnabled bool) {
@ -283,14 +283,14 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) {
require.Equal("aws", telemetryCollectionStrategy.Provider)
require.NotNil(telemetryCollectionStrategy.AWSMetrics)
metricStreamNamespaces := []string{}
for _, f := range telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters {
for _, f := range telemetryCollectionStrategy.AWSMetrics.StreamFilters {
metricStreamNamespaces = append(metricStreamNamespaces, f.Namespace)
}
require.Equal([]string{"AWS/EC2", "CWAgent", "AWS/RDS"}, metricStreamNamespaces)
require.NotNil(telemetryCollectionStrategy.AWSLogs)
logGroupPrefixes := []string{}
for _, f := range telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions {
for _, f := range telemetryCollectionStrategy.AWSLogs.Subscriptions {
logGroupPrefixes = append(logGroupPrefixes, f.LogGroupNamePrefix)
}
require.Equal(1, len(logGroupPrefixes))
@ -327,14 +327,14 @@ func TestConfigReturnedWhenAgentChecksIn(t *testing.T) {
require.Equal("aws", telemetryCollectionStrategy.Provider)
require.NotNil(telemetryCollectionStrategy.AWSMetrics)
metricStreamNamespaces = []string{}
for _, f := range telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters {
for _, f := range telemetryCollectionStrategy.AWSMetrics.StreamFilters {
metricStreamNamespaces = append(metricStreamNamespaces, f.Namespace)
}
require.Equal([]string{"AWS/RDS"}, metricStreamNamespaces)
require.NotNil(telemetryCollectionStrategy.AWSLogs)
logGroupPrefixes = []string{}
for _, f := range telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions {
for _, f := range telemetryCollectionStrategy.AWSLogs.Subscriptions {
logGroupPrefixes = append(logGroupPrefixes, f.LogGroupNamePrefix)
}
require.Equal(0, len(logGroupPrefixes))
@ -522,7 +522,7 @@ func (tb *CloudIntegrationsTestBed) GetServicesFromQS(
func (tb *CloudIntegrationsTestBed) GetServiceDetailFromQS(
cloudProvider string, serviceId string, cloudAccountId *string,
) *cloudintegrations.CloudServiceDetails {
) *cloudintegrations.ServiceDetails {
path := fmt.Sprintf("/api/v1/cloud-integrations/%s/services/%s", cloudProvider, serviceId)
if cloudAccountId != nil {
path = fmt.Sprintf("%s?cloud_account_id=%s", path, *cloudAccountId)
@ -537,7 +537,7 @@ func (tb *CloudIntegrationsTestBed) GetServiceDetailFromQS(
`SELECT.*from.*signoz_metrics.*`,
).WillReturnRows(mockhouse.NewRows(metricCols, [][]any{}))
return RequestQSAndParseResp[cloudintegrations.CloudServiceDetails](
return RequestQSAndParseResp[cloudintegrations.ServiceDetails](
tb, path, nil,
)
}

View File

@ -203,7 +203,8 @@ type CloudIntegrationService struct {
}
type CloudServiceLogsConfig struct {
Enabled bool `json:"enabled"`
Enabled bool `json:"enabled"`
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
}
type CloudServiceMetricsConfig struct {