feat: QS package for integrations (#4578)

* chore: bring in latest state of QS api work for integrations

* chore: integrations v0 qs API: refactor installed integration struct

* chore: finish up with integration lifecycle tests

* chore: some cleanup

* chore: some more cleanup

* chore: some more cleanup

* chore: some more cleanup

* chore: some more cleanup

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Raj Kamal Singh 2024-02-28 09:54:50 +05:30 committed by GitHub
parent 8f9d643923
commit ddaa464d97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 674 additions and 0 deletions

View File

@ -0,0 +1 @@
# SigNoz integrations

View File

@ -0,0 +1,208 @@
package integrations
import (
"context"
"fmt"
"slices"
"time"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
)
type IntegrationAuthor struct {
Name string
Email string
HomePage string
}
type IntegrationSummary struct {
Id string
Title string
Description string // A short description
Author IntegrationAuthor
}
type IntegrationAssets struct {
// Each integration is expected to specify all log transformations
// in a single pipeline with a source based filter
LogPipeline *logparsingpipeline.PostablePipeline
// TBD: Dashboards, alerts, saved views, facets (indexed attribs)...
}
type IntegrationDetails struct {
IntegrationSummary
IntegrationAssets
}
type IntegrationsListItem struct {
IntegrationSummary
IsInstalled bool
}
type InstalledIntegration struct {
IntegrationId string `db:"integration_id"`
Config InstalledIntegrationConfig `db:"config_json"`
InstalledAt time.Time `db:"installed_at"`
}
type InstalledIntegrationConfig map[string]interface{}
type Integration struct {
IntegrationDetails
Installation *InstalledIntegration
}
type Manager struct {
availableIntegrationsRepo AvailableIntegrationsRepo
installedIntegrationsRepo InstalledIntegrationsRepo
}
type IntegrationsFilter struct {
IsInstalled *bool
}
func (m *Manager) ListIntegrations(
ctx context.Context,
filter *IntegrationsFilter,
// Expected to have pagination over time.
) ([]IntegrationsListItem, *model.ApiError) {
available, apiErr := m.availableIntegrationsRepo.list(ctx)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "could not fetch available integrations",
)
}
installed, apiErr := m.installedIntegrationsRepo.list(ctx)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "could not fetch installed integrations",
)
}
installedIds := []string{}
for _, ii := range installed {
installedIds = append(installedIds, ii.IntegrationId)
}
result := []IntegrationsListItem{}
for _, ai := range available {
result = append(result, IntegrationsListItem{
IntegrationSummary: ai.IntegrationSummary,
IsInstalled: slices.Contains(installedIds, ai.Id),
})
}
if filter != nil {
if filter.IsInstalled != nil {
filteredResult := []IntegrationsListItem{}
for _, r := range result {
if r.IsInstalled == *filter.IsInstalled {
filteredResult = append(filteredResult, r)
}
}
result = filteredResult
}
}
return result, nil
}
func (m *Manager) GetIntegration(
ctx context.Context,
integrationId string,
) (*Integration, *model.ApiError) {
integrationDetails, apiErr := m.getIntegrationDetails(
ctx, integrationId,
)
if apiErr != nil {
return nil, apiErr
}
installation, apiErr := m.getInstalledIntegration(
ctx, integrationId,
)
if apiErr != nil {
return nil, apiErr
}
return &Integration{
IntegrationDetails: *integrationDetails,
Installation: installation,
}, nil
}
func (m *Manager) InstallIntegration(
ctx context.Context,
integrationId string,
config InstalledIntegrationConfig,
) (*IntegrationsListItem, *model.ApiError) {
integrationDetails, apiErr := m.getIntegrationDetails(ctx, integrationId)
if apiErr != nil {
return nil, apiErr
}
_, apiErr = m.installedIntegrationsRepo.upsert(
ctx, integrationId, config,
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "could not insert installed integration",
)
}
return &IntegrationsListItem{
IntegrationSummary: integrationDetails.IntegrationSummary,
IsInstalled: true,
}, nil
}
func (m *Manager) UninstallIntegration(
ctx context.Context,
integrationId string,
) *model.ApiError {
return m.installedIntegrationsRepo.delete(ctx, integrationId)
}
// Helpers.
func (m *Manager) getIntegrationDetails(
ctx context.Context,
integrationId string,
) (*IntegrationDetails, *model.ApiError) {
ais, apiErr := m.availableIntegrationsRepo.get(
ctx, []string{integrationId},
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, fmt.Sprintf(
"could not fetch integration: %s", integrationId,
))
}
integrationDetails, wasFound := ais[integrationId]
if !wasFound {
return nil, model.NotFoundError(fmt.Errorf(
"could not find integration: %s", integrationId,
))
}
return &integrationDetails, nil
}
func (m *Manager) getInstalledIntegration(
ctx context.Context,
integrationId string,
) (*InstalledIntegration, *model.ApiError) {
iis, apiErr := m.installedIntegrationsRepo.get(
ctx, []string{integrationId},
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, fmt.Sprintf(
"could not fetch installed integration: %s", integrationId,
))
}
installation, wasFound := iis[integrationId]
if !wasFound {
return nil, nil
}
return &installation, nil
}

View File

@ -0,0 +1,78 @@
package integrations
import (
"context"
"testing"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"
)
func TestIntegrationLifecycle(t *testing.T) {
require := require.New(t)
mgr := NewTestIntegrationsManager(t)
ctx := context.Background()
ii := true
installedIntegrationsFilter := &IntegrationsFilter{
IsInstalled: &ii,
}
installedIntegrations, apiErr := mgr.ListIntegrations(
ctx, installedIntegrationsFilter,
)
require.Nil(apiErr)
require.Equal([]IntegrationsListItem{}, installedIntegrations)
availableIntegrations, apiErr := mgr.ListIntegrations(ctx, nil)
require.Nil(apiErr)
require.Equal(2, len(availableIntegrations))
require.False(availableIntegrations[0].IsInstalled)
require.False(availableIntegrations[1].IsInstalled)
testIntegrationConfig := map[string]interface{}{}
installed, apiErr := mgr.InstallIntegration(
ctx, availableIntegrations[1].Id, testIntegrationConfig,
)
require.Nil(apiErr)
require.Equal(installed.Id, availableIntegrations[1].Id)
integration, apiErr := mgr.GetIntegration(ctx, availableIntegrations[1].Id)
require.Nil(apiErr)
require.Equal(integration.Id, availableIntegrations[1].Id)
require.NotNil(integration.Installation)
installedIntegrations, apiErr = mgr.ListIntegrations(
ctx, installedIntegrationsFilter,
)
require.Nil(apiErr)
require.Equal(1, len(installedIntegrations))
require.Equal(availableIntegrations[1].Id, installedIntegrations[0].Id)
availableIntegrations, apiErr = mgr.ListIntegrations(ctx, nil)
require.Nil(apiErr)
require.Equal(2, len(availableIntegrations))
require.False(availableIntegrations[0].IsInstalled)
require.True(availableIntegrations[1].IsInstalled)
apiErr = mgr.UninstallIntegration(ctx, installed.Id)
require.Nil(apiErr)
integration, apiErr = mgr.GetIntegration(ctx, availableIntegrations[1].Id)
require.Nil(apiErr)
require.Equal(integration.Id, availableIntegrations[1].Id)
require.Nil(integration.Installation)
installedIntegrations, apiErr = mgr.ListIntegrations(
ctx, installedIntegrationsFilter,
)
require.Nil(apiErr)
require.Equal(0, len(installedIntegrations))
availableIntegrations, apiErr = mgr.ListIntegrations(ctx, nil)
require.Nil(apiErr)
require.Equal(2, len(availableIntegrations))
require.False(availableIntegrations[0].IsInstalled)
require.False(availableIntegrations[1].IsInstalled)
}

View File

@ -0,0 +1,58 @@
package integrations
import (
"context"
"database/sql/driver"
"encoding/json"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/model"
)
// For serializing from db
func (c *InstalledIntegrationConfig) Scan(src interface{}) error {
if data, ok := src.([]byte); ok {
return json.Unmarshal(data, &c)
}
return nil
}
// For serializing to db
func (c *InstalledIntegrationConfig) Value() (driver.Value, error) {
filterSetJson, err := json.Marshal(c)
if err != nil {
return nil, errors.Wrap(err, "could not serialize integration config to JSON")
}
return filterSetJson, nil
}
type InstalledIntegrationsRepo interface {
list(context.Context) ([]InstalledIntegration, *model.ApiError)
get(
ctx context.Context, integrationIds []string,
) (map[string]InstalledIntegration, *model.ApiError)
upsert(
ctx context.Context,
integrationId string,
config InstalledIntegrationConfig,
) (*InstalledIntegration, *model.ApiError)
delete(ctx context.Context, integrationId string) *model.ApiError
}
type AvailableIntegrationsRepo interface {
list(context.Context) ([]IntegrationDetails, *model.ApiError)
get(
ctx context.Context, integrationIds []string,
) (map[string]IntegrationDetails, *model.ApiError)
// AvailableIntegrationsRepo implementations are expected to cache
// details of installed integrations for quick retrieval.
//
// For v0 only bundled integrations are available, later versions
// are expected to add methods in this interface for pinning installed
// integration details in local cache.
}

View File

@ -0,0 +1,168 @@
package integrations
import (
"context"
"fmt"
"strings"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
)
func InitSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required.")
}
createTablesStatements := `
CREATE TABLE IF NOT EXISTS integrations_installed(
integration_id TEXT PRIMARY KEY,
config_json TEXT,
installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
_, err := db.Exec(createTablesStatements)
if err != nil {
return fmt.Errorf(
"could not ensure integrations schema in sqlite DB: %w", err,
)
}
return nil
}
type InstalledIntegrationsSqliteRepo struct {
db *sqlx.DB
}
func NewInstalledIntegrationsSqliteRepo(db *sqlx.DB) (
*InstalledIntegrationsSqliteRepo, error,
) {
err := InitSqliteDBIfNeeded(db)
if err != nil {
return nil, fmt.Errorf(
"couldn't ensure sqlite schema for installed integrations: %w", err,
)
}
return &InstalledIntegrationsSqliteRepo{
db: db,
}, nil
}
func (r *InstalledIntegrationsSqliteRepo) list(
ctx context.Context,
) ([]InstalledIntegration, *model.ApiError) {
integrations := []InstalledIntegration{}
err := r.db.SelectContext(
ctx, &integrations, `
select
integration_id,
config_json,
installed_at
from integrations_installed
`,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query installed integrations: %w", err,
))
}
return integrations, nil
}
func (r *InstalledIntegrationsSqliteRepo) get(
ctx context.Context, integrationIds []string,
) (map[string]InstalledIntegration, *model.ApiError) {
integrations := []InstalledIntegration{}
idPlaceholders := []string{}
idValues := []interface{}{}
for _, id := range integrationIds {
idPlaceholders = append(idPlaceholders, "?")
idValues = append(idValues, id)
}
err := r.db.SelectContext(
ctx, &integrations, fmt.Sprintf(`
select
integration_id,
config_json,
installed_at
from integrations_installed
where integration_id in (%s)`,
strings.Join(idPlaceholders, ", "),
),
idValues...,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not query installed integrations: %w", err,
))
}
result := map[string]InstalledIntegration{}
for _, ii := range integrations {
result[ii.IntegrationId] = ii
}
return result, nil
}
func (r *InstalledIntegrationsSqliteRepo) upsert(
ctx context.Context,
integrationId string,
config InstalledIntegrationConfig,
) (*InstalledIntegration, *model.ApiError) {
serializedConfig, err := config.Value()
if err != nil {
return nil, model.BadRequest(fmt.Errorf(
"could not serialize integration config: %w", err,
))
}
_, dbErr := r.db.ExecContext(
ctx, `
INSERT INTO integrations_installed (
integration_id,
config_json
) values ($1, $2)
on conflict(integration_id) do update
set config_json=excluded.config_json
`, integrationId, serializedConfig,
)
if dbErr != nil {
return nil, model.InternalError(fmt.Errorf(
"could not insert record for integration installation: %w", dbErr,
))
}
res, apiErr := r.get(ctx, []string{integrationId})
if apiErr != nil || len(res) < 1 {
return nil, model.WrapApiError(
apiErr, "could not fetch installed integration",
)
}
installed := res[integrationId]
return &installed, nil
}
func (r *InstalledIntegrationsSqliteRepo) delete(
ctx context.Context, integrationId string,
) *model.ApiError {
_, dbErr := r.db.ExecContext(ctx, `
DELETE FROM integrations_installed where integration_id = ?
`, integrationId)
if dbErr != nil {
return model.InternalError(fmt.Errorf(
"could not delete installed integration record for %s: %w",
integrationId, dbErr,
))
}
return nil
}

View File

@ -0,0 +1,161 @@
package integrations
import (
"context"
"os"
"slices"
"testing"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func NewTestSqliteDB(t *testing.T) (
db *sqlx.DB, dbFilePath string,
) {
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
testDB, err := sqlx.Open("sqlite3", testDBFilePath)
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
}
return testDB, testDBFilePath
}
func NewTestIntegrationsManager(t *testing.T) *Manager {
testDB, _ := NewTestSqliteDB(t)
installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB)
if err != nil {
t.Fatalf("could not init sqlite DB for installed integrations: %v", err)
}
return &Manager{
availableIntegrationsRepo: &TestAvailableIntegrationsRepo{},
installedIntegrationsRepo: installedIntegrationsRepo,
}
}
type TestAvailableIntegrationsRepo struct{}
func (t *TestAvailableIntegrationsRepo) list(
ctx context.Context,
) ([]IntegrationDetails, *model.ApiError) {
return []IntegrationDetails{
{
IntegrationSummary: IntegrationSummary{
Id: "test-integration-1",
Title: "Test Integration 1",
Description: "A test integration",
Author: IntegrationAuthor{
Name: "signoz",
Email: "integrations@signoz.io",
HomePage: "https://signoz.io",
},
},
IntegrationAssets: IntegrationAssets{
LogPipeline: &logparsingpipeline.PostablePipeline{
Name: "pipeline1",
Alias: "pipeline1",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "method",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
},
},
},
Config: []logparsingpipeline.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
},
}, {
IntegrationSummary: IntegrationSummary{
Id: "test-integration-2",
Title: "Test Integration 2",
Description: "Another test integration",
Author: IntegrationAuthor{
Name: "signoz",
Email: "integrations@signoz.io",
HomePage: "https://signoz.io",
},
},
IntegrationAssets: IntegrationAssets{
LogPipeline: &logparsingpipeline.PostablePipeline{
Name: "pipeline2",
Alias: "pipeline2",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "method",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
},
},
},
Config: []logparsingpipeline.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
},
},
}, nil
}
func (t *TestAvailableIntegrationsRepo) get(
ctx context.Context, ids []string,
) (map[string]IntegrationDetails, *model.ApiError) {
availableIntegrations, apiErr := t.list(ctx)
if apiErr != nil {
return nil, apiErr
}
result := map[string]IntegrationDetails{}
for _, ai := range availableIntegrations {
if slices.Contains(ids, ai.Id) {
result[ai.Id] = ai
}
}
return result, nil
}