Feat: qs api integration connection status (#4628)

* chore: add integration attribs for connection tests and status

* chore: add connection status to integration details response

* chore: update integration lifecycle test to check for connection status too

* feat: add GetIntegrationConnectionTests to integrations manager and controller

* chore: add tests for querying integration connection status

* feat: add http API support for integration connection status

* chore: some cleanups

* chore: use PostableRule for integration alerts

* chore: some more cleanup
This commit is contained in:
Raj Kamal Singh 2024-03-05 15:23:56 +05:30 committed by GitHub
parent fdd7e022e9
commit ab5285dee6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 392 additions and 17 deletions

View File

@ -143,6 +143,17 @@ func NewReader(
os.Exit(1)
}
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster)
}
func NewReaderFromClickhouseConnection(
db driver.Conn,
options *Options,
localDB *sqlx.DB,
configFile string,
featureFlag interfaces.FeatureLookup,
cluster string,
) *ClickHouseReader {
alertManager, err := am.New("")
if err != nil {
zap.S().Errorf("msg: failed to initialize alert manager: ", "/t error:", err)

View File

@ -2412,6 +2412,11 @@ func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMidd
"/uninstall", am.ViewAccess(ah.UninstallIntegration),
).Methods(http.MethodPost)
// Used for polling for status in v0
subRouter.HandleFunc(
"/{integrationId}/connection_status", am.ViewAccess(ah.GetIntegrationConnectionStatus),
).Methods(http.MethodGet)
subRouter.HandleFunc(
"/{integrationId}", am.ViewAccess(ah.GetIntegration),
).Methods(http.MethodGet)
@ -2443,14 +2448,112 @@ func (ah *APIHandler) GetIntegration(
w http.ResponseWriter, r *http.Request,
) {
integrationId := mux.Vars(r)["integrationId"]
resp, apiErr := ah.IntegrationsController.GetIntegration(
integration, apiErr := ah.IntegrationsController.GetIntegration(
r.Context(), integrationId,
)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch integration details")
return
}
ah.Respond(w, resp)
// Add connection status details.
connectionStatus, apiErr := ah.calculateConnectionStatus(
r.Context(), integration.ConnectionTests,
)
if apiErr != nil {
RespondError(w, apiErr, "Failed to calculate integration connection status")
return
}
integration.ConnectionStatus = connectionStatus
ah.Respond(w, integration)
}
func (ah *APIHandler) GetIntegrationConnectionStatus(
w http.ResponseWriter, r *http.Request,
) {
integrationId := mux.Vars(r)["integrationId"]
connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests(
r.Context(), integrationId,
)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch integration connection tests")
return
}
connectionStatus, apiErr := ah.calculateConnectionStatus(
r.Context(), connectionTests,
)
if apiErr != nil {
RespondError(w, apiErr, "Failed to calculate integration connection status")
return
}
ah.Respond(w, connectionStatus)
}
func (ah *APIHandler) calculateConnectionStatus(
ctx context.Context,
connectionTests *integrations.IntegrationConnectionTests,
) (*integrations.IntegrationConnectionStatus, *model.ApiError) {
result := &integrations.IntegrationConnectionStatus{}
if connectionTests.Logs != nil {
qrParams := &v3.QueryRangeParamsV3{
// Look back up to 7 days for integration logs
Start: time.Now().UnixMilli() - (7 * 86400000),
End: time.Now().UnixMilli(),
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
PageSize: 1,
Filters: connectionTests.Logs,
QueryName: "A",
DataSource: v3.DataSourceLogs,
Expression: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
},
},
},
}
queryRes, err, _ := ah.querier.QueryRange(
ctx, qrParams, map[string]v3.AttributeKey{},
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query for integration connection status: %w", err,
))
}
if len(queryRes) > 0 && queryRes[0].List != nil && len(queryRes[0].List) > 0 {
lastLog := queryRes[0].List[0]
resourceSummaryParts := []string{}
lastLogResourceAttribs := lastLog.Data["resources_string"]
if lastLogResourceAttribs != nil {
resourceAttribs, ok := lastLogResourceAttribs.(*map[string]string)
if !ok {
return nil, model.InternalError(fmt.Errorf(
"could not cast log resource attribs",
))
}
for k, v := range *resourceAttribs {
resourceSummaryParts = append(resourceSummaryParts, fmt.Sprintf(
"%s=%s", k, v,
))
}
}
lastLogResourceSummary := strings.Join(resourceSummaryParts, ", ")
result.Logs = &integrations.SignalConnectionStatus{
LastReceivedTsMillis: lastLog.Timestamp.UnixMilli(),
LastReceivedFrom: lastLogResourceSummary,
}
}
}
return result, nil
}
func (ah *APIHandler) InstallIntegration(

View File

@ -60,6 +60,12 @@ func (c *Controller) GetIntegration(
return c.mgr.GetIntegration(ctx, integrationId)
}
func (c *Controller) GetIntegrationConnectionTests(
ctx context.Context, integrationId string,
) (*IntegrationConnectionTests, *model.ApiError) {
return c.mgr.GetIntegrationConnectionTests(ctx, integrationId)
}
type InstallIntegrationRequest struct {
IntegrationId string `json:"integration_id"`
Config map[string]interface{} `json:"config"`

View File

@ -11,6 +11,8 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/rules"
)
type IntegrationAuthor struct {
@ -32,8 +34,7 @@ type IntegrationAssets struct {
Logs LogsAssets `json:"logs"`
Dashboards []dashboards.Dashboard `json:"dashboards"`
// TODO(Raj): Maybe use a struct for alerts
Alerts []map[string]interface{} `json:"alerts"`
Alerts []rules.PostableRule `json:"alerts"`
}
type LogsAssets struct {
@ -62,6 +63,22 @@ type CollectedMetric struct {
Unit string `json:"unit"`
}
type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}
type IntegrationConnectionStatus struct {
Logs *SignalConnectionStatus `json:"logs"`
Metrics *SignalConnectionStatus `json:"metrics"`
}
type IntegrationConnectionTests struct {
Logs *v3.FilterSet `json:"logs"`
// TODO(Raj): Add connection tests for other signals.
}
type IntegrationDetails struct {
IntegrationSummary
@ -70,6 +87,10 @@ type IntegrationDetails struct {
Configuration []IntegrationConfigStep `json:"configuration"`
DataCollected DataCollectedForIntegration `json:"data_collected"`
Assets IntegrationAssets `json:"assets"`
ConnectionTests *IntegrationConnectionTests `json:"connection_tests"`
// ConnectionStatus gets derived using `ConnectionTests`
ConnectionStatus *IntegrationConnectionStatus `json:"connection_status"`
}
type IntegrationsListItem struct {
@ -183,6 +204,19 @@ func (m *Manager) GetIntegration(
}, nil
}
func (m *Manager) GetIntegrationConnectionTests(
ctx context.Context,
integrationId string,
) (*IntegrationConnectionTests, *model.ApiError) {
integrationDetails, apiErr := m.getIntegrationDetails(
ctx, integrationId,
)
if apiErr != nil {
return nil, apiErr
}
return integrationDetails.ConnectionTests, nil
}
func (m *Manager) InstallIntegration(
ctx context.Context,
integrationId string,

View File

@ -11,6 +11,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/rules"
)
func NewTestSqliteDB(t *testing.T) (
@ -88,12 +89,12 @@ func (t *TestAvailableIntegrationsRepo) list(
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "method",
Key: "source",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
Value: "nginx",
},
},
},
@ -112,7 +113,23 @@ func (t *TestAvailableIntegrationsRepo) list(
},
},
Dashboards: []dashboards.Dashboard{},
Alerts: []map[string]interface{}{},
Alerts: []rules.PostableRule{},
},
ConnectionTests: &IntegrationConnectionTests{
Logs: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "source",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "nginx",
},
},
},
},
}, {
IntegrationSummary: IntegrationSummary{
@ -150,12 +167,12 @@ func (t *TestAvailableIntegrationsRepo) list(
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "method",
Key: "source",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
Value: "redis",
},
},
},
@ -174,7 +191,23 @@ func (t *TestAvailableIntegrationsRepo) list(
},
},
Dashboards: []dashboards.Dashboard{},
Alerts: []map[string]interface{}{},
Alerts: []rules.PostableRule{},
},
ConnectionTests: &IntegrationConnectionTests{
Logs: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "source",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "nginx",
},
},
},
},
},
}, nil

