diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index df41572155..56a901a7fb 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -72,6 +72,7 @@ const ( signozSampleLocalTableName = "samples_v2" signozSampleTableName = "distributed_samples_v2" signozTSTableName = "distributed_time_series_v2" + signozTSTableNameV4 = "distributed_time_series_v4" signozTSTableNameV41Day = "distributed_time_series_v4_1day" minTimespanForProgressiveSearch = time.Hour @@ -4271,6 +4272,67 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se }, nil } +func (r *ClickHouseReader) GetLatestReceivedMetric( + ctx context.Context, metricNames []string, +) (*model.MetricStatus, *model.ApiError) { + if len(metricNames) < 1 { + return nil, nil + } + + quotedMetricNames := []string{} + for _, m := range metricNames { + quotedMetricNames = append(quotedMetricNames, fmt.Sprintf(`'%s'`, m)) + } + commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ") + + query := fmt.Sprintf(` + SELECT metric_name, labels, unix_milli + from %s.%s + where metric_name in ( + %s + ) + order by unix_milli desc + limit 1 + `, signozMetricDBName, signozTSTableNameV4, commaSeparatedMetricNames, + ) + + rows, err := r.db.Query(ctx, query) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't query clickhouse for received metrics status: %w", err, + )) + } + defer rows.Close() + + var result *model.MetricStatus + + if rows.Next() { + + result = &model.MetricStatus{} + var labelsJson string + + err := rows.Scan( + &result.MetricName, + &labelsJson, + &result.LastReceivedTsMillis, + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't scan metric status row: %w", err, + )) + } + + err = json.Unmarshal([]byte(labelsJson), &result.LastReceivedLabels) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't unmarshal metric labels json: %w", err, + )) + } + } + + return result, nil +} + func isColumn(tableStatement, attrType, field, datType string) bool { // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute` name := utils.GetClickhouseColumnName(attrType, datType, field) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e8200635f7..6bc70c43bb 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2481,11 +2481,25 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] + isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled( + r.Context(), integrationId, + ) + if apiErr != nil { + RespondError(w, apiErr, "failed to check if integration is installed") + return + } + + // Do not spend resources calculating connection status unless installed. + if !isInstalled { + ah.Respond(w, &integrations.IntegrationConnectionStatus{}) + return + } + connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests( r.Context(), integrationId, ) if apiErr != nil { - RespondError(w, apiErr, "Failed to fetch integration connection tests") + RespondError(w, apiErr, "failed to fetch integration connection tests") return } @@ -2511,65 +2525,140 @@ func (ah *APIHandler) calculateConnectionStatus( connectionTests *integrations.IntegrationConnectionTests, lookbackSeconds int64, ) (*integrations.IntegrationConnectionStatus, *model.ApiError) { + // Calculate connection status for signals in parallel + result := &integrations.IntegrationConnectionStatus{} + errors := []*model.ApiError{} + var resultLock sync.Mutex - if connectionTests.Logs != nil { - qrParams := &v3.QueryRangeParamsV3{ - Start: time.Now().UnixMilli() - (lookbackSeconds * 1000), - End: time.Now().UnixMilli(), - CompositeQuery: &v3.CompositeQuery{ - PanelType: v3.PanelTypeList, - QueryType: v3.QueryTypeBuilder, - BuilderQueries: map[string]*v3.BuilderQuery{ - "A": { - PageSize: 1, - Filters: connectionTests.Logs, - QueryName: "A", - DataSource: v3.DataSourceLogs, - Expression: "A", - AggregateOperator: v3.AggregateOperatorNoOp, - }, - }, - }, - } - queryRes, err, _ := ah.querier.QueryRange( - ctx, qrParams, map[string]v3.AttributeKey{}, + var wg sync.WaitGroup + + // Calculate logs connection status + wg.Add(1) + go func() { + defer wg.Done() + + logsConnStatus, apiErr := ah.calculateLogsConnectionStatus( + ctx, connectionTests.Logs, lookbackSeconds, ) - if err != nil { - return nil, model.InternalError(fmt.Errorf( - "could not query for integration connection status: %w", err, - )) - } - if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 { - lastLog := queryRes[0].List[0] + resultLock.Lock() + defer resultLock.Unlock() + + if apiErr != nil { + errors = append(errors, apiErr) + } else { + result.Logs = logsConnStatus + } + }() + + // Calculate metrics connection status + wg.Add(1) + go func() { + defer wg.Done() + + if connectionTests.Metrics == nil || len(connectionTests.Metrics) < 1 { + return + } + + statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric( + ctx, connectionTests.Metrics, + ) + + resultLock.Lock() + defer resultLock.Unlock() + + if apiErr != nil { + errors = append(errors, apiErr) + + } else if statusForLastReceivedMetric != nil { resourceSummaryParts := []string{} - lastLogResourceAttribs := lastLog.Data["resources_string"] - if lastLogResourceAttribs != nil { - resourceAttribs, ok := lastLogResourceAttribs.(*map[string]string) - if !ok { - return nil, model.InternalError(fmt.Errorf( - "could not cast log resource attribs", - )) - } - for k, v := range *resourceAttribs { - resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf( - "%s=%s", k, v, - )) - } + for k, v := range statusForLastReceivedMetric.LastReceivedLabels { + resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf( + "%s=%s", k, v, + )) } - lastLogResourceSummary := strings.Join(resourceSummaryParts, ", ") - result.Logs = &integrations.SignalConnectionStatus{ - LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(), - LastReceivedFrom: lastLogResourceSummary, + result.Metrics = &integrations.SignalConnectionStatus{ + LastReceivedFrom: strings.Join(resourceSummaryParts, ", "), + LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis, } } + }() + + wg.Wait() + + if len(errors) > 0 { + return nil, errors[0] } return result, nil } +func (ah *APIHandler) calculateLogsConnectionStatus( + ctx context.Context, + logsConnectionTest *v3.FilterSet, + lookbackSeconds int64, +) (*integrations.SignalConnectionStatus, *model.ApiError) { + if logsConnectionTest == nil { + return nil, nil + } + + qrParams := &v3.QueryRangeParamsV3{ + Start: time.Now().UnixMilli() - (lookbackSeconds * 1000), + End: time.Now().UnixMilli(), + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + PageSize: 1, + Filters: logsConnectionTest, + QueryName: "A", + DataSource: v3.DataSourceLogs, + Expression: "A", + AggregateOperator: v3.AggregateOperatorNoOp, + }, + }, + }, + } + queryRes, err, _ := ah.querier.QueryRange( + ctx, qrParams, map[string]v3.AttributeKey{}, + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "could not query for integration connection status: %w", err, + )) + } + if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 { + lastLog := queryRes[0].List[0] + + resourceSummaryParts := []string{} + lastLogResourceAttribs := lastLog.Data["resources_string"] + if lastLogResourceAttribs != nil { + resourceAttribs, ok := lastLogResourceAttribs.(*map[string]string) + if !ok { + return nil, model.InternalError(fmt.Errorf( + "could not cast log resource attribs", + )) + } + for k, v := range *resourceAttribs { + resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf( + "%s=%s", k, v, + )) + } + } + lastLogResourceSummary := strings.Join(resourceSummaryParts, ", ") + + return &integrations.SignalConnectionStatus{ + LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(), + LastReceivedFrom: lastLogResourceSummary, + }, nil + } + + return nil, nil +} + func (ah *APIHandler) InstallIntegration( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/builtin.go b/pkg/query-service/app/integrations/builtin.go index a612e45ed3..cf98b3ff9d 100644 --- a/pkg/query-service/app/integrations/builtin.go +++ b/pkg/query-service/app/integrations/builtin.go @@ -4,6 +4,7 @@ import ( "context" "embed" "strings" + "unicode" "encoding/base64" "encoding/json" @@ -133,6 +134,14 @@ func readBuiltInIntegration(dirpath string) ( } integration.Id = "builtin-" + integration.Id + if len(integration.DataCollected.Metrics) > 0 { + metricsForConnTest := []string{} + for _, collectedMetric := range integration.DataCollected.Metrics { + promName := toPromMetricName(collectedMetric.Name) + metricsForConnTest = append(metricsForConnTest, promName) + } + integration.ConnectionTests.Metrics = metricsForConnTest + } return &integration, nil } @@ -223,3 +232,34 @@ func readFileIfUri(maybeFileUri string, basedir string) (interface{}, error) { return nil, fmt.Errorf("unsupported file type %s", maybeFileUri) } + +// copied from signoz clickhouse exporter's `sanitize` which +// in turn is copied from prometheus-go-metric-exporter +// +// replaces non-alphanumeric characters with underscores in s. +func toPromMetricName(s string) string { + if len(s) == 0 { + return s + } + + // Note: No length limit for label keys because Prometheus doesn't + // define a length limit, thus we should NOT be truncating label keys. + // See https://github.com/orijtech/prometheus-go-metrics-exporter/issues/4. + + s = strings.Map(func(r rune) rune { + // sanitizeRune converts anything that is not a letter or digit to an underscore + if unicode.IsLetter(r) || unicode.IsDigit(r) { + return r + } + // Everything else turns into an underscore + return '_' + }, s) + + if unicode.IsDigit(rune(s[0])) { + s = "key" + "_" + s + } + if s[0] == '_' { + s = "key" + s + } + return s +} diff --git a/pkg/query-service/app/integrations/controller.go b/pkg/query-service/app/integrations/controller.go index a45ab3fb04..8695c4b1cd 100644 --- a/pkg/query-service/app/integrations/controller.go +++ b/pkg/query-service/app/integrations/controller.go @@ -63,6 +63,18 @@ func (c *Controller) GetIntegration( return c.mgr.GetIntegration(ctx, integrationId) } +func (c *Controller) IsIntegrationInstalled( + ctx context.Context, + integrationId string, +) (bool, *model.ApiError) { + installation, apiErr := c.mgr.getInstalledIntegration(ctx, integrationId) + if apiErr != nil { + return false, apiErr + } + isInstalled := installation != nil + return isInstalled, nil +} + func (c *Controller) GetIntegrationConnectionTests( ctx context.Context, integrationId string, ) (*IntegrationConnectionTests, *model.ApiError) { diff --git a/pkg/query-service/app/integrations/manager.go b/pkg/query-service/app/integrations/manager.go index 110d370c1b..c3ebd21cc2 100644 --- a/pkg/query-service/app/integrations/manager.go +++ b/pkg/query-service/app/integrations/manager.go @@ -76,9 +76,11 @@ type IntegrationConnectionStatus struct { } type IntegrationConnectionTests struct { + // Filter to use for finding logs for the integration. Logs *v3.FilterSet `json:"logs"` - // TODO(Raj): Add connection tests for other signals. + // Metric names expected to have been received for the integration. + Metrics []string `json:"metrics"` } type IntegrationDetails struct { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 1ca1fd9958..eefb10a0c0 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -67,6 +67,9 @@ type Reader interface { GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) + // Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations + GetLatestReceivedMetric(ctx context.Context, metricNames []string) (*model.MetricStatus, *model.ApiError) + // QB V3 metrics/traces/logs GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 05da7f5ab7..aad137714c 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -511,6 +511,12 @@ type MetricPoint struct { Value float64 } +type MetricStatus struct { + MetricName string + LastReceivedTsMillis int64 + LastReceivedLabels map[string]string +} + // MarshalJSON implements json.Marshaler. func (p *MetricPoint) MarshalJSON() ([]byte, error) { v := strconv.FormatFloat(p.Value, 'f', -1, 64) diff --git a/pkg/query-service/tests/integration/signoz_integrations_test.go b/pkg/query-service/tests/integration/signoz_integrations_test.go index 5294d06081..292e353401 100644 --- a/pkg/query-service/tests/integration/signoz_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_integrations_test.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "slices" "testing" + "time" "github.com/jmoiron/sqlx" mockhouse "github.com/srikanthccv/ClickHouse-go-mock" @@ -65,18 +66,30 @@ func TestSignozIntegrationLifeCycle(t *testing.T) { // Integration connection status should get updated after signal data has been received. testbed.mockLogQueryResponse([]model.SignozLog{}) + testbed.mockMetricStatusQueryResponse(nil) connectionStatus := testbed.GetIntegrationConnectionStatus(ii.Id) require.NotNil(connectionStatus) require.Nil(connectionStatus.Logs) + require.Nil(connectionStatus.Metrics) testLog := makeTestSignozLog("test log body", map[string]interface{}{ "source": "nginx", }) testbed.mockLogQueryResponse([]model.SignozLog{testLog}) + + testMetricName := ii.ConnectionTests.Metrics[0] + testMetricLastReceivedTs := time.Now().UnixMilli() + testbed.mockMetricStatusQueryResponse(&model.MetricStatus{ + MetricName: testMetricName, + LastReceivedTsMillis: testMetricLastReceivedTs, + }) + connectionStatus = testbed.GetIntegrationConnectionStatus(ii.Id) require.NotNil(connectionStatus) require.NotNil(connectionStatus.Logs) require.Equal(connectionStatus.Logs.LastReceivedTsMillis, int64(testLog.Timestamp/1000000)) + require.NotNil(connectionStatus.Metrics) + require.Equal(connectionStatus.Metrics.LastReceivedTsMillis, testMetricLastReceivedTs) // Should be able to uninstall integration require.True(availableIntegrations[0].IsInstalled) @@ -516,6 +529,32 @@ func (tb *IntegrationsTestBed) mockLogQueryResponse(logsInResponse []model.Signo addLogsQueryExpectation(tb.mockClickhouse, logsInResponse) } +func (tb *IntegrationsTestBed) mockMetricStatusQueryResponse(expectation *model.MetricStatus) { + cols := []mockhouse.ColumnType{} + cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "metric_name"}) + cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "labels"}) + cols = append(cols, mockhouse.ColumnType{Type: "Int64", Name: "unix_milli"}) + + values := [][]any{} + if expectation != nil { + rowValues := []any{} + + rowValues = append(rowValues, expectation.MetricName) + + labelsJson, err := json.Marshal(expectation.LastReceivedLabels) + require.Nil(tb.t, err) + rowValues = append(rowValues, labelsJson) + + rowValues = append(rowValues, expectation.LastReceivedTsMillis) + + values = append(values, rowValues) + } + + tb.mockClickhouse.ExpectQuery( + `SELECT.*metric_name, labels, unix_milli.*from.*signoz_metrics.*where metric_name in.*limit 1.*`, + ).WillReturnRows(mockhouse.NewRows(cols, values)) +} + // testDB can be injected for sharing a DB across multiple integration testbeds. func NewIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *IntegrationsTestBed { if testDB == nil { @@ -529,6 +568,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *IntegrationsTestBed fm := featureManager.StartManager() reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm) + mockClickhouse.MatchExpectationsInOrder(false) apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{ Reader: reader,