mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-10 04:49:02 +08:00
Feat: qs integrations http api (#4622)
* chore: add http api test for signoz integrations * chore: add controller for integrations * chore: add http API handlers for integrations API * chore: hook up integrations API in new servers * chore: add remaining fields in Integration DTO --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
parent
3fece44aef
commit
0870030d1c
@ -10,6 +10,7 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/license"
|
||||
"go.signoz.io/signoz/ee/query-service/usage"
|
||||
baseapp "go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/cache"
|
||||
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
@ -31,6 +32,7 @@ type APIHandlerOptions struct {
|
||||
UsageManager *usage.Manager
|
||||
FeatureFlags baseint.FeatureLookup
|
||||
LicenseManager *license.Manager
|
||||
IntegrationsController *integrations.Controller
|
||||
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
|
||||
Cache cache.Cache
|
||||
// Querier Influx Interval
|
||||
@ -56,6 +58,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
AppDao: opts.AppDao,
|
||||
RuleManager: opts.RulesManager,
|
||||
FeatureFlags: opts.FeatureFlags,
|
||||
IntegrationsController: opts.IntegrationsController,
|
||||
LogsParsingPipelineController: opts.LogsParsingPipelineController,
|
||||
Cache: opts.Cache,
|
||||
FluxInterval: opts.FluxInterval,
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
baseapp "go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
baseexplorer "go.signoz.io/signoz/pkg/query-service/app/explorer"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/opamp"
|
||||
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
|
||||
@ -176,6 +177,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
integrationsController, err := integrations.NewController(localDB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't create integrations controller: %w", err,
|
||||
)
|
||||
}
|
||||
|
||||
// ingestion pipelines manager
|
||||
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite")
|
||||
if err != nil {
|
||||
@ -233,6 +241,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
UsageManager: usageManager,
|
||||
FeatureFlags: lm,
|
||||
LicenseManager: lm,
|
||||
IntegrationsController: integrationsController,
|
||||
LogsParsingPipelineController: logParsingPipelineController,
|
||||
Cache: c,
|
||||
FluxInterval: fluxInterval,
|
||||
@ -317,6 +326,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e
|
||||
apiHandler.RegisterRoutes(r, am)
|
||||
apiHandler.RegisterMetricsRoutes(r, am)
|
||||
apiHandler.RegisterLogsRoutes(r, am)
|
||||
apiHandler.RegisterIntegrationRoutes(r, am)
|
||||
apiHandler.RegisterQueryRangeV3Routes(r, am)
|
||||
apiHandler.RegisterQueryRangeV4Routes(r, am)
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/explorer"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logs"
|
||||
logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics"
|
||||
@ -94,6 +95,8 @@ type APIHandler struct {
|
||||
maxOpenConns int
|
||||
dialTimeout time.Duration
|
||||
|
||||
IntegrationsController *integrations.Controller
|
||||
|
||||
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
|
||||
|
||||
// SetupCompleted indicates if SigNoz is ready for general use.
|
||||
@ -125,8 +128,12 @@ type APIHandlerOpts struct {
|
||||
// feature flags querier
|
||||
FeatureFlags interfaces.FeatureLookup
|
||||
|
||||
// Integrations
|
||||
IntegrationsController *integrations.Controller
|
||||
|
||||
// Log parsing pipelines
|
||||
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
|
||||
|
||||
// cache
|
||||
Cache cache.Cache
|
||||
|
||||
@ -174,6 +181,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
alertManager: alertManager,
|
||||
ruleManager: opts.RuleManager,
|
||||
featureFlags: opts.FeatureFlags,
|
||||
IntegrationsController: opts.IntegrationsController,
|
||||
LogsParsingPipelineController: opts.LogsParsingPipelineController,
|
||||
querier: querier,
|
||||
querierV2: querierv2,
|
||||
@ -2392,6 +2400,101 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
|
||||
w.Write(resp)
|
||||
}
|
||||
|
||||
// Integrations
|
||||
func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
subRouter := router.PathPrefix("/api/v1/integrations").Subrouter()
|
||||
|
||||
subRouter.HandleFunc(
|
||||
"/install", am.ViewAccess(ah.InstallIntegration),
|
||||
).Methods(http.MethodPost)
|
||||
|
||||
subRouter.HandleFunc(
|
||||
"/uninstall", am.ViewAccess(ah.UninstallIntegration),
|
||||
).Methods(http.MethodPost)
|
||||
|
||||
subRouter.HandleFunc(
|
||||
"/{integrationId}", am.ViewAccess(ah.GetIntegration),
|
||||
).Methods(http.MethodGet)
|
||||
|
||||
subRouter.HandleFunc(
|
||||
"", am.ViewAccess(ah.ListIntegrations),
|
||||
).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
func (ah *APIHandler) ListIntegrations(
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
) {
|
||||
params := map[string]string{}
|
||||
for k, values := range r.URL.Query() {
|
||||
params[k] = values[0]
|
||||
}
|
||||
|
||||
resp, apiErr := ah.IntegrationsController.ListIntegrations(
|
||||
r.Context(), params,
|
||||
)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, "Failed to fetch integrations")
|
||||
return
|
||||
}
|
||||
ah.Respond(w, resp)
|
||||
}
|
||||
|
||||
func (ah *APIHandler) GetIntegration(
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
) {
|
||||
integrationId := mux.Vars(r)["integrationId"]
|
||||
resp, apiErr := ah.IntegrationsController.GetIntegration(
|
||||
r.Context(), integrationId,
|
||||
)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, "Failed to fetch integration details")
|
||||
return
|
||||
}
|
||||
ah.Respond(w, resp)
|
||||
}
|
||||
|
||||
func (ah *APIHandler) InstallIntegration(
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
) {
|
||||
req := integrations.InstallIntegrationRequest{}
|
||||
|
||||
err := json.NewDecoder(r.Body).Decode(&req)
|
||||
if err != nil {
|
||||
RespondError(w, model.BadRequest(err), nil)
|
||||
return
|
||||
}
|
||||
|
||||
integration, apiErr := ah.IntegrationsController.Install(
|
||||
r.Context(), &req,
|
||||
)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
ah.Respond(w, integration)
|
||||
}
|
||||
|
||||
func (ah *APIHandler) UninstallIntegration(
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
) {
|
||||
req := integrations.UninstallIntegrationRequest{}
|
||||
|
||||
err := json.NewDecoder(r.Body).Decode(&req)
|
||||
if err != nil {
|
||||
RespondError(w, model.BadRequest(err), nil)
|
||||
return
|
||||
}
|
||||
|
||||
apiErr := ah.IntegrationsController.Uninstall(r.Context(), &req)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
ah.Respond(w, map[string]interface{}{})
|
||||
}
|
||||
|
||||
// logs
|
||||
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()
|
||||
|
86
pkg/query-service/app/integrations/controller.go
Normal file
86
pkg/query-service/app/integrations/controller.go
Normal file
@ -0,0 +1,86 @@
|
||||
package integrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
mgr *Manager
|
||||
}
|
||||
|
||||
func NewController(db *sqlx.DB) (
|
||||
*Controller, error,
|
||||
) {
|
||||
mgr, err := NewManager(db)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't create integrations manager: %w", err)
|
||||
}
|
||||
|
||||
return &Controller{
|
||||
mgr: mgr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type IntegrationsListResponse struct {
|
||||
Integrations []IntegrationsListItem `json:"integrations"`
|
||||
|
||||
// Pagination details to come later
|
||||
}
|
||||
|
||||
func (c *Controller) ListIntegrations(
|
||||
ctx context.Context, params map[string]string,
|
||||
) (
|
||||
*IntegrationsListResponse, *model.ApiError,
|
||||
) {
|
||||
var filters *IntegrationsFilter
|
||||
if isInstalledFilter, exists := params["is_installed"]; exists {
|
||||
isInstalled := !(isInstalledFilter == "false")
|
||||
filters = &IntegrationsFilter{
|
||||
IsInstalled: &isInstalled,
|
||||
}
|
||||
}
|
||||
|
||||
integrations, apiErr := c.mgr.ListIntegrations(ctx, filters)
|
||||
if apiErr != nil {
|
||||
return nil, apiErr
|
||||
}
|
||||
|
||||
return &IntegrationsListResponse{
|
||||
Integrations: integrations,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Controller) GetIntegration(
|
||||
ctx context.Context, integrationId string,
|
||||
) (*Integration, *model.ApiError) {
|
||||
return c.mgr.GetIntegration(ctx, integrationId)
|
||||
}
|
||||
|
||||
type InstallIntegrationRequest struct {
|
||||
IntegrationId string `json:"integration_id"`
|
||||
Config map[string]interface{} `json:"config"`
|
||||
}
|
||||
|
||||
func (c *Controller) Install(
|
||||
ctx context.Context, req *InstallIntegrationRequest,
|
||||
) (*IntegrationsListItem, *model.ApiError) {
|
||||
return c.mgr.InstallIntegration(
|
||||
ctx, req.IntegrationId, req.Config,
|
||||
)
|
||||
}
|
||||
|
||||
type UninstallIntegrationRequest struct {
|
||||
IntegrationId string `json:"integration_id"`
|
||||
}
|
||||
|
||||
func (c *Controller) Uninstall(
|
||||
ctx context.Context, req *UninstallIntegrationRequest,
|
||||
) *model.ApiError {
|
||||
return c.mgr.UninstallIntegration(
|
||||
ctx, req.IntegrationId,
|
||||
)
|
||||
}
|
@ -4,53 +4,89 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
type IntegrationAuthor struct {
|
||||
Name string
|
||||
Email string
|
||||
HomePage string
|
||||
Name string `json:"name"`
|
||||
Email string `json:"email"`
|
||||
HomePage string `json:"homepage"`
|
||||
}
|
||||
type IntegrationSummary struct {
|
||||
Id string
|
||||
Title string
|
||||
Description string // A short description
|
||||
Id string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"` // A short description
|
||||
|
||||
Author IntegrationAuthor
|
||||
Author IntegrationAuthor `json:"author"`
|
||||
|
||||
Icon string `json:"icon"`
|
||||
}
|
||||
|
||||
type IntegrationAssets struct {
|
||||
// Each integration is expected to specify all log transformations
|
||||
// in a single pipeline with a source based filter
|
||||
LogPipeline *logparsingpipeline.PostablePipeline
|
||||
Logs LogsAssets `json:"logs"`
|
||||
Dashboards []dashboards.Dashboard `json:"dashboards"`
|
||||
|
||||
// TBD: Dashboards, alerts, saved views, facets (indexed attribs)...
|
||||
// TODO(Raj): Maybe use a struct for alerts
|
||||
Alerts []map[string]interface{} `json:"alerts"`
|
||||
}
|
||||
|
||||
type LogsAssets struct {
|
||||
Pipelines []logparsingpipeline.PostablePipeline `json:"pipelines"`
|
||||
}
|
||||
|
||||
type IntegrationConfigStep struct {
|
||||
Title string `json:"title"`
|
||||
Instructions string `json:"instructions"`
|
||||
}
|
||||
|
||||
type DataCollectedForIntegration struct {
|
||||
Logs []CollectedLogAttribute `json:"logs"`
|
||||
Metrics []CollectedMetric `json:"metrics"`
|
||||
}
|
||||
|
||||
type CollectedLogAttribute struct {
|
||||
Name string `json:"name"`
|
||||
Path string `json:"path"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
type CollectedMetric struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Unit string `json:"unit"`
|
||||
}
|
||||
|
||||
type IntegrationDetails struct {
|
||||
IntegrationSummary
|
||||
IntegrationAssets
|
||||
|
||||
Categories []string `json:"categories"`
|
||||
Overview string `json:"overview"` // markdown
|
||||
Configuration []IntegrationConfigStep `json:"configuration"`
|
||||
DataCollected DataCollectedForIntegration `json:"data_collected"`
|
||||
Assets IntegrationAssets `json:"assets"`
|
||||
}
|
||||
|
||||
type IntegrationsListItem struct {
|
||||
IntegrationSummary
|
||||
IsInstalled bool
|
||||
IsInstalled bool `json:"is_installed"`
|
||||
}
|
||||
|
||||
type InstalledIntegration struct {
|
||||
IntegrationId string `db:"integration_id"`
|
||||
Config InstalledIntegrationConfig `db:"config_json"`
|
||||
InstalledAt time.Time `db:"installed_at"`
|
||||
IntegrationId string `json:"integration_id" db:"integration_id"`
|
||||
Config InstalledIntegrationConfig `json:"config_json" db:"config_json"`
|
||||
InstalledAt time.Time `json:"installed_at" db:"installed_at"`
|
||||
}
|
||||
type InstalledIntegrationConfig map[string]interface{}
|
||||
|
||||
type Integration struct {
|
||||
IntegrationDetails
|
||||
Installation *InstalledIntegration
|
||||
Installation *InstalledIntegration `json:"installation"`
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
@ -58,6 +94,21 @@ type Manager struct {
|
||||
installedIntegrationsRepo InstalledIntegrationsRepo
|
||||
}
|
||||
|
||||
func NewManager(db *sqlx.DB) (*Manager, error) {
|
||||
iiRepo, err := NewInstalledIntegrationsSqliteRepo(db)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"could not init sqlite DB for installed integrations: %w", err,
|
||||
)
|
||||
}
|
||||
|
||||
return &Manager{
|
||||
// TODO(Raj): Hook up a real available integrations provider.
|
||||
availableIntegrationsRepo: &TestAvailableIntegrationsRepo{},
|
||||
installedIntegrationsRepo: iiRepo,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type IntegrationsFilter struct {
|
||||
IsInstalled *bool
|
||||
}
|
||||
@ -169,6 +220,12 @@ func (m *Manager) getIntegrationDetails(
|
||||
ctx context.Context,
|
||||
integrationId string,
|
||||
) (*IntegrationDetails, *model.ApiError) {
|
||||
if len(strings.TrimSpace(integrationId)) < 1 {
|
||||
return nil, model.BadRequest(fmt.Errorf(
|
||||
"integrationId is required",
|
||||
))
|
||||
}
|
||||
|
||||
ais, apiErr := m.availableIntegrationsRepo.get(
|
||||
ctx, []string{integrationId},
|
||||
)
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
@ -61,38 +62,57 @@ func (t *TestAvailableIntegrationsRepo) list(
|
||||
Email: "integrations@signoz.io",
|
||||
HomePage: "https://signoz.io",
|
||||
},
|
||||
Icon: `data:image/svg+xml;utf8,<svg ... > ... </svg>`,
|
||||
},
|
||||
IntegrationAssets: IntegrationAssets{
|
||||
LogPipeline: &logparsingpipeline.PostablePipeline{
|
||||
Name: "pipeline1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
Categories: []string{"testcat1", "testcat2"},
|
||||
Overview: "test integration overview",
|
||||
Configuration: []IntegrationConfigStep{
|
||||
{
|
||||
Title: "Step 1",
|
||||
Instructions: "Set source attrib on your signals",
|
||||
},
|
||||
},
|
||||
DataCollected: DataCollectedForIntegration{
|
||||
Logs: []CollectedLogAttribute{},
|
||||
Metrics: []CollectedMetric{},
|
||||
},
|
||||
Assets: IntegrationAssets{
|
||||
Logs: LogsAssets{
|
||||
Pipelines: []logparsingpipeline.PostablePipeline{
|
||||
{
|
||||
Name: "pipeline1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Field: "attributes.test",
|
||||
Value: "val",
|
||||
Enabled: true,
|
||||
Name: "test add",
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Field: "attributes.test",
|
||||
Value: "val",
|
||||
Enabled: true,
|
||||
Name: "test add",
|
||||
},
|
||||
},
|
||||
},
|
||||
Dashboards: []dashboards.Dashboard{},
|
||||
Alerts: []map[string]interface{}{},
|
||||
},
|
||||
}, {
|
||||
IntegrationSummary: IntegrationSummary{
|
||||
@ -104,38 +124,57 @@ func (t *TestAvailableIntegrationsRepo) list(
|
||||
Email: "integrations@signoz.io",
|
||||
HomePage: "https://signoz.io",
|
||||
},
|
||||
Icon: `data:image/svg+xml;utf8,<svg ... > ... </svg>`,
|
||||
},
|
||||
IntegrationAssets: IntegrationAssets{
|
||||
LogPipeline: &logparsingpipeline.PostablePipeline{
|
||||
Name: "pipeline2",
|
||||
Alias: "pipeline2",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
Categories: []string{"testcat1", "testcat2"},
|
||||
Overview: "test integration overview",
|
||||
Configuration: []IntegrationConfigStep{
|
||||
{
|
||||
Title: "Step 1",
|
||||
Instructions: "Set source attrib on your signals",
|
||||
},
|
||||
},
|
||||
DataCollected: DataCollectedForIntegration{
|
||||
Logs: []CollectedLogAttribute{},
|
||||
Metrics: []CollectedMetric{},
|
||||
},
|
||||
Assets: IntegrationAssets{
|
||||
Logs: LogsAssets{
|
||||
Pipelines: []logparsingpipeline.PostablePipeline{
|
||||
{
|
||||
Name: "pipeline2",
|
||||
Alias: "pipeline2",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Field: "attributes.test",
|
||||
Value: "val",
|
||||
Enabled: true,
|
||||
Name: "test add",
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Field: "attributes.test",
|
||||
Value: "val",
|
||||
Enabled: true,
|
||||
Name: "test add",
|
||||
},
|
||||
},
|
||||
},
|
||||
Dashboards: []dashboards.Dashboard{},
|
||||
Alerts: []map[string]interface{}{},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/opamp"
|
||||
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
|
||||
@ -155,7 +156,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// ingestion pipelines manager
|
||||
|
||||
integrationsController, err := integrations.NewController(localDB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't create integrations controller: %w", err,
|
||||
)
|
||||
}
|
||||
|
||||
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -173,6 +181,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
AppDao: dao.DB(),
|
||||
RuleManager: rm,
|
||||
FeatureFlags: fm,
|
||||
IntegrationsController: integrationsController,
|
||||
LogsParsingPipelineController: logParsingPipelineController,
|
||||
Cache: c,
|
||||
FluxInterval: fluxInterval,
|
||||
@ -266,6 +275,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
|
||||
api.RegisterRoutes(r, am)
|
||||
api.RegisterMetricsRoutes(r, am)
|
||||
api.RegisterLogsRoutes(r, am)
|
||||
api.RegisterIntegrationRoutes(r, am)
|
||||
api.RegisterQueryRangeV3Routes(r, am)
|
||||
api.RegisterQueryRangeV4Routes(r, am)
|
||||
|
||||
|
@ -803,13 +803,13 @@ func createTestUser() (*model.User, *model.ApiError) {
|
||||
return nil, apiErr
|
||||
}
|
||||
|
||||
group, apiErr := dao.DB().CreateGroup(ctx, &model.Group{
|
||||
Name: "test",
|
||||
})
|
||||
group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup)
|
||||
if apiErr != nil {
|
||||
return nil, apiErr
|
||||
}
|
||||
|
||||
auth.InitAuthCache(ctx)
|
||||
|
||||
return dao.DB().CreateUser(
|
||||
ctx,
|
||||
&model.User{
|
||||
@ -843,7 +843,7 @@ func NewAuthenticatedTestRequest(
|
||||
}
|
||||
req = httptest.NewRequest(http.MethodPost, path, &body)
|
||||
} else {
|
||||
req = httptest.NewRequest(http.MethodPost, path, nil)
|
||||
req = httptest.NewRequest(http.MethodGet, path, nil)
|
||||
}
|
||||
|
||||
req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt)
|
||||
|
224
pkg/query-service/tests/integration/signoz_integrations_test.go
Normal file
224
pkg/query-service/tests/integration/signoz_integrations_test.go
Normal file
@ -0,0 +1,224 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"runtime/debug"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
// Higher level tests for UI facing APIs
|
||||
|
||||
func TestSignozIntegrationLifeCycle(t *testing.T) {
|
||||
require := require.New(t)
|
||||
testbed := NewIntegrationsTestBed(t)
|
||||
|
||||
installedResp := testbed.GetInstalledIntegrationsFromQS()
|
||||
require.Equal(
|
||||
len(installedResp.Integrations), 0,
|
||||
"no integrations should be installed at the beginning",
|
||||
)
|
||||
|
||||
availableResp := testbed.GetAvailableIntegrationsFromQS()
|
||||
availableIntegrations := availableResp.Integrations
|
||||
require.Greater(
|
||||
len(availableIntegrations), 0,
|
||||
"some integrations should come bundled with SigNoz",
|
||||
)
|
||||
|
||||
require.False(availableIntegrations[0].IsInstalled)
|
||||
testbed.RequestQSToInstallIntegration(
|
||||
availableIntegrations[0].Id, map[string]interface{}{},
|
||||
)
|
||||
|
||||
ii := testbed.GetIntegrationDetailsFromQS(availableIntegrations[0].Id)
|
||||
require.Equal(ii.Id, availableIntegrations[0].Id)
|
||||
require.NotNil(ii.Installation)
|
||||
|
||||
installedResp = testbed.GetInstalledIntegrationsFromQS()
|
||||
installedIntegrations := installedResp.Integrations
|
||||
require.Equal(len(installedIntegrations), 1)
|
||||
require.Equal(installedIntegrations[0].Id, availableIntegrations[0].Id)
|
||||
|
||||
availableResp = testbed.GetAvailableIntegrationsFromQS()
|
||||
availableIntegrations = availableResp.Integrations
|
||||
require.Greater(len(availableIntegrations), 0)
|
||||
|
||||
require.True(availableIntegrations[0].IsInstalled)
|
||||
testbed.RequestQSToUninstallIntegration(
|
||||
availableIntegrations[0].Id,
|
||||
)
|
||||
|
||||
ii = testbed.GetIntegrationDetailsFromQS(availableIntegrations[0].Id)
|
||||
require.Equal(ii.Id, availableIntegrations[0].Id)
|
||||
require.Nil(ii.Installation)
|
||||
|
||||
installedResp = testbed.GetInstalledIntegrationsFromQS()
|
||||
installedIntegrations = installedResp.Integrations
|
||||
require.Equal(len(installedIntegrations), 0)
|
||||
|
||||
availableResp = testbed.GetAvailableIntegrationsFromQS()
|
||||
availableIntegrations = availableResp.Integrations
|
||||
require.Greater(len(availableIntegrations), 0)
|
||||
require.False(availableIntegrations[0].IsInstalled)
|
||||
}
|
||||
|
||||
type IntegrationsTestBed struct {
|
||||
t *testing.T
|
||||
testUser *model.User
|
||||
qsHttpHandler http.Handler
|
||||
}
|
||||
|
||||
func (tb *IntegrationsTestBed) GetAvailableIntegrationsFromQS() *integrations.IntegrationsListResponse {
|
||||
result := tb.RequestQS("/api/v1/integrations", nil)
|
||||
|
||||
dataJson, err := json.Marshal(result.Data)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not marshal apiResponse.Data: %v", err)
|
||||
}
|
||||
var integrationsResp integrations.IntegrationsListResponse
|
||||
err = json.Unmarshal(dataJson, &integrationsResp)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not unmarshal apiResponse.Data json into PipelinesResponse")
|
||||
}
|
||||
|
||||
return &integrationsResp
|
||||
}
|
||||
|
||||
func (tb *IntegrationsTestBed) GetInstalledIntegrationsFromQS() *integrations.IntegrationsListResponse {
|
||||
result := tb.RequestQS("/api/v1/integrations?is_installed=true", nil)
|
||||
|
||||
dataJson, err := json.Marshal(result.Data)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not marshal apiResponse.Data: %v", err)
|
||||
}
|
||||
var integrationsResp integrations.IntegrationsListResponse
|
||||
err = json.Unmarshal(dataJson, &integrationsResp)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not unmarshal apiResponse.Data json into PipelinesResponse")
|
||||
}
|
||||
|
||||
return &integrationsResp
|
||||
}
|
||||
|
||||
func (tb *IntegrationsTestBed) GetIntegrationDetailsFromQS(
|
||||
integrationId string,
|
||||
) *integrations.Integration {
|
||||
result := tb.RequestQS(fmt.Sprintf(
|
||||
"/api/v1/integrations/%s", integrationId,
|
||||
), nil)
|
||||
|
||||
dataJson, err := json.Marshal(result.Data)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not marshal apiResponse.Data: %v", err)
|
||||
}
|
||||
var integrationResp integrations.Integration
|
||||
err = json.Unmarshal(dataJson, &integrationResp)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not unmarshal apiResponse.Data json into PipelinesResponse")
|
||||
}
|
||||
|
||||
return &integrationResp
|
||||
}
|
||||
|
||||
func (tb *IntegrationsTestBed) RequestQSToInstallIntegration(
|
||||
integrationId string, config map[string]interface{},
|
||||
) {
|
||||
request := integrations.InstallIntegrationRequest{
|
||||
IntegrationId: integrationId,
|
||||
Config: config,
|
||||
}
|
||||
tb.RequestQS("/api/v1/integrations/install", request)
|
||||
}
|
||||
|
||||
func (tb *IntegrationsTestBed) RequestQSToUninstallIntegration(
|
||||
integrationId string,
|
||||
) {
|
||||
request := integrations.UninstallIntegrationRequest{
|
||||
IntegrationId: integrationId,
|
||||
}
|
||||
tb.RequestQS("/api/v1/integrations/uninstall", request)
|
||||
}
|
||||
|
||||
func (tb *IntegrationsTestBed) RequestQS(
|
||||
path string,
|
||||
postData interface{},
|
||||
) *app.ApiResponse {
|
||||
req, err := NewAuthenticatedTestRequest(
|
||||
tb.testUser, path, postData,
|
||||
)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("couldn't create authenticated test request: %v", err)
|
||||
}
|
||||
|
||||
respWriter := httptest.NewRecorder()
|
||||
tb.qsHttpHandler.ServeHTTP(respWriter, req)
|
||||
response := respWriter.Result()
|
||||
responseBody, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("couldn't read response body received from QS: %v", err)
|
||||
}
|
||||
|
||||
if response.StatusCode != 200 {
|
||||
tb.t.Fatalf(
|
||||
"unexpected response status from query service for path %s. status: %d, body: %v\n%v",
|
||||
path, response.StatusCode, string(responseBody), string(debug.Stack()),
|
||||
)
|
||||
}
|
||||
|
||||
var result app.ApiResponse
|
||||
err = json.Unmarshal(responseBody, &result)
|
||||
if err != nil {
|
||||
tb.t.Fatalf(
|
||||
"Could not unmarshal QS response into an ApiResponse.\nResponse body: %s",
|
||||
string(responseBody),
|
||||
)
|
||||
}
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed {
|
||||
testDB, testDBFilePath := integrations.NewTestSqliteDB(t)
|
||||
|
||||
// TODO(Raj): This should not require passing in the DB file path
|
||||
dao.InitDao("sqlite", testDBFilePath)
|
||||
|
||||
controller, err := integrations.NewController(testDB)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create integrations controller: %v", err)
|
||||
}
|
||||
|
||||
apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{
|
||||
AppDao: dao.DB(),
|
||||
IntegrationsController: controller,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("could not create a new ApiHandler: %v", err)
|
||||
}
|
||||
|
||||
router := app.NewRouter()
|
||||
am := app.NewAuthMiddleware(auth.GetUserFromRequest)
|
||||
apiHandler.RegisterIntegrationRoutes(router, am)
|
||||
|
||||
user, apiErr := createTestUser()
|
||||
if apiErr != nil {
|
||||
t.Fatalf("could not create a test user: %v", apiErr)
|
||||
}
|
||||
|
||||
return &IntegrationsTestBed{
|
||||
t: t,
|
||||
testUser: user,
|
||||
qsHttpHandler: router,
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user