From acd9b97ee38ab24a8848945864db8aa41d9673ad Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:08:47 +0530 Subject: [PATCH] Feat: aws integrations: service connection status (#7032) * feat: add ability to get latest metric received ts by labelValues filter * feat: svc metrics connection status check * feat: aws integration svc logs connection check * chore: fix broken test * chore: address PR review comments * chore: address PR feedback * chore: fix broken test expectation * fix: use resource filter for logs connection status --- .../app/clickhouseReader/reader.go | 37 ++- pkg/query-service/app/http_handler.go | 219 +++++++++++++++++- pkg/query-service/interfaces/interface.go | 4 +- .../signoz_cloud_integrations_test.go | 20 +- .../integration/signoz_integrations_test.go | 2 +- 5 files changed, 266 insertions(+), 16 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 7b9965e431..c21d490b7b 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3900,27 +3900,46 @@ func (r *ClickHouseReader) GetCountOfThings(ctx context.Context, query string) ( } func (r *ClickHouseReader) GetLatestReceivedMetric( - ctx context.Context, metricNames []string, + ctx context.Context, metricNames []string, labelValues map[string]string, ) (*model.MetricStatus, *model.ApiError) { + // at least 1 metric name must be specified. + // this query can be too slow otherwise. if len(metricNames) < 1 { - return nil, nil + return nil, model.BadRequest(fmt.Errorf("atleast 1 metric name must be specified")) } quotedMetricNames := []string{} for _, m := range metricNames { - quotedMetricNames = append(quotedMetricNames, fmt.Sprintf(`'%s'`, m)) + quotedMetricNames = append(quotedMetricNames, utils.ClickHouseFormattedValue(m)) } commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ") + whereClauseParts := []string{ + fmt.Sprintf(`metric_name in (%s)`, commaSeparatedMetricNames), + } + + if labelValues != nil { + for label, val := range labelValues { + whereClauseParts = append( + whereClauseParts, + fmt.Sprintf(`JSONExtractString(labels, '%s') = '%s'`, label, val), + ) + } + } + + if len(whereClauseParts) < 1 { + return nil, nil + } + + whereClause := strings.Join(whereClauseParts, " AND ") + query := fmt.Sprintf(` - SELECT metric_name, labels, unix_milli + SELECT metric_name, anyLast(labels), max(unix_milli) from %s.%s - where metric_name in ( - %s - ) - order by unix_milli desc + where %s + group by metric_name limit 1 - `, signozMetricDBName, signozTSTableNameV4, commaSeparatedMetricNames, + `, signozMetricDBName, signozTSTableNameV4, whereClause, ) rows, err := r.db.Query(ctx, query) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ceec1725b8..24fc19828c 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -3739,7 +3739,7 @@ func (aH *APIHandler) calculateConnectionStatus( } statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric( - ctx, connectionTests.Metrics, + ctx, connectionTests.Metrics, nil, ) resultLock.Lock() @@ -4105,14 +4105,229 @@ func (aH *APIHandler) CloudIntegrationsGetServiceDetails( resp, apiErr := aH.CloudIntegrationsController.GetServiceDetails( r.Context(), cloudProvider, serviceId, cloudAccountId, ) - if apiErr != nil { RespondError(w, apiErr, nil) return } + + // Add connection status for the 2 signals. + if cloudAccountId != nil { + connStatus, apiErr := aH.calculateCloudIntegrationServiceConnectionStatus( + r.Context(), cloudProvider, *cloudAccountId, resp, + ) + if apiErr != nil { + RespondError(w, apiErr, nil) + return + } + resp.ConnectionStatus = connStatus + } + aH.Respond(w, resp) } +func (aH *APIHandler) calculateCloudIntegrationServiceConnectionStatus( + ctx context.Context, + cloudProvider string, + cloudAccountId string, + svcDetails *cloudintegrations.CloudServiceDetails, +) (*cloudintegrations.CloudServiceConnectionStatus, *model.ApiError) { + if cloudProvider != "aws" { + // TODO(Raj): Make connection check generic for all providers in a follow up change + return nil, model.BadRequest( + fmt.Errorf("unsupported cloud provider: %s", cloudProvider), + ) + } + + telemetryCollectionStrategy := svcDetails.TelemetryCollectionStrategy + if telemetryCollectionStrategy == nil { + return nil, model.InternalError(fmt.Errorf( + "service doesn't have telemetry collection strategy: %s", svcDetails.Id, + )) + } + + result := &cloudintegrations.CloudServiceConnectionStatus{} + errors := []*model.ApiError{} + var resultLock sync.Mutex + + var wg sync.WaitGroup + + // Calculate metrics connection status + if telemetryCollectionStrategy.AWSMetrics != nil { + wg.Add(1) + go func() { + defer wg.Done() + + metricsConnStatus, apiErr := aH.calculateAWSIntegrationSvcMetricsConnectionStatus( + ctx, cloudAccountId, telemetryCollectionStrategy.AWSMetrics, svcDetails.DataCollected.Metrics, + ) + + resultLock.Lock() + defer resultLock.Unlock() + + if apiErr != nil { + errors = append(errors, apiErr) + } else { + result.Metrics = metricsConnStatus + } + }() + } + + // Calculate logs connection status + if telemetryCollectionStrategy.AWSLogs != nil { + wg.Add(1) + go func() { + defer wg.Done() + + logsConnStatus, apiErr := aH.calculateAWSIntegrationSvcLogsConnectionStatus( + ctx, cloudAccountId, telemetryCollectionStrategy.AWSLogs, + ) + + resultLock.Lock() + defer resultLock.Unlock() + + if apiErr != nil { + errors = append(errors, apiErr) + } else { + result.Logs = logsConnStatus + } + }() + } + + wg.Wait() + + if len(errors) > 0 { + return nil, errors[0] + } + + return result, nil + +} +func (aH *APIHandler) calculateAWSIntegrationSvcMetricsConnectionStatus( + ctx context.Context, + cloudAccountId string, + strategy *cloudintegrations.AWSMetricsCollectionStrategy, + metricsCollectedBySvc []cloudintegrations.CollectedMetric, +) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) { + if strategy == nil || len(strategy.CloudwatchMetricsStreamFilters) < 1 { + return nil, nil + } + + metricsNamespace := strategy.CloudwatchMetricsStreamFilters[0].Namespace + metricsNamespaceParts := strings.Split(metricsNamespace, "/") + if len(metricsNamespaceParts) < 2 { + return nil, model.InternalError(fmt.Errorf( + "unexpected metric namespace: %s", metricsNamespace, + )) + } + + expectedLabelValues := map[string]string{ + "cloud_provider": "aws", + "cloud_account_id": cloudAccountId, + "service_namespace": metricsNamespaceParts[0], + "service_name": metricsNamespaceParts[1], + } + + metricNamesCollectedBySvc := []string{} + for _, cm := range metricsCollectedBySvc { + metricNamesCollectedBySvc = append(metricNamesCollectedBySvc, cm.Name) + } + + statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric( + ctx, metricNamesCollectedBySvc, expectedLabelValues, + ) + if apiErr != nil { + return nil, apiErr + } + + if statusForLastReceivedMetric != nil { + return &cloudintegrations.SignalConnectionStatus{ + LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis, + LastReceivedFrom: "signoz-aws-integration", + }, nil + } + + return nil, nil +} + +func (aH *APIHandler) calculateAWSIntegrationSvcLogsConnectionStatus( + ctx context.Context, + cloudAccountId string, + strategy *cloudintegrations.AWSLogsCollectionStrategy, +) (*cloudintegrations.SignalConnectionStatus, *model.ApiError) { + if strategy == nil || len(strategy.CloudwatchLogsSubscriptions) < 1 { + return nil, nil + } + + logGroupNamePrefix := strategy.CloudwatchLogsSubscriptions[0].LogGroupNamePrefix + if len(logGroupNamePrefix) < 1 { + return nil, nil + } + + logsConnTestFilter := &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "cloud.account.id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: "=", + Value: cloudAccountId, + }, + { + Key: v3.AttributeKey{ + Key: "aws.cloudwatch.log_group_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: "like", + Value: logGroupNamePrefix + "%", + }, + }, + } + + // TODO(Raj): Receive this as a param from UI in the future. + lookbackSeconds := int64(30 * 60) + + qrParams := &v3.QueryRangeParamsV3{ + Start: time.Now().UnixMilli() - (lookbackSeconds * 1000), + End: time.Now().UnixMilli(), + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + PageSize: 1, + Filters: logsConnTestFilter, + QueryName: "A", + DataSource: v3.DataSourceLogs, + Expression: "A", + AggregateOperator: v3.AggregateOperatorNoOp, + }, + }, + }, + } + queryRes, _, err := aH.querier.QueryRange( + ctx, qrParams, + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "could not query for integration connection status: %w", err, + )) + } + if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 { + lastLog := queryRes[0].List[0] + + return &cloudintegrations.SignalConnectionStatus{ + LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(), + LastReceivedFrom: "signoz-aws-integration", + }, nil + } + + return nil, nil +} + func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 830f6ab759..afb1e9f728 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -52,7 +52,9 @@ type Reader interface { GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) // Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations - GetLatestReceivedMetric(ctx context.Context, metricNames []string) (*model.MetricStatus, *model.ApiError) + GetLatestReceivedMetric( + ctx context.Context, metricNames []string, labelValues map[string]string, + ) (*model.MetricStatus, *model.ApiError) // QB V3 metrics/traces/logs GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) diff --git a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go index f3a55645f0..a0d9c99f7f 100644 --- a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go @@ -353,7 +353,11 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *CloudIntegratio } fm := featureManager.StartManager() + reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm) + mockClickhouse.MatchExpectationsInOrder(false) + apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{ + Reader: reader, AppDao: dao.DB(), CloudIntegrationsController: controller, FeatureFlags: fm, @@ -373,9 +377,10 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *CloudIntegratio } return &CloudIntegrationsTestBed{ - t: t, - testUser: user, - qsHttpHandler: router, + t: t, + testUser: user, + qsHttpHandler: router, + mockClickhouse: mockClickhouse, } } @@ -504,6 +509,15 @@ func (tb *CloudIntegrationsTestBed) GetServiceDetailFromQS( path = fmt.Sprintf("%s?cloud_account_id=%s", path, *cloudAccountId) } + // add mock expectations for connection status queries + metricCols := []mockhouse.ColumnType{} + metricCols = append(metricCols, mockhouse.ColumnType{Type: "String", Name: "metric_name"}) + metricCols = append(metricCols, mockhouse.ColumnType{Type: "String", Name: "labels"}) + metricCols = append(metricCols, mockhouse.ColumnType{Type: "Int64", Name: "unix_milli"}) + tb.mockClickhouse.ExpectQuery( + `SELECT.*from.*signoz_metrics.*`, + ).WillReturnRows(mockhouse.NewRows(metricCols, [][]any{})) + return RequestQSAndParseResp[cloudintegrations.CloudServiceDetails]( tb, path, nil, ) diff --git a/pkg/query-service/tests/integration/signoz_integrations_test.go b/pkg/query-service/tests/integration/signoz_integrations_test.go index 20b55a5551..778a87cd6b 100644 --- a/pkg/query-service/tests/integration/signoz_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_integrations_test.go @@ -538,7 +538,7 @@ func (tb *IntegrationsTestBed) mockMetricStatusQueryResponse(expectation *model. } tb.mockClickhouse.ExpectQuery( - `SELECT.*metric_name, labels, unix_milli.*from.*signoz_metrics.*where metric_name in.*limit 1.*`, + `SELECT.*from.*signoz_metrics.*`, ).WillReturnRows(mockhouse.NewRows(cols, values)) }