Feat: aws integrations: service connection status (#7032)

* feat: add ability to get latest metric received ts by labelValues filter

* feat: svc metrics connection status check

* feat: aws integration svc logs connection check

* chore: fix broken test

* chore: address PR review comments

* chore: address PR feedback

* chore: fix broken test expectation

* fix: use resource filter for logs connection status
This commit is contained in:
Raj Kamal Singh 2025-02-06 13:08:47 +05:30 committed by GitHub
parent c5219ac157
commit acd9b97ee3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 266 additions and 16 deletions

View File

@ -3900,27 +3900,46 @@ func (r *ClickHouseReader) GetCountOfThings(ctx context.Context, query string) (
}
func (r *ClickHouseReader) GetLatestReceivedMetric(
ctx context.Context, metricNames []string,
ctx context.Context, metricNames []string, labelValues map[string]string,
) (*model.MetricStatus, *model.ApiError) {
// at least 1 metric name must be specified.
// this query can be too slow otherwise.
if len(metricNames) < 1 {
return nil, nil
return nil, model.BadRequest(fmt.Errorf("atleast 1 metric name must be specified"))
}
quotedMetricNames := []string{}
for _, m := range metricNames {
quotedMetricNames = append(quotedMetricNames, fmt.Sprintf(`'%s'`, m))
quotedMetricNames = append(quotedMetricNames, utils.ClickHouseFormattedValue(m))
}
commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ")
whereClauseParts := []string{
fmt.Sprintf(`metric_name in (%s)`, commaSeparatedMetricNames),
}
if labelValues != nil {
for label, val := range labelValues {
whereClauseParts = append(
whereClauseParts,
fmt.Sprintf(`JSONExtractString(labels, '%s') = '%s'`, label, val),
)
}
}
if len(whereClauseParts) < 1 {
return nil, nil
}
whereClause := strings.Join(whereClauseParts, " AND ")
query := fmt.Sprintf(`
SELECT metric_name, labels, unix_milli
SELECT metric_name, anyLast(labels), max(unix_milli)
from %s.%s
where metric_name in (
%s
)
order by unix_milli desc
where %s
group by metric_name
limit 1
`, signozMetricDBName, signozTSTableNameV4, commaSeparatedMetricNames,
`, signozMetricDBName, signozTSTableNameV4, whereClause,
)
rows, err := r.db.Query(ctx, query)

View File

@ -3739,7 +3739,7 @@ func (aH *APIHandler) calculateConnectionStatus(
}
statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric(
ctx, connectionTests.Metrics,
ctx, connectionTests.Metrics, nil,
)
resultLock.Lock()
@ -4105,14 +4105,229 @@ func (aH *APIHandler) CloudIntegrationsGetServiceDetails(
resp, apiErr := aH.CloudIntegrationsController.GetServiceDetails(
r.Context(), cloudProvider, serviceId, cloudAccountId,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
// Add connection status for the 2 signals.
if cloudAccountId != nil {
connStatus, apiErr := aH.calculateCloudIntegrationServiceConnectionStatus(
r.Context(), cloudProvider, *cloudAccountId, resp,
)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
resp.ConnectionStatus = connStatus
}
aH.Respond(w, resp)
}
func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus(
ctx context.Context,
cloudProvider string,
cloudAccountId string,
svcDetails *cloudintegrations.CloudServiceDetails,
) (*cloudintegrations.CloudServiceConnectionStatus, *model.ApiError) {
if cloudProvider != "aws" {
// TODO(Raj): Make connection check generic for all providers in a follow up change
return nil, model.BadRequest(
fmt.Errorf("unsupported cloud provider: %s", cloudProvider),
)
}
telemetryCollectionStrategy := svcDetails.TelemetryCollectionStrategy
if telemetryCollectionStrategy == nil {
return nil, model.InternalError(fmt.Errorf(
"service doesn't have telemetry collection strategy: %s", svcDetails.Id,
))
}
result := &cloudintegrations.CloudServiceConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex
var wg sync.WaitGroup
// Calculate metrics connection status
if telemetryCollectionStrategy.AWSMetrics != nil {
wg.Add(1)
go func() {
defer wg.Done()
metricsConnStatus, apiErr := aH.calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx, cloudAccountId, telemetryCollectionStrategy.AWSMetrics, svcDetails.DataCollected.Metrics,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Metrics = metricsConnStatus
}
}()
}
// Calculate logs connection status
if telemetryCollectionStrategy.AWSLogs != nil {
wg.Add(1)
go func() {
defer wg.Done()
logsConnStatus, apiErr := aH.calculateAWSIntegrationSvcLogsConnectionStatus(
ctx, cloudAccountId, telemetryCollectionStrategy.AWSLogs,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
}()
}
wg.Wait()
if len(errors) > 0 {
return nil, errors[0]
}
return result, nil
}
func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *cloudintegrations.AWSMetricsCollectionStrategy,
metricsCollectedBySvc []cloudintegrations.CollectedMetric,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.CloudwatchMetricsStreamFilters) < 1 {
return nil, nil
}
metricsNamespace := strategy.CloudwatchMetricsStreamFilters[0].Namespace
metricsNamespaceParts := strings.Split(metricsNamespace, "/")
if len(metricsNamespaceParts) < 2 {
return nil, model.InternalError(fmt.Errorf(
"unexpected metric namespace: %s", metricsNamespace,
))
}
expectedLabelValues := map[string]string{
"cloud_provider": "aws",
"cloud_account_id": cloudAccountId,
"service_namespace": metricsNamespaceParts[0],
"service_name": metricsNamespaceParts[1],
}
metricNamesCollectedBySvc := []string{}
for _, cm := range metricsCollectedBySvc {
metricNamesCollectedBySvc = append(metricNamesCollectedBySvc, cm.Name)
}
statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric(
ctx, metricNamesCollectedBySvc, expectedLabelValues,
)
if apiErr != nil {
return nil, apiErr
}
if statusForLastReceivedMetric != nil {
return &cloudintegrations.SignalConnectionStatus{
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus(
ctx context.Context,
cloudAccountId string,
strategy *cloudintegrations.AWSLogsCollectionStrategy,
) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) {
if strategy == nil || len(strategy.CloudwatchLogsSubscriptions) < 1 {
return nil, nil
}
logGroupNamePrefix := strategy.CloudwatchLogsSubscriptions[0].LogGroupNamePrefix
if len(logGroupNamePrefix) < 1 {
return nil, nil
}
logsConnTestFilter := &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "cloud.account.id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "=",
Value: cloudAccountId,
},
{
Key: v3.AttributeKey{
Key: "aws.cloudwatch.log_group_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
Operator: "like",
Value: logGroupNamePrefix + "%",
},
},
}
// TODO(Raj): Receive this as a param from UI in the future.
lookbackSeconds := int64(30 * 60)
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnTestFilter,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, _, err := aH.querier.QueryRange(
ctx, qrParams,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
return &cloudintegrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: "signoz-aws-integration",
}, nil
}
return nil, nil
}
func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(
w http.ResponseWriter, r *http.Request,
) {

View File

@ -52,7 +52,9 @@ type Reader interface {
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
// Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations
GetLatestReceivedMetric(ctx context.Context, metricNames []string) (*model.MetricStatus, *model.ApiError)
GetLatestReceivedMetric(
ctx context.Context, metricNames []string, labelValues map[string]string,
) (*model.MetricStatus, *model.ApiError)
// QB V3 metrics/traces/logs
GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error)

View File

@ -353,7 +353,11 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *CloudIntegratio
}
fm := featureManager.StartManager()
reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
mockClickhouse.MatchExpectationsInOrder(false)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,
AppDao: dao.DB(),
CloudIntegrationsController: controller,
FeatureFlags: fm,
@ -373,9 +377,10 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *CloudIntegratio
}
return &CloudIntegrationsTestBed{
t: t,
testUser: user,
qsHttpHandler: router,
t: t,
testUser: user,
qsHttpHandler: router,
mockClickhouse: mockClickhouse,
}
}
@ -504,6 +509,15 @@ func (tb *CloudIntegrationsTestBed) GetServiceDetailFromQS(
path = fmt.Sprintf("%s?cloud_account_id=%s", path, *cloudAccountId)
}
// add mock expectations for connection status queries
metricCols := []mockhouse.ColumnType{}
metricCols = append(metricCols, mockhouse.ColumnType{Type: "String", Name: "metric_name"})
metricCols = append(metricCols, mockhouse.ColumnType{Type: "String", Name: "labels"})
metricCols = append(metricCols, mockhouse.ColumnType{Type: "Int64", Name: "unix_milli"})
tb.mockClickhouse.ExpectQuery(
`SELECT.*from.*signoz_metrics.*`,
).WillReturnRows(mockhouse.NewRows(metricCols, [][]any{}))
return RequestQSAndParseResp[cloudintegrations.CloudServiceDetails](
tb, path, nil,
)

View File

@ -538,7 +538,7 @@ func (tb *IntegrationsTestBed) mockMetricStatusQueryResponse(expectation *model.
}
tb.mockClickhouse.ExpectQuery(
`SELECT.*metric_name, labels, unix_milli.*from.*signoz_metrics.*where metric_name in.*limit 1.*`,
`SELECT.*from.*signoz_metrics.*`,
).WillReturnRows(mockhouse.NewRows(cols, values))
}