View File

@ -9,11 +9,13 @@ import (
"runtime/debug"
"testing"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
"go.signoz.io/signoz/pkg/query-service/model"
)
@ -36,14 +38,18 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
"some integrations should come bundled with SigNoz",
)
// Should be able to install integration
require.False(availableIntegrations[0].IsInstalled)
testbed.RequestQSToInstallIntegration(
availableIntegrations[0].Id, map[string]interface{}{},
)
testbed.mockLogQueryResponse([]model.SignozLog{})
ii := testbed.GetIntegrationDetailsFromQS(availableIntegrations[0].Id)
require.Equal(ii.Id, availableIntegrations[0].Id)
require.NotNil(ii.Installation)
require.NotNil(ii.ConnectionStatus)
require.Nil(ii.ConnectionStatus.Logs)
installedResp = testbed.GetInstalledIntegrationsFromQS()
installedIntegrations := installedResp.Integrations
@ -54,11 +60,29 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
availableIntegrations = availableResp.Integrations
require.Greater(len(availableIntegrations), 0)
// Integration connection status should get updated after signal data has been received.
testLog := makeTestSignozLog("test log body", map[string]interface{}{
"source": "nginx",
})
testbed.mockLogQueryResponse([]model.SignozLog{testLog})
connectionStatus := testbed.GetIntegrationConnectionStatus(ii.Id)
require.NotNil(connectionStatus)
require.NotNil(connectionStatus.Logs)
require.Equal(connectionStatus.Logs.LastReceivedTsMillis, int64(testLog.Timestamp/1000000))
testbed.mockLogQueryResponse([]model.SignozLog{testLog})
ii = testbed.GetIntegrationDetailsFromQS(ii.Id)
require.NotNil(ii.ConnectionStatus)
require.NotNil(ii.ConnectionStatus.Logs)
require.Equal(connectionStatus.Logs.LastReceivedTsMillis, int64(testLog.Timestamp/1000000))
// Should be able to uninstall integration
require.True(availableIntegrations[0].IsInstalled)
testbed.RequestQSToUninstallIntegration(
availableIntegrations[0].Id,
)
testbed.mockLogQueryResponse([]model.SignozLog{})
ii = testbed.GetIntegrationDetailsFromQS(availableIntegrations[0].Id)
require.Equal(ii.Id, availableIntegrations[0].Id)
require.Nil(ii.Installation)
@ -74,9 +98,10 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
}
type IntegrationsTestBed struct {
t *testing.T
testUser *model.User
qsHttpHandler http.Handler
t *testing.T
testUser *model.User
qsHttpHandler http.Handler
mockClickhouse mockhouse.ClickConnMockCommon
}
func (tb *IntegrationsTestBed) GetAvailableIntegrationsFromQS() *integrations.IntegrationsListResponse {
@ -125,12 +150,32 @@ func (tb *IntegrationsTestBed) GetIntegrationDetailsFromQS(
var integrationResp integrations.Integration
err = json.Unmarshal(dataJson, &integrationResp)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json into PipelinesResponse")
tb.t.Fatalf("could not unmarshal apiResponse.Data json")
}
return &integrationResp
}
func (tb *IntegrationsTestBed) GetIntegrationConnectionStatus(
integrationId string,
) *integrations.IntegrationConnectionStatus {
result := tb.RequestQS(fmt.Sprintf(
"/api/v1/integrations/%s/connection_status", integrationId,
), nil)
dataJson, err := json.Marshal(result.Data)
if err != nil {
tb.t.Fatalf("could not marshal apiResponse.Data: %v", err)
}
var connectionStatus integrations.IntegrationConnectionStatus
err = json.Unmarshal(dataJson, &connectionStatus)
if err != nil {
tb.t.Fatalf("could not unmarshal apiResponse.Data json")
}
return &connectionStatus
}
func (tb *IntegrationsTestBed) RequestQSToInstallIntegration(
integrationId string, config map[string]interface{},
) {
@ -188,6 +233,10 @@ func (tb *IntegrationsTestBed) RequestQS(
return &result
}
func (tb *IntegrationsTestBed) mockLogQueryResponse(logsInResponse []model.SignozLog) {
addLogsQueryExpectation(tb.mockClickhouse, logsInResponse)
}
func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed {
testDB, testDBFilePath := integrations.NewTestSqliteDB(t)
@ -199,9 +248,14 @@ func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed {
t.Fatalf("could not create integrations controller: %v", err)
}
fm := featureManager.StartManager()
reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
Reader: reader,
AppDao: dao.DB(),
IntegrationsController: controller,
FeatureFlags: fm,
})
if err != nil {
t.Fatalf("could not create a new ApiHandler: %v", err)
@ -217,8 +271,9 @@ func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed {
}
return &IntegrationsTestBed{
t: t,
testUser: user,
qsHttpHandler: router,
t: t,
testUser: user,
qsHttpHandler: router,
mockClickhouse: mockClickhouse,
}
}

