Feat: integrations v0 metrics connection status (#4715)

* chore: add test expectations for integration metrics connection status

* chore: reorg logs connection status calculation for parallelization

* chore: add interface for reader.GetMetricLastReceivedTsMillis

* chore: add plumbing for calculating integration metrics connection status

* chore: impl and test mocks for reader.GetMetricReceivedLatest

* chore: wrap things up and get test passing

* chore: some cleanup

* chore: some more cleanup

* chore: use prom metric names for integration connection test
Raj Kamal Singh 2024-03-18 10:01:53 +05:30 committed by GitHub
parent 4c2174958f
commit 43f9830e8d
8 changed files with 301 additions and 47 deletions

View File

@@ -72,6 +72,7 @@ const (
signozSampleLocalTableName = "samples_v2"
signozSampleTableName = "distributed_samples_v2"
signozTSTableName = "distributed_time_series_v2"
signozTSTableNameV4 = "distributed_time_series_v4"
signozTSTableNameV41Day = "distributed_time_series_v4_1day"
minTimespanForProgressiveSearch = time.Hour
@@ -4271,6 +4272,67 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se
}, nil
}
func (r *ClickHouseReader) GetLatestReceivedMetric(
ctx context.Context, metricNames []string,
) (*model.MetricStatus, *model.ApiError) {
if len(metricNames) < 1 {
return nil, nil
}
quotedMetricNames := []string{}
for _, m := range metricNames {
quotedMetricNames = append(quotedMetricNames, fmt.Sprintf(`'%s'`, m))
}
commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ")
query := fmt.Sprintf(`
SELECT metric_name, labels, unix_milli
from %s.%s
where metric_name in (
%s
)
order by unix_milli desc
limit 1
`, signozMetricDBName, signozTSTableNameV4, commaSeparatedMetricNames,
)
rows, err := r.db.Query(ctx, query)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't query clickhouse for received metrics status: %w", err,
))
}
defer rows.Close()
var result *model.MetricStatus
if rows.Next() {
result = &model.MetricStatus{}
var labelsJson string
err := rows.Scan(
&result.MetricName,
&labelsJson,
&result.LastReceivedTsMillis,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't scan metric status row: %w", err,
))
}
err = json.Unmarshal([]byte(labelsJson), &result.LastReceivedLabels)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't unmarshal metric labels json: %w", err,
))
}
}
return result, nil
}
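For reference, with a hypothetical pair of metric names ('nginx_requests_total', 'nginx_connections_active'), the query interpolated above and sent to ClickHouse would look roughly like:

SELECT metric_name, labels, unix_milli
from signoz_metrics.distributed_time_series_v4
where metric_name in (
'nginx_requests_total', 'nginx_connections_active'
)
order by unix_milli desc
limit 1

Only the single most recent sample across all listed metric names is returned, which is enough to report the integration's metrics connection status.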
func isColumn(tableStatement, attrType, field, datType string) bool {
// value of attrType will be `resource` or `tag`, if `tag` change it to `attribute`
name := utils.GetClickhouseColumnName(attrType, datType, field)

View File

@@ -2481,11 +2481,25 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
w http.ResponseWriter, r *http.Request,
) {
integrationId := mux.Vars(r)["integrationId"]
isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled(
r.Context(), integrationId,
)
if apiErr != nil {
RespondError(w, apiErr, "failed to check if integration is installed")
return
}
// Do not spend resources calculating connection status unless installed.
if !isInstalled {
ah.Respond(w, &integrations.IntegrationConnectionStatus{})
return
}
connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests(
r.Context(), integrationId,
)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch integration connection tests")
RespondError(w, apiErr, "failed to fetch integration connection tests")
return
}
@@ -2511,65 +2525,140 @@ func (ah *APIHandler) calculateConnectionStatus(
connectionTests *integrations.IntegrationConnectionTests,
lookbackSeconds int64,
) (*integrations.IntegrationConnectionStatus, *model.ApiError) {
// Calculate connection status for signals in parallel
result := &integrations.IntegrationConnectionStatus{}
errors := []*model.ApiError{}
var resultLock sync.Mutex
var wg sync.WaitGroup
// Calculate logs connection status
wg.Add(1)
go func() {
defer wg.Done()
logsConnStatus, apiErr := ah.calculateLogsConnectionStatus(
ctx, connectionTests.Logs, lookbackSeconds,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else {
result.Logs = logsConnStatus
}
}()
// Calculate metrics connection status
wg.Add(1)
go func() {
defer wg.Done()
if connectionTests.Metrics == nil || len(connectionTests.Metrics) < 1 {
return
}
statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric(
ctx, connectionTests.Metrics,
)
resultLock.Lock()
defer resultLock.Unlock()
if apiErr != nil {
errors = append(errors, apiErr)
} else if statusForLastReceivedMetric != nil {
resourceSummaryParts := []string{}
for k, v := range statusForLastReceivedMetric.LastReceivedLabels {
resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf(
"%s=%s", k, v,
))
}
result.Metrics = &integrations.SignalConnectionStatus{
LastReceivedFrom: strings.Join(resourceSummaryParts, ", "),
LastReceivedTsMillis: statusForLastReceivedMetric.LastReceivedTsMillis,
}
}
}()
wg.Wait()
if len(errors) > 0 {
return nil, errors[0]
}
return result, nil
}
func (ah *APIHandler) calculateLogsConnectionStatus(
ctx context.Context,
logsConnectionTest *v3.FilterSet,
lookbackSeconds int64,
) (*integrations.SignalConnectionStatus, *model.ApiError) {
if logsConnectionTest == nil {
return nil, nil
}
qrParams := &v3.QueryRangeParamsV3{
Start: time.Now().UnixMilli() - (lookbackSeconds * 1000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: logsConnectionTest,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, err, _ := ah.querier.QueryRange(
ctx, qrParams, map[string]v3.AttributeKey{},
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
resourceSummaryParts := []string{}
lastLogResourceAttribs := lastLog.Data["resources_string"]
if lastLogResourceAttribs != nil {
resourceAttribs, ok := lastLogResourceAttribs.(*map[string]string)
if !ok {
return nil, model.InternalError(fmt.Errorf(
"could not cast log resource attribs",
))
}
for k, v := range *resourceAttribs {
resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf(
"%s=%s", k, v,
))
}
}
lastLogResourceSummary := strings.Join(resourceSummaryParts, ", ")
return &integrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: lastLogResourceSummary,
}, nil
}
return nil, nil
}
func (ah *APIHandler) InstallIntegration(
w http.ResponseWriter, r *http.Request,
) {

View File

@@ -4,6 +4,7 @@ import (
"context"
"embed"
"strings"
"unicode"
"encoding/base64"
"encoding/json"
@@ -133,6 +134,14 @@ func readBuiltInIntegration(dirpath string) (
}
integration.Id = "builtin-" + integration.Id
if len(integration.DataCollected.Metrics) > 0 {
metricsForConnTest := []string{}
for _, collectedMetric := range integration.DataCollected.Metrics {
promName := toPromMetricName(collectedMetric.Name)
metricsForConnTest = append(metricsForConnTest, promName)
}
integration.ConnectionTests.Metrics = metricsForConnTest
}
return &integration, nil
}
@@ -223,3 +232,34 @@ func readFileIfUri(maybeFileUri string, basedir string) (interface{}, error) {
return nil, fmt.Errorf("unsupported file type %s", maybeFileUri)
}
// copied from signoz clickhouse exporter's `sanitize` which
// in turn is copied from prometheus-go-metric-exporter
//
// replaces non-alphanumeric characters with underscores in s.
func toPromMetricName(s string) string {
if len(s) == 0 {
return s
}
// Note: No length limit for label keys because Prometheus doesn't
// define a length limit, thus we should NOT be truncating label keys.
// See https://github.com/orijtech/prometheus-go-metrics-exporter/issues/4.
s = strings.Map(func(r rune) rune {
// sanitizeRune converts anything that is not a letter or digit to an underscore
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
// Everything else turns into an underscore
return '_'
}, s)
if unicode.IsDigit(rune(s[0])) {
s = "key" + "_" + s
}
if s[0] == '_' {
s = "key" + s
}
return s
}
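A couple of illustrative mappings (hypothetical metric names), following the sanitization rules above:

toPromMetricName("nginx.requests.count") // "nginx_requests_count"
toPromMetricName("2xx.responses")        // "key_2xx_responses"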

View File

@@ -63,6 +63,18 @@ func (c *Controller) GetIntegration(
return c.mgr.GetIntegration(ctx, integrationId)
}
func (c *Controller) IsIntegrationInstalled(
ctx context.Context,
integrationId string,
) (bool, *model.ApiError) {
installation, apiErr := c.mgr.getInstalledIntegration(ctx, integrationId)
if apiErr != nil {
return false, apiErr
}
isInstalled := installation != nil
return isInstalled, nil
}
func (c *Controller) GetIntegrationConnectionTests(
ctx context.Context, integrationId string,
) (*IntegrationConnectionTests, *model.ApiError) {

View File

@@ -76,9 +76,11 @@ type IntegrationConnectionStatus struct {
}
type IntegrationConnectionTests struct {
// Filter to use for finding logs for the integration.
Logs *v3.FilterSet `json:"logs"`
// TODO(Raj): Add connection tests for other signals.
// Metric names expected to have been received for the integration.
Metrics []string `json:"metrics"`
}
type IntegrationDetails struct {

View File

@@ -67,6 +67,9 @@ type Reader interface {
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
// Returns `MetricStatus` for the latest received metric among `metricNames`. Useful for status calculations.
GetLatestReceivedMetric(ctx context.Context, metricNames []string) (*model.MetricStatus, *model.ApiError)
// QB V3 metrics/traces/logs
GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error)
GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error)
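A minimal usage sketch for the new reader method (hypothetical caller and metric name; the actual call site is the metrics goroutine in calculateConnectionStatus above):

status, apiErr := reader.GetLatestReceivedMetric(ctx, []string{"nginx_requests_total"})
if apiErr != nil {
	// query/scan failures are surfaced as *model.ApiError
} else if status != nil {
	// status.MetricName, status.LastReceivedLabels and status.LastReceivedTsMillis
	// describe the most recently received sample among the listed metric names;
	// a nil status means none of them have been received yet.
}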

View File

@@ -511,6 +511,12 @@ type MetricPoint struct {
Value float64
}
type MetricStatus struct {
MetricName string
LastReceivedTsMillis int64
LastReceivedLabels map[string]string
}
// MarshalJSON implements json.Marshaler.
func (p *MetricPoint) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.Value, 'f', -1, 64)

View File

@@ -9,6 +9,7 @@ import (
"runtime/debug"
"slices"
"testing"
"time"
"github.com/jmoiron/sqlx"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
@@ -65,18 +66,30 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
// Integration connection status should get updated after signal data has been received.
testbed.mockLogQueryResponse([]model.SignozLog{})
testbed.mockMetricStatusQueryResponse(nil)
connectionStatus := testbed.GetIntegrationConnectionStatus(ii.Id)
require.NotNil(connectionStatus)
require.Nil(connectionStatus.Logs)
require.Nil(connectionStatus.Metrics)
testLog := makeTestSignozLog("test log body", map[string]interface{}{
"source": "nginx",
})
testbed.mockLogQueryResponse([]model.SignozLog{testLog})
testMetricName := ii.ConnectionTests.Metrics[0]
testMetricLastReceivedTs := time.Now().UnixMilli()
testbed.mockMetricStatusQueryResponse(&model.MetricStatus{
MetricName: testMetricName,
LastReceivedTsMillis: testMetricLastReceivedTs,
})
connectionStatus = testbed.GetIntegrationConnectionStatus(ii.Id)
require.NotNil(connectionStatus)
require.NotNil(connectionStatus.Logs)
require.Equal(connectionStatus.Logs.LastReceivedTsMillis, int64(testLog.Timestamp/1000000))
require.NotNil(connectionStatus.Metrics)
require.Equal(connectionStatus.Metrics.LastReceivedTsMillis, testMetricLastReceivedTs)
// Should be able to uninstall integration
require.True(availableIntegrations[0].IsInstalled)
@@ -516,6 +529,32 @@ func (tb *IntegrationsTestBed) mockLogQueryResponse(logsInResponse []model.Signo
addLogsQueryExpectation(tb.mockClickhouse, logsInResponse)
}
func (tb *IntegrationsTestBed) mockMetricStatusQueryResponse(expectation *model.MetricStatus) {
cols := []mockhouse.ColumnType{}
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "metric_name"})
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "labels"})
cols = append(cols, mockhouse.ColumnType{Type: "Int64", Name: "unix_milli"})
values := [][]any{}
if expectation != nil {
rowValues := []any{}
rowValues = append(rowValues, expectation.MetricName)
labelsJson, err := json.Marshal(expectation.LastReceivedLabels)
require.Nil(tb.t, err)
rowValues = append(rowValues, labelsJson)
rowValues = append(rowValues, expectation.LastReceivedTsMillis)
values = append(values, rowValues)
}
tb.mockClickhouse.ExpectQuery(
`SELECT.*metric_name, labels, unix_milli.*from.*signoz_metrics.*where metric_name in.*limit 1.*`,
).WillReturnRows(mockhouse.NewRows(cols, values))
}
// testDB can be injected for sharing a DB across multiple integration testbeds.
func NewIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *IntegrationsTestBed {
if testDB == nil {
@@ -529,6 +568,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *IntegrationsTestBed
fm := featureManager.StartManager()
reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
mockClickhouse.MatchExpectationsInOrder(false)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,