diff --git a/pkg/query-service/app/integrations/Readme.md b/pkg/query-service/app/integrations/Readme.md new file mode 100644 index 0000000000..7ffe1a1a08 --- /dev/null +++ b/pkg/query-service/app/integrations/Readme.md @@ -0,0 +1 @@ +# SigNoz integrations diff --git a/pkg/query-service/app/integrations/manager.go b/pkg/query-service/app/integrations/manager.go new file mode 100644 index 0000000000..3caf352172 --- /dev/null +++ b/pkg/query-service/app/integrations/manager.go @@ -0,0 +1,208 @@ +package integrations + +import ( + "context" + "fmt" + "slices" + "time" + + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" + "go.signoz.io/signoz/pkg/query-service/model" +) + +type IntegrationAuthor struct { + Name string + Email string + HomePage string +} +type IntegrationSummary struct { + Id string + Title string + Description string // A short description + + Author IntegrationAuthor +} + +type IntegrationAssets struct { + // Each integration is expected to specify all log transformations + // in a single pipeline with a source based filter + LogPipeline *logparsingpipeline.PostablePipeline + + // TBD: Dashboards, alerts, saved views, facets (indexed attribs)... +} + +type IntegrationDetails struct { + IntegrationSummary + IntegrationAssets +} + +type IntegrationsListItem struct { + IntegrationSummary + IsInstalled bool +} + +type InstalledIntegration struct { + IntegrationId string `db:"integration_id"` + Config InstalledIntegrationConfig `db:"config_json"` + InstalledAt time.Time `db:"installed_at"` +} +type InstalledIntegrationConfig map[string]interface{} + +type Integration struct { + IntegrationDetails + Installation *InstalledIntegration +} + +type Manager struct { + availableIntegrationsRepo AvailableIntegrationsRepo + installedIntegrationsRepo InstalledIntegrationsRepo +} + +type IntegrationsFilter struct { + IsInstalled *bool +} + +func (m *Manager) ListIntegrations( + ctx context.Context, + filter *IntegrationsFilter, + // Expected to have pagination over time. +) ([]IntegrationsListItem, *model.ApiError) { + available, apiErr := m.availableIntegrationsRepo.list(ctx) + if apiErr != nil { + return nil, model.WrapApiError( + apiErr, "could not fetch available integrations", + ) + } + + installed, apiErr := m.installedIntegrationsRepo.list(ctx) + if apiErr != nil { + return nil, model.WrapApiError( + apiErr, "could not fetch installed integrations", + ) + } + installedIds := []string{} + for _, ii := range installed { + installedIds = append(installedIds, ii.IntegrationId) + } + + result := []IntegrationsListItem{} + for _, ai := range available { + result = append(result, IntegrationsListItem{ + IntegrationSummary: ai.IntegrationSummary, + IsInstalled: slices.Contains(installedIds, ai.Id), + }) + } + + if filter != nil { + if filter.IsInstalled != nil { + filteredResult := []IntegrationsListItem{} + for _, r := range result { + if r.IsInstalled == *filter.IsInstalled { + filteredResult = append(filteredResult, r) + } + } + result = filteredResult + } + } + + return result, nil +} + +func (m *Manager) GetIntegration( + ctx context.Context, + integrationId string, +) (*Integration, *model.ApiError) { + integrationDetails, apiErr := m.getIntegrationDetails( + ctx, integrationId, + ) + if apiErr != nil { + return nil, apiErr + } + + installation, apiErr := m.getInstalledIntegration( + ctx, integrationId, + ) + if apiErr != nil { + return nil, apiErr + } + + return &Integration{ + IntegrationDetails: *integrationDetails, + Installation: installation, + }, nil +} + +func (m *Manager) InstallIntegration( + ctx context.Context, + integrationId string, + config InstalledIntegrationConfig, +) (*IntegrationsListItem, *model.ApiError) { + integrationDetails, apiErr := m.getIntegrationDetails(ctx, integrationId) + if apiErr != nil { + return nil, apiErr + } + + _, apiErr = m.installedIntegrationsRepo.upsert( + ctx, integrationId, config, + ) + if apiErr != nil { + return nil, model.WrapApiError( + apiErr, "could not insert installed integration", + ) + } + + return &IntegrationsListItem{ + IntegrationSummary: integrationDetails.IntegrationSummary, + IsInstalled: true, + }, nil +} + +func (m *Manager) UninstallIntegration( + ctx context.Context, + integrationId string, +) *model.ApiError { + return m.installedIntegrationsRepo.delete(ctx, integrationId) +} + +// Helpers. +func (m *Manager) getIntegrationDetails( + ctx context.Context, + integrationId string, +) (*IntegrationDetails, *model.ApiError) { + ais, apiErr := m.availableIntegrationsRepo.get( + ctx, []string{integrationId}, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, fmt.Sprintf( + "could not fetch integration: %s", integrationId, + )) + } + + integrationDetails, wasFound := ais[integrationId] + if !wasFound { + return nil, model.NotFoundError(fmt.Errorf( + "could not find integration: %s", integrationId, + )) + } + return &integrationDetails, nil +} + +func (m *Manager) getInstalledIntegration( + ctx context.Context, + integrationId string, +) (*InstalledIntegration, *model.ApiError) { + iis, apiErr := m.installedIntegrationsRepo.get( + ctx, []string{integrationId}, + ) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, fmt.Sprintf( + "could not fetch installed integration: %s", integrationId, + )) + } + + installation, wasFound := iis[integrationId] + if !wasFound { + return nil, nil + } + return &installation, nil +} diff --git a/pkg/query-service/app/integrations/manager_test.go b/pkg/query-service/app/integrations/manager_test.go new file mode 100644 index 0000000000..08dd50b255 --- /dev/null +++ b/pkg/query-service/app/integrations/manager_test.go @@ -0,0 +1,78 @@ +package integrations + +import ( + "context" + "testing" + + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/require" +) + +func TestIntegrationLifecycle(t *testing.T) { + require := require.New(t) + + mgr := NewTestIntegrationsManager(t) + ctx := context.Background() + + ii := true + installedIntegrationsFilter := &IntegrationsFilter{ + IsInstalled: &ii, + } + + installedIntegrations, apiErr := mgr.ListIntegrations( + ctx, installedIntegrationsFilter, + ) + require.Nil(apiErr) + require.Equal([]IntegrationsListItem{}, installedIntegrations) + + availableIntegrations, apiErr := mgr.ListIntegrations(ctx, nil) + require.Nil(apiErr) + require.Equal(2, len(availableIntegrations)) + require.False(availableIntegrations[0].IsInstalled) + require.False(availableIntegrations[1].IsInstalled) + + testIntegrationConfig := map[string]interface{}{} + installed, apiErr := mgr.InstallIntegration( + ctx, availableIntegrations[1].Id, testIntegrationConfig, + ) + require.Nil(apiErr) + require.Equal(installed.Id, availableIntegrations[1].Id) + + integration, apiErr := mgr.GetIntegration(ctx, availableIntegrations[1].Id) + require.Nil(apiErr) + require.Equal(integration.Id, availableIntegrations[1].Id) + require.NotNil(integration.Installation) + + installedIntegrations, apiErr = mgr.ListIntegrations( + ctx, installedIntegrationsFilter, + ) + require.Nil(apiErr) + require.Equal(1, len(installedIntegrations)) + require.Equal(availableIntegrations[1].Id, installedIntegrations[0].Id) + + availableIntegrations, apiErr = mgr.ListIntegrations(ctx, nil) + require.Nil(apiErr) + require.Equal(2, len(availableIntegrations)) + require.False(availableIntegrations[0].IsInstalled) + require.True(availableIntegrations[1].IsInstalled) + + apiErr = mgr.UninstallIntegration(ctx, installed.Id) + require.Nil(apiErr) + + integration, apiErr = mgr.GetIntegration(ctx, availableIntegrations[1].Id) + require.Nil(apiErr) + require.Equal(integration.Id, availableIntegrations[1].Id) + require.Nil(integration.Installation) + + installedIntegrations, apiErr = mgr.ListIntegrations( + ctx, installedIntegrationsFilter, + ) + require.Nil(apiErr) + require.Equal(0, len(installedIntegrations)) + + availableIntegrations, apiErr = mgr.ListIntegrations(ctx, nil) + require.Nil(apiErr) + require.Equal(2, len(availableIntegrations)) + require.False(availableIntegrations[0].IsInstalled) + require.False(availableIntegrations[1].IsInstalled) +} diff --git a/pkg/query-service/app/integrations/repo.go b/pkg/query-service/app/integrations/repo.go new file mode 100644 index 0000000000..9ed46cd247 --- /dev/null +++ b/pkg/query-service/app/integrations/repo.go @@ -0,0 +1,58 @@ +package integrations + +import ( + "context" + "database/sql/driver" + "encoding/json" + + "github.com/pkg/errors" + "go.signoz.io/signoz/pkg/query-service/model" +) + +// For serializing from db +func (c *InstalledIntegrationConfig) Scan(src interface{}) error { + if data, ok := src.([]byte); ok { + return json.Unmarshal(data, &c) + } + return nil +} + +// For serializing to db +func (c *InstalledIntegrationConfig) Value() (driver.Value, error) { + filterSetJson, err := json.Marshal(c) + if err != nil { + return nil, errors.Wrap(err, "could not serialize integration config to JSON") + } + return filterSetJson, nil +} + +type InstalledIntegrationsRepo interface { + list(context.Context) ([]InstalledIntegration, *model.ApiError) + + get( + ctx context.Context, integrationIds []string, + ) (map[string]InstalledIntegration, *model.ApiError) + + upsert( + ctx context.Context, + integrationId string, + config InstalledIntegrationConfig, + ) (*InstalledIntegration, *model.ApiError) + + delete(ctx context.Context, integrationId string) *model.ApiError +} + +type AvailableIntegrationsRepo interface { + list(context.Context) ([]IntegrationDetails, *model.ApiError) + + get( + ctx context.Context, integrationIds []string, + ) (map[string]IntegrationDetails, *model.ApiError) + + // AvailableIntegrationsRepo implementations are expected to cache + // details of installed integrations for quick retrieval. + // + // For v0 only bundled integrations are available, later versions + // are expected to add methods in this interface for pinning installed + // integration details in local cache. +} diff --git a/pkg/query-service/app/integrations/sqlite_repo.go b/pkg/query-service/app/integrations/sqlite_repo.go new file mode 100644 index 0000000000..94e9c4d51d --- /dev/null +++ b/pkg/query-service/app/integrations/sqlite_repo.go @@ -0,0 +1,168 @@ +package integrations + +import ( + "context" + "fmt" + "strings" + + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/model" +) + +func InitSqliteDBIfNeeded(db *sqlx.DB) error { + if db == nil { + return fmt.Errorf("db is required.") + } + + createTablesStatements := ` + CREATE TABLE IF NOT EXISTS integrations_installed( + integration_id TEXT PRIMARY KEY, + config_json TEXT, + installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ` + _, err := db.Exec(createTablesStatements) + if err != nil { + return fmt.Errorf( + "could not ensure integrations schema in sqlite DB: %w", err, + ) + } + + return nil +} + +type InstalledIntegrationsSqliteRepo struct { + db *sqlx.DB +} + +func NewInstalledIntegrationsSqliteRepo(db *sqlx.DB) ( + *InstalledIntegrationsSqliteRepo, error, +) { + err := InitSqliteDBIfNeeded(db) + if err != nil { + return nil, fmt.Errorf( + "couldn't ensure sqlite schema for installed integrations: %w", err, + ) + } + + return &InstalledIntegrationsSqliteRepo{ + db: db, + }, nil +} + +func (r *InstalledIntegrationsSqliteRepo) list( + ctx context.Context, +) ([]InstalledIntegration, *model.ApiError) { + integrations := []InstalledIntegration{} + + err := r.db.SelectContext( + ctx, &integrations, ` + select + integration_id, + config_json, + installed_at + from integrations_installed + `, + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "could not query installed integrations: %w", err, + )) + } + return integrations, nil +} + +func (r *InstalledIntegrationsSqliteRepo) get( + ctx context.Context, integrationIds []string, +) (map[string]InstalledIntegration, *model.ApiError) { + integrations := []InstalledIntegration{} + + idPlaceholders := []string{} + idValues := []interface{}{} + for _, id := range integrationIds { + idPlaceholders = append(idPlaceholders, "?") + idValues = append(idValues, id) + } + + err := r.db.SelectContext( + ctx, &integrations, fmt.Sprintf(` + select + integration_id, + config_json, + installed_at + from integrations_installed + where integration_id in (%s)`, + strings.Join(idPlaceholders, ", "), + ), + idValues..., + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "could not query installed integrations: %w", err, + )) + } + + result := map[string]InstalledIntegration{} + for _, ii := range integrations { + result[ii.IntegrationId] = ii + } + + return result, nil +} + +func (r *InstalledIntegrationsSqliteRepo) upsert( + ctx context.Context, + integrationId string, + config InstalledIntegrationConfig, +) (*InstalledIntegration, *model.ApiError) { + serializedConfig, err := config.Value() + if err != nil { + return nil, model.BadRequest(fmt.Errorf( + "could not serialize integration config: %w", err, + )) + } + + _, dbErr := r.db.ExecContext( + ctx, ` + INSERT INTO integrations_installed ( + integration_id, + config_json + ) values ($1, $2) + on conflict(integration_id) do update + set config_json=excluded.config_json + `, integrationId, serializedConfig, + ) + if dbErr != nil { + return nil, model.InternalError(fmt.Errorf( + "could not insert record for integration installation: %w", dbErr, + )) + } + + res, apiErr := r.get(ctx, []string{integrationId}) + if apiErr != nil || len(res) < 1 { + return nil, model.WrapApiError( + apiErr, "could not fetch installed integration", + ) + } + + installed := res[integrationId] + + return &installed, nil +} + +func (r *InstalledIntegrationsSqliteRepo) delete( + ctx context.Context, integrationId string, +) *model.ApiError { + _, dbErr := r.db.ExecContext(ctx, ` + DELETE FROM integrations_installed where integration_id = ? + `, integrationId) + + if dbErr != nil { + return model.InternalError(fmt.Errorf( + "could not delete installed integration record for %s: %w", + integrationId, dbErr, + )) + } + + return nil +} diff --git a/pkg/query-service/app/integrations/test_utils.go b/pkg/query-service/app/integrations/test_utils.go new file mode 100644 index 0000000000..6dcb9ec355 --- /dev/null +++ b/pkg/query-service/app/integrations/test_utils.go @@ -0,0 +1,161 @@ +package integrations + +import ( + "context" + "os" + "slices" + "testing" + + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func NewTestSqliteDB(t *testing.T) ( + db *sqlx.DB, dbFilePath string, +) { + testDBFile, err := os.CreateTemp("", "test-signoz-db-*") + if err != nil { + t.Fatalf("could not create temp file for test db: %v", err) + } + testDBFilePath := testDBFile.Name() + t.Cleanup(func() { os.Remove(testDBFilePath) }) + testDBFile.Close() + + testDB, err := sqlx.Open("sqlite3", testDBFilePath) + if err != nil { + t.Fatalf("could not open test db sqlite file: %v", err) + } + + return testDB, testDBFilePath +} + +func NewTestIntegrationsManager(t *testing.T) *Manager { + testDB, _ := NewTestSqliteDB(t) + + installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB) + if err != nil { + t.Fatalf("could not init sqlite DB for installed integrations: %v", err) + } + + return &Manager{ + availableIntegrationsRepo: &TestAvailableIntegrationsRepo{}, + installedIntegrationsRepo: installedIntegrationsRepo, + } +} + +type TestAvailableIntegrationsRepo struct{} + +func (t *TestAvailableIntegrationsRepo) list( + ctx context.Context, +) ([]IntegrationDetails, *model.ApiError) { + return []IntegrationDetails{ + { + IntegrationSummary: IntegrationSummary{ + Id: "test-integration-1", + Title: "Test Integration 1", + Description: "A test integration", + Author: IntegrationAuthor{ + Name: "signoz", + Email: "integrations@signoz.io", + HomePage: "https://signoz.io", + }, + }, + IntegrationAssets: IntegrationAssets{ + LogPipeline: &logparsingpipeline.PostablePipeline{ + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []logparsingpipeline.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + }, + }, { + IntegrationSummary: IntegrationSummary{ + Id: "test-integration-2", + Title: "Test Integration 2", + Description: "Another test integration", + Author: IntegrationAuthor{ + Name: "signoz", + Email: "integrations@signoz.io", + HomePage: "https://signoz.io", + }, + }, + IntegrationAssets: IntegrationAssets{ + LogPipeline: &logparsingpipeline.PostablePipeline{ + Name: "pipeline2", + Alias: "pipeline2", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []logparsingpipeline.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + }, + }, + }, nil +} + +func (t *TestAvailableIntegrationsRepo) get( + ctx context.Context, ids []string, +) (map[string]IntegrationDetails, *model.ApiError) { + availableIntegrations, apiErr := t.list(ctx) + if apiErr != nil { + return nil, apiErr + } + + result := map[string]IntegrationDetails{} + + for _, ai := range availableIntegrations { + if slices.Contains(ids, ai.Id) { + result[ai.Id] = ai + } + } + + return result, nil +}