View File

@ -0,0 +1,133 @@
package tests
import (
"fmt"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
)
func NewMockClickhouseReader(
t *testing.T, testDB *sqlx.DB, featureFlags interfaces.FeatureLookup,
) (
*clickhouseReader.ClickHouseReader, mockhouse.ClickConnMockCommon,
) {
require.NotNil(t, testDB)
mockDB, err := mockhouse.NewClickHouseWithQueryMatcher(nil, sqlmock.QueryMatcherRegexp)
require.Nil(t, err, "could not init mock clickhouse")
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mockDB,
clickhouseReader.NewOptions("", 10, 10, 10*time.Second, ""),
testDB,
"",
featureFlags,
"",
)
return reader, mockDB
}
func addLogsQueryExpectation(
mockClickhouse mockhouse.ClickConnMockCommon,
logsToReturn []model.SignozLog,
) {
cols := []mockhouse.ColumnType{}
cols = append(cols, mockhouse.ColumnType{Type: "UInt64", Name: "timestamp"})
cols = append(cols, mockhouse.ColumnType{Type: "UInt64", Name: "observed_timestamp"})
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "id"})
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "trace_id"})
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "span_id"})
cols = append(cols, mockhouse.ColumnType{Type: "UInt32", Name: "trace_flags"})
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "severity_text"})
cols = append(cols, mockhouse.ColumnType{Type: "UInt8", Name: "severity_number"})
cols = append(cols, mockhouse.ColumnType{Type: "String", Name: "body"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "resources_string_key"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "resources_string_value"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "attributes_string_key"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "attributes_string_value"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "attributes_int64_key"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(Int64)", Name: "attributes_int64_value"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "attributes_float64_key"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(Float64)", Name: "attributes_float64_value"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(String)", Name: "attributes_bool_key"})
cols = append(cols, mockhouse.ColumnType{Type: "Array(Bool)", Name: "attributes_bool_value"})
values := [][]any{}
for _, l := range logsToReturn {
rowValues := []any{}
rowValues = append(rowValues, l.Timestamp)
rowValues = append(rowValues, l.Timestamp)
rowValues = append(rowValues, l.ID)
rowValues = append(rowValues, l.TraceID)
rowValues = append(rowValues, l.SpanID)
rowValues = append(rowValues, l.TraceFlags)
rowValues = append(rowValues, l.SeverityText)
rowValues = append(rowValues, l.SeverityNumber)
rowValues = append(rowValues, l.Body)
rowValues = append(rowValues, maps.Keys(l.Resources_string))
rowValues = append(rowValues, maps.Values(l.Resources_string))
rowValues = append(rowValues, maps.Keys(l.Attributes_string))
rowValues = append(rowValues, maps.Values(l.Attributes_string))
rowValues = append(rowValues, maps.Keys(l.Attributes_int64))
rowValues = append(rowValues, maps.Values(l.Attributes_int64))
rowValues = append(rowValues, maps.Keys(l.Attributes_float64))
rowValues = append(rowValues, maps.Values(l.Attributes_float64))
rowValues = append(rowValues, maps.Keys(l.Attributes_bool))
rowValues = append(rowValues, maps.Values(l.Attributes_bool))
values = append(values, rowValues)
}
rows := mockhouse.NewRows(cols, values)
mockClickhouse.ExpectQuery(
"SELECT .*? from signoz_logs.distributed_logs.*",
).WillReturnRows(rows)
}
func makeTestSignozLog(
body string,
attributes map[string]interface{},
) model.SignozLog {
testLog := model.SignozLog{
Timestamp: uint64(time.Now().UnixNano()),
Body: body,
Attributes_bool: map[string]bool{},
Attributes_string: map[string]string{},
Attributes_int64: map[string]int64{},
Attributes_float64: map[string]float64{},
Resources_string: map[string]string{},
SeverityText: entry.Info.String(),
SeverityNumber: uint8(entry.Info),
SpanID: uuid.New().String(),
TraceID: uuid.New().String(),
}
for k, v := range attributes {
switch v.(type) {
case bool:
testLog.Attributes_bool[k] = v.(bool)
case string:
testLog.Attributes_string[k] = v.(string)
case int:
testLog.Attributes_int64[k] = int64(v.(int))
case float64:
testLog.Attributes_float64[k] = v.(float64)
default:
panic(fmt.Sprintf("found attribute value of unsupported type %T in test log", v))
}
}
return testLog
}