Feat: QS: Log Pipelines for installed integrations (#4674)

* chore: refactor: inject sqlx.DB into opamp.initDB instead of DB file name

* chore: reorganize test utils a little

* chore: add test validating pipelines for installed integrations show up in pipelines list

* chore: get basic integration pipelines testcase passing

* chore: reconcile experimental changes with latest state of develop

* chore: add integration test for reordering of pipelines

* chore: marker for integration pipelines using Id

* chore: hookup propagation of installed integration pipelines by opamp

* chore: add util for mapping slices

* chore: add support for reordering integration pipelines

* chore: exclude user saved integration pipelines if no longer installed

* chore: flesh out rest of intgeration pipelines scenarios

* chore: handle scenario when an integration is installed before any pipelines exist

* chore: notify agentConf of update after uninstalling an integration

* chore: some minor cleanup

* chore: some more cleanup

* chore: update ee server for changed controllers

* chore: some more cleanup

* chore: change builtin integration id prefix to avoid using colons that break yaml

* chore: update builtin integration id in test
This commit is contained in:
Raj Kamal Singh 2024-03-11 14:15:11 +05:30 committed by GitHub
parent a4d5774ae3
commit 9ace374855
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 631 additions and 225 deletions

View File

@ -172,7 +172,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// initiate opamp
_, err = opAmpModel.InitDB(baseconst.RELATIONAL_DATASOURCE_PATH)
_, err = opAmpModel.InitDB(localDB)
if err != nil {
return nil, err
}
@ -185,7 +185,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite")
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
}

View File

@ -111,10 +111,6 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
}
if latestConfig == nil {
continue
}
updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(
recommendation, latestConfig,
)
@ -124,13 +120,24 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
))
}
recommendation = updatedConf
configId := fmt.Sprintf("%s:%d", featureType, latestConfig.Version)
// It is possible for a feature to recommend collector config
// before any user created config versions exist.
//
// For example, log pipeline config for installed integrations will
// have to be recommended even if the user hasn't created any pipelines yet
configVersion := -1
if latestConfig != nil {
configVersion = latestConfig.Version
}
configId := fmt.Sprintf("%s:%d", featureType, configVersion)
settingVersionsUsed = append(settingVersionsUsed, configId)
m.updateDeployStatus(
context.Background(),
featureType,
latestConfig.Version,
configVersion,
string(DeployInitiated),
"Deployment has started",
configId,
@ -209,6 +216,10 @@ func StartNewVersion(
return cfg, nil
}
func NotifyConfigUpdate(ctx context.Context) {
m.notifyConfigUpdateSubscribers()
}
func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError {
configVersion, err := GetConfigVersion(ctx, typ, version)

View File

@ -2787,16 +2787,17 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
*logparsingpipeline.PipelinesResponse, *model.ApiError,
) {
// get lateset agent config
latestVersion := -1
lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines)
if err != nil {
if err.Type() != model.ErrorNotFound {
if err != nil && err.Type() != model.ErrorNotFound {
return nil, model.WrapApiError(err, "failed to get latest agent config version")
} else {
return nil, nil
}
}
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, lastestConfig.Version)
if lastestConfig != nil {
latestVersion = lastestConfig.Version
}
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion)
if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines")
}

View File

@ -127,7 +127,7 @@ func readBuiltInIntegration(dirpath string) (
)
}
integration.Id = "builtin::" + integration.Id
integration.Id = "builtin-" + integration.Id
return &integration, nil
}

View File

@ -20,7 +20,7 @@ func TestBuiltinIntegrations(t *testing.T) {
"some built in integrations are expected to be bundled.",
)
nginxIntegrationId := "builtin::nginx"
nginxIntegrationId := "builtin-nginx"
res, apiErr := repo.get(context.Background(), []string{
nginxIntegrationId,
})

View File

@ -5,6 +5,8 @@ import (
"fmt"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
)
@ -74,9 +76,14 @@ type InstallIntegrationRequest struct {
func (c *Controller) Install(
ctx context.Context, req *InstallIntegrationRequest,
) (*IntegrationsListItem, *model.ApiError) {
return c.mgr.InstallIntegration(
res, apiErr := c.mgr.InstallIntegration(
ctx, req.IntegrationId, req.Config,
)
if apiErr != nil {
return nil, apiErr
}
agentConf.NotifyConfigUpdate(ctx)
return res, nil
}
type UninstallIntegrationRequest struct {
@ -92,7 +99,18 @@ func (c *Controller) Uninstall(
))
}
return c.mgr.UninstallIntegration(
apiErr := c.mgr.UninstallIntegration(
ctx, req.IntegrationId,
)
if apiErr != nil {
return apiErr
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (c *Controller) GetPipelinesForInstalledIntegrations(
ctx context.Context,
) ([]logparsingpipeline.Pipeline, *model.ApiError) {
return c.mgr.GetPipelinesForInstalledIntegrations(ctx)
}

View File

@ -7,12 +7,14 @@ import (
"strings"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/utils"
)
type IntegrationAuthor struct {
@ -294,3 +296,42 @@ func (m *Manager) getInstalledIntegration(
}
return &installation, nil
}
func (m *Manager) GetPipelinesForInstalledIntegrations(
ctx context.Context,
) ([]logparsingpipeline.Pipeline, *model.ApiError) {
installations, apiErr := m.installedIntegrationsRepo.list(ctx)
if apiErr != nil {
return nil, apiErr
}
installedIds := utils.MapSlice(installations, func(i InstalledIntegration) string {
return i.IntegrationId
})
installedIntegrations, apiErr := m.availableIntegrationsRepo.get(ctx, installedIds)
if apiErr != nil {
return nil, apiErr
}
pipelines := []logparsingpipeline.Pipeline{}
for _, ii := range installedIntegrations {
for _, p := range ii.Assets.Logs.Pipelines {
pp := logparsingpipeline.Pipeline{
// Alias is used for identifying integration pipelines. Id can't be used for this
// since versioning while saving pipelines requires a new id for each version
// to avoid altering history when pipelines are edited/reordered etc
Alias: AliasForIntegrationPipeline(ii.Id, p.Alias),
Id: uuid.NewString(),
OrderId: p.OrderId,
Enabled: p.Enabled,
Name: p.Name,
Description: &p.Description,
Filter: p.Filter,
Config: p.Config,
}
pipelines = append(pipelines, pp)
}
}
return pipelines, nil
}

View File

@ -0,0 +1,33 @@
package integrations
import (
"strings"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/constants"
)
const IntegrationPipelineIdSeparator string = "--"
func AliasForIntegrationPipeline(
integrationId string, pipelineName string,
) string {
return strings.Join(
[]string{constants.IntegrationPipelineIdPrefix, integrationId, pipelineName},
IntegrationPipelineIdSeparator,
)
}
// Returns ptr to integration_id string if `p` is a pipeline for an installed integration.
// Returns null otherwise.
func IntegrationIdForPipeline(p logparsingpipeline.Pipeline) *string {
if strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) {
parts := strings.Split(p.Alias, IntegrationPipelineIdSeparator)
if len(parts) < 2 {
return nil
}
integrationId := parts[1]
return &integrationId
}
return nil
}

View File

@ -62,6 +62,7 @@ func (r *InstalledIntegrationsSqliteRepo) list(
config_json,
installed_at
from integrations_installed
order by installed_at
`,
)
if err != nil {

View File

@ -2,39 +2,19 @@ package integrations
import (
"context"
"os"
"slices"
"testing"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/utils"
)
func NewTestSqliteDB(t *testing.T) (
db *sqlx.DB, dbFilePath string,
) {
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
testDB, err := sqlx.Open("sqlite3", testDBFilePath)
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
}
return testDB, testDBFilePath
}
func NewTestIntegrationsManager(t *testing.T) *Manager {
testDB, _ := NewTestSqliteDB(t)
testDB := utils.NewQueryServiceDBForTests(t)
installedIntegrationsRepo, err := NewInstalledIntegrationsSqliteRepo(testDB)
if err != nil {

View File

@ -4,25 +4,38 @@ import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/multierr"
"go.signoz.io/signoz/pkg/query-service/utils"
"go.uber.org/zap"
)
// Controller takes care of deployment cycle of log parsing pipelines.
type LogParsingPipelineController struct {
Repo
GetIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError)
}
func NewLogParsingPipelinesController(db *sqlx.DB, engine string) (*LogParsingPipelineController, error) {
func NewLogParsingPipelinesController(
db *sqlx.DB,
engine string,
getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError),
) (*LogParsingPipelineController, error) {
repo := NewRepo(db)
err := repo.InitDB(engine)
return &LogParsingPipelineController{Repo: repo}, err
return &LogParsingPipelineController{
Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines,
}, err
}
// PipelinesResponse is used to prepare http response for pipelines config related requests
@ -47,29 +60,22 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
var pipelines []Pipeline
// scan through postable pipelines, to select the existing pipelines or insert missing ones
for _, r := range postable {
for idx, r := range postable {
// note: we process only new and changed pipelines here, deleted pipelines are not expected
// from client. if user deletes a pipelines, the client should not send that pipelines in the update.
// in effect, the new config version will not have that pipelines.
if r.Id == "" {
// looks like a new or changed pipeline, store it first
inserted, err := ic.insertPipeline(ctx, &r)
if err != nil {
zap.S().Errorf("failed to insert edited pipeline %s", err.Error())
return nil, model.WrapApiError(err, "failed to insert edited pipeline")
} else {
pipelines = append(pipelines, *inserted)
}
} else {
selected, err := ic.GetPipeline(ctx, r.Id)
if err != nil {
zap.S().Errorf("failed to find edited pipeline %s", err.Error())
return nil, model.WrapApiError(err, "failed to find edited pipeline")
}
pipelines = append(pipelines, *selected)
// For versioning, pipelines get stored with unique ids each time they are saved.
// This ensures updating a pipeline doesn't alter historical versions that referenced
// the same pipeline id.
r.Id = uuid.NewString()
r.OrderId = idx + 1
pipeline, apiErr := ic.insertPipeline(ctx, &r)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "failed to insert pipeline")
}
pipelines = append(pipelines, *pipeline)
}
@ -85,35 +91,86 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
return nil, err
}
history, _ := agentConf.GetConfigHistory(ctx, agentConf.ElementTypeLogPipelines, 10)
insertedCfg, _ := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, cfg.Version)
response := &PipelinesResponse{
ConfigVersion: insertedCfg,
Pipelines: pipelines,
History: history,
return ic.GetPipelinesByVersion(ctx, cfg.Version)
}
if err != nil {
return response, model.WrapApiError(err, "failed to apply pipelines")
// Returns effective list of pipelines including user created
// pipelines and pipelines for installed integrations
func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
ctx context.Context, version int,
) ([]Pipeline, *model.ApiError) {
result := []Pipeline{}
if version >= 0 {
savedPipelines, errors := ic.getPipelinesByVersion(ctx, version)
if errors != nil {
zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors)
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version"))
}
return response, nil
result = savedPipelines
}
integrationPipelines, apiErr := ic.GetIntegrationPipelines(ctx)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "could not get pipelines for installed integrations",
)
}
// Filter out any integration pipelines included in pipelines saved by user
// if the corresponding integration is no longer installed.
ipAliases := utils.MapSlice(integrationPipelines, func(p Pipeline) string {
return p.Alias
})
result = utils.FilterSlice(result, func(p Pipeline) bool {
if !strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) {
return true
}
return slices.Contains(ipAliases, p.Alias)
})
// Add installed integration pipelines to the list of pipelines saved by user.
// Users are allowed to enable/disable and reorder integration pipelines while
// saving the pipeline list.
for _, ip := range integrationPipelines {
userPipelineIdx := slices.IndexFunc(result, func(p Pipeline) bool {
return p.Alias == ip.Alias
})
if userPipelineIdx >= 0 {
ip.Enabled = result[userPipelineIdx].Enabled
result[userPipelineIdx] = ip
} else {
// installed integration pipelines get added to the end of the list by default.
result = append(result, ip)
}
}
for idx := range result {
result[idx].OrderId = idx + 1
}
return result, nil
}
// GetPipelinesByVersion responds with version info and associated pipelines
func (ic *LogParsingPipelineController) GetPipelinesByVersion(
ctx context.Context, version int,
) (*PipelinesResponse, *model.ApiError) {
pipelines, errors := ic.getPipelinesByVersion(ctx, version)
pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, version)
if errors != nil {
zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors)
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version"))
}
configVersion, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
var configVersion *agentConf.ConfigVersion
if version >= 0 {
cv, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
if err != nil {
zap.S().Errorf("failed to get config for version %d, %s", version, err.Error())
return nil, model.WrapApiError(err, "failed to get config for given version")
}
configVersion = cv
}
return &PipelinesResponse{
ConfigVersion: configVersion,
@ -163,26 +220,29 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
serializedSettingsUsed string,
apiErr *model.ApiError,
) {
pipelinesVersion := -1
if configVersion != nil {
pipelinesVersion = configVersion.Version
}
pipelines, errs := pc.getPipelinesByVersion(
context.Background(), configVersion.Version,
pipelinesResp, apiErr := pc.GetPipelinesByVersion(
context.Background(), pipelinesVersion,
)
if len(errs) > 0 {
return nil, "", model.InternalError(multierr.Combine(errs...))
if apiErr != nil {
return nil, "", apiErr
}
updatedConf, apiErr := GenerateCollectorConfigWithPipelines(
currentConfYaml, pipelines,
currentConfYaml, pipelinesResp.Pipelines,
)
if apiErr != nil {
return nil, "", model.WrapApiError(apiErr, "could not marshal yaml for updated conf")
}
rawPipelineData, err := json.Marshal(pipelines)
rawPipelineData, err := json.Marshal(pipelinesResp.Pipelines)
if err != nil {
return nil, "", model.BadRequest(errors.Wrap(err, "could not serialize pipelines to JSON"))
}
return updatedConf, string(rawPipelineData), nil
}

View File

@ -2,7 +2,6 @@ package opamp
import (
"fmt"
"os"
"testing"
"github.com/knadh/koanf"
@ -13,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/utils"
"golang.org/x/exp/maps"
)
@ -165,16 +165,8 @@ type testbed struct {
}
func newTestbed(t *testing.T) *testbed {
// Init opamp model.
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
_, err = model.InitDB(testDBFilePath)
testDB := utils.NewQueryServiceDBForTests(t)
_, err := model.InitDB(testDB)
if err != nil {
t.Fatalf("could not init opamp model: %v", err)
}

View File

@ -29,14 +29,9 @@ func (a *Agents) Count() int {
return len(a.connections)
}
// InitDB initializes the database and creates the agents table.
func InitDB(dataSourceName string) (*sqlx.DB, error) {
var err error
db, err = sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, err
}
// Initialize the database and create schema if needed
func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
db = qsDB
tableSchema := `CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT PRIMARY KEY UNIQUE,
@ -46,7 +41,7 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
effective_config TEXT NOT NULL
);`
_, err = db.Exec(tableSchema)
_, err := db.Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("Error in creating agents table: %s", err.Error())
}

View File

@ -159,12 +159,12 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
integrationsController, err := integrations.NewController(localDB)
if err != nil {
return nil, fmt.Errorf(
"couldn't create integrations controller: %w", err,
)
return nil, fmt.Errorf("couldn't create integrations controller: %w", err)
}
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite")
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
}
@ -213,7 +213,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer
_, err = opAmpModel.InitDB(constants.RELATIONAL_DATASOURCE_PATH)
_, err = opAmpModel.InitDB(localDB)
if err != nil {
return nil, err
}

View File

@ -308,6 +308,8 @@ var ReservedColumnTargetAliases = map[string]struct{}{
// logsPPLPfx is a short constant for logsPipelinePrefix
const LogsPPLPfx = "logstransform/pipeline_"
const IntegrationPipelineIdPrefix = "integration"
// The datatype present here doesn't represent the actual datatype of column in the logs table.
var StaticFieldsLogsV3 = map[string]v3.AttributeKey{

View File

@ -1,14 +1,11 @@
package tests
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"runtime/debug"
"strings"
"testing"
@ -18,10 +15,10 @@ import (
"github.com/knadh/koanf/parsers/yaml"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opampModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
@ -31,20 +28,21 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
"go.signoz.io/signoz/pkg/query-service/utils"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
func TestLogPipelinesLifecycle(t *testing.T) {
testbed := NewLogPipelinesTestBed(t)
assert := assert.New(t)
testbed := NewLogPipelinesTestBed(t, nil)
require := require.New(t)
getPipelinesResp := testbed.GetPipelinesFromQS()
assert.Equal(
require.Equal(
0, len(getPipelinesResp.Pipelines),
"There should be no pipelines at the start",
)
assert.Equal(
require.Equal(
0, len(getPipelinesResp.History),
"There should be no pipelines config history at the start",
)
@ -118,11 +116,11 @@ func TestLogPipelinesLifecycle(t *testing.T) {
)
// Deployment status should be pending.
assert.Equal(
require.Equal(
1, len(getPipelinesResp.History),
"pipelines config history should not be empty after 1st configuration",
)
assert.Equal(
require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"pipelines deployment should be in progress after 1st configuration",
)
@ -134,7 +132,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
assertPipelinesResponseMatchesPostedPipelines(
t, postablePipelines, getPipelinesResp,
)
assert.Equal(
require.Equal(
agentConf.Deployed,
getPipelinesResp.History[0].DeployStatus,
"pipeline deployment should be complete after acknowledgment from opamp client",
@ -149,12 +147,13 @@ func TestLogPipelinesLifecycle(t *testing.T) {
testbed.assertPipelinesSentToOpampClient(updatePipelinesResp.Pipelines)
testbed.assertNewAgentGetsPipelinesOnConnection(updatePipelinesResp.Pipelines)
assert.Equal(
2, len(updatePipelinesResp.History),
getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(
2, len(getPipelinesResp.History),
"there should be 2 history entries after posting pipelines config for the 2nd time",
)
assert.Equal(
agentConf.DeployInitiated, updatePipelinesResp.History[0].DeployStatus,
require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"deployment should be in progress for latest pipeline config",
)
@ -165,7 +164,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
assertPipelinesResponseMatchesPostedPipelines(
t, postablePipelines, getPipelinesResp,
)
assert.Equal(
require.Equal(
agentConf.Deployed,
getPipelinesResp.History[0].DeployStatus,
"deployment for latest pipeline config should be complete after acknowledgment from opamp client",
@ -174,7 +173,7 @@ func TestLogPipelinesLifecycle(t *testing.T) {
func TestLogPipelinesHistory(t *testing.T) {
require := require.New(t)
testbed := NewLogPipelinesTestBed(t)
testbed := NewLogPipelinesTestBed(t, nil)
// Only the latest config version can be "IN_PROGRESS",
// other incomplete deployments should have status "UNKNOWN"
@ -356,7 +355,7 @@ func TestLogPipelinesValidation(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
testbed := NewLogPipelinesTestBed(t)
testbed := NewLogPipelinesTestBed(t, nil)
testbed.PostPipelinesToQSExpectingStatusCode(
logparsingpipeline.PostablePipelines{
Pipelines: []logparsingpipeline.PostablePipeline{tc.Pipeline},
@ -369,7 +368,7 @@ func TestLogPipelinesValidation(t *testing.T) {
func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) {
require := require.New(t)
testbed := NewTestbedWithoutOpamp(t)
testbed := NewTestbedWithoutOpamp(t, nil)
getPipelinesResp := testbed.GetPipelinesFromQS()
require.Equal(0, len(getPipelinesResp.Pipelines))
@ -422,7 +421,6 @@ func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) {
// configuring log pipelines and provides test helpers.
type LogPipelinesTestBed struct {
t *testing.T
testDBFilePath string
testUser *model.User
apiHandler *app.APIHandler
agentConfMgr *agentConf.Manager
@ -430,25 +428,20 @@ type LogPipelinesTestBed struct {
opampClientConn *opamp.MockOpAmpConnection
}
func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed {
// Create a tmp file based sqlite db for testing.
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
// testDB can be injected for sharing a DB across multiple integration testbeds.
func NewTestbedWithoutOpamp(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed {
if testDB == nil {
testDB = utils.NewQueryServiceDBForTests(t)
}
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
// TODO(Raj): move away from singleton DB instances to avoid
// issues when running tests in parallel.
dao.InitDao("sqlite", testDBFilePath)
testDB, err := sqlx.Open("sqlite3", testDBFilePath)
ic, err := integrations.NewController(testDB)
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
t.Fatalf("could not create integrations controller: %v", err)
}
controller, err := logparsingpipeline.NewLogParsingPipelinesController(testDB, "sqlite")
controller, err := logparsingpipeline.NewLogParsingPipelinesController(
testDB, "sqlite", ic.GetPipelinesForInstalledIntegrations,
)
if err != nil {
t.Fatalf("could not create a logparsingpipelines controller: %v", err)
}
@ -467,7 +460,7 @@ func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed {
}
// Mock an available opamp agent
testDB, err = opampModel.InitDB(testDBFilePath)
testDB, err = opampModel.InitDB(testDB)
require.Nil(t, err, "failed to init opamp model")
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
@ -480,15 +473,14 @@ func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed {
return &LogPipelinesTestBed{
t: t,
testDBFilePath: testDBFilePath,
testUser: user,
apiHandler: apiHandler,
agentConfMgr: agentConfMgr,
}
}
func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed {
testbed := NewTestbedWithoutOpamp(t)
func NewLogPipelinesTestBed(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed {
testbed := NewTestbedWithoutOpamp(t, testDB)
opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr)
err := opampServer.Start(opamp.GetAvailableLocalAddress())
@ -590,8 +582,8 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline
if response.StatusCode != 200 {
tb.t.Fatalf(
"could not list log parsing pipelines. status: %d, body: %v",
response.StatusCode, string(responseBody),
"could not list log parsing pipelines. status: %d, body: %v\n%s",
response.StatusCode, string(responseBody), string(debug.Stack()),
)
}
@ -625,7 +617,7 @@ func assertPipelinesRecommendedInRemoteConfig(
pipelines []logparsingpipeline.Pipeline,
) {
collectorConfigFiles := msg.RemoteConfig.Config.ConfigMap
assert.Equal(
require.Equal(
t, len(collectorConfigFiles), 1,
"otel config sent to client is expected to contain atleast 1 file",
)
@ -653,7 +645,7 @@ func assertPipelinesRecommendedInRemoteConfig(
}
_, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
assert.Equal(
require.Equal(
t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames,
"config sent to opamp client doesn't contain expected log pipelines",
)
@ -661,7 +653,7 @@ func assertPipelinesRecommendedInRemoteConfig(
collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{})
for _, procName := range expectedLogProcessorNames {
pipelineProcessorInConf, procExists := collectorConfProcessors[procName]
assert.True(t, procExists, fmt.Sprintf(
require.True(t, procExists, fmt.Sprintf(
"%s processor not found in config sent to opamp client", procName,
))
@ -747,16 +739,16 @@ func assertPipelinesResponseMatchesPostedPipelines(
postablePipelines logparsingpipeline.PostablePipelines,
pipelinesResp *logparsingpipeline.PipelinesResponse,
) {
assert.Equal(
require.Equal(
t, len(postablePipelines.Pipelines), len(pipelinesResp.Pipelines),
"length mistmatch between posted pipelines and pipelines in response",
)
for i, pipeline := range pipelinesResp.Pipelines {
postable := postablePipelines.Pipelines[i]
assert.Equal(t, postable.Name, pipeline.Name, "pipeline.Name mismatch")
assert.Equal(t, postable.OrderId, pipeline.OrderId, "pipeline.OrderId mismatch")
assert.Equal(t, postable.Enabled, pipeline.Enabled, "pipeline.Enabled mismatch")
assert.Equal(t, postable.Config, pipeline.Config, "pipeline.Config mismatch")
require.Equal(t, postable.Name, pipeline.Name, "pipeline.Name mismatch")
require.Equal(t, postable.OrderId, pipeline.OrderId, "pipeline.OrderId mismatch")
require.Equal(t, postable.Enabled, pipeline.Enabled, "pipeline.Enabled mismatch")
require.Equal(t, postable.Config, pipeline.Config, "pipeline.Config mismatch")
}
}
@ -792,60 +784,3 @@ func newInitialAgentConfigMap() *protobufs.AgentConfigMap {
},
}
}
func createTestUser() (*model.User, *model.ApiError) {
// Create a test user for auth
ctx := context.Background()
org, apiErr := dao.DB().CreateOrg(ctx, &model.Organization{
Name: "test",
})
if apiErr != nil {
return nil, apiErr
}
group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup)
if apiErr != nil {
return nil, apiErr
}
auth.InitAuthCache(ctx)
return dao.DB().CreateUser(
ctx,
&model.User{
Name: "test",
Email: "test@test.com",
Password: "test",
OrgId: org.Id,
GroupId: group.Id,
},
true,
)
}
func NewAuthenticatedTestRequest(
user *model.User,
path string,
postData interface{},
) (*http.Request, error) {
userJwt, err := auth.GenerateJWTForUser(user)
if err != nil {
return nil, err
}
var req *http.Request
if postData != nil {
var body bytes.Buffer
err = json.NewEncoder(&body).Encode(postData)
if err != nil {
return nil, err
}
req = httptest.NewRequest(http.MethodPost, path, &body)
} else {
req = httptest.NewRequest(http.MethodGet, path, nil)
}
req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt)
return req, nil
}

View File

@ -7,23 +7,28 @@ import (
"net/http"
"net/http/httptest"
"runtime/debug"
"slices"
"testing"
"github.com/jmoiron/sqlx"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
// Higher level tests for UI facing APIs
func TestSignozIntegrationLifeCycle(t *testing.T) {
require := require.New(t)
testbed := NewIntegrationsTestBed(t)
testbed := NewIntegrationsTestBed(t, nil)
installedResp := testbed.GetInstalledIntegrationsFromQS()
require.Equal(
@ -92,6 +97,184 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
require.False(availableIntegrations[0].IsInstalled)
}
func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
require := require.New(t)
testDB := utils.NewQueryServiceDBForTests(t)
integrationsTB := NewIntegrationsTestBed(t, testDB)
pipelinesTB := NewLogPipelinesTestBed(t, testDB)
availableIntegrationsResp := integrationsTB.GetAvailableIntegrationsFromQS()
availableIntegrations := availableIntegrationsResp.Integrations
require.Greater(
len(availableIntegrations), 0,
"some integrations should come bundled with SigNoz",
)
getPipelinesResp := pipelinesTB.GetPipelinesFromQS()
require.Equal(
0, len(getPipelinesResp.Pipelines),
"There should be no pipelines at the start",
)
// Find an available integration that contains a log pipeline
var testAvailableIntegration *integrations.IntegrationsListItem
for _, ai := range availableIntegrations {
details := integrationsTB.GetIntegrationDetailsFromQS(ai.Id)
require.NotNil(details)
if len(details.Assets.Logs.Pipelines) > 0 {
testAvailableIntegration = &ai
break
}
}
require.NotNil(testAvailableIntegration)
// Installing an integration should add its pipelines to pipelines list
require.False(testAvailableIntegration.IsInstalled)
integrationsTB.RequestQSToInstallIntegration(
testAvailableIntegration.Id, map[string]interface{}{},
)
testIntegration := integrationsTB.GetIntegrationDetailsFromQS(testAvailableIntegration.Id)
require.NotNil(testIntegration.Installation)
testIntegrationPipelines := testIntegration.Assets.Logs.Pipelines
require.Greater(
len(testIntegrationPipelines), 0,
"test integration expected to have a pipeline",
)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(
len(testIntegrationPipelines), len(getPipelinesResp.Pipelines),
"Pipelines for installed integrations should appear in pipelines list",
)
lastPipeline := getPipelinesResp.Pipelines[len(getPipelinesResp.Pipelines)-1]
require.NotNil(integrations.IntegrationIdForPipeline(lastPipeline))
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(lastPipeline))
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines)
// After saving a user created pipeline, pipelines response should include
// both user created pipelines and pipelines for installed integrations.
postablePipelines := logparsingpipeline.PostablePipelines{
Pipelines: []logparsingpipeline.PostablePipeline{
{
OrderId: 1,
Name: "pipeline1",
Alias: "pipeline1",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "method",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
},
},
},
Config: []logparsingpipeline.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
},
}
pipelinesTB.PostPipelinesToQS(postablePipelines)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines))
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines)
// Reordering integration pipelines should be possible.
postable := postableFromPipelines(getPipelinesResp.Pipelines)
slices.Reverse(postable.Pipelines)
for i := range postable.Pipelines {
postable.Pipelines[i].OrderId = i + 1
}
pipelinesTB.PostPipelinesToQS(postable)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
firstPipeline := getPipelinesResp.Pipelines[0]
require.NotNil(integrations.IntegrationIdForPipeline(firstPipeline))
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline))
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines)
// enabling/disabling integration pipelines should be possible.
require.True(firstPipeline.Enabled)
postable.Pipelines[0].Enabled = false
pipelinesTB.PostPipelinesToQS(postable)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines))
firstPipeline = getPipelinesResp.Pipelines[0]
require.NotNil(integrations.IntegrationIdForPipeline(firstPipeline))
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline))
require.False(firstPipeline.Enabled)
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines)
// should not be able to edit integrations pipeline.
require.Greater(len(postable.Pipelines[0].Config), 0)
postable.Pipelines[0].Config = []logparsingpipeline.PipelineOperator{}
pipelinesTB.PostPipelinesToQS(postable)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines))
firstPipeline = getPipelinesResp.Pipelines[0]
require.NotNil(integrations.IntegrationIdForPipeline(firstPipeline))
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(firstPipeline))
require.False(firstPipeline.Enabled)
require.Greater(len(firstPipeline.Config), 0)
// should not be able to delete integrations pipeline
postable.Pipelines = []logparsingpipeline.PostablePipeline{postable.Pipelines[1]}
pipelinesTB.PostPipelinesToQS(postable)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(1+len(testIntegrationPipelines), len(getPipelinesResp.Pipelines))
lastPipeline = getPipelinesResp.Pipelines[1]
require.NotNil(integrations.IntegrationIdForPipeline(lastPipeline))
require.Equal(testIntegration.Id, *integrations.IntegrationIdForPipeline(lastPipeline))
// Uninstalling an integration should remove its pipelines
// from pipelines list in the UI
integrationsTB.RequestQSToUninstallIntegration(
testIntegration.Id,
)
getPipelinesResp = pipelinesTB.GetPipelinesFromQS()
require.Equal(
1, len(getPipelinesResp.Pipelines),
"Pipelines for uninstalled integrations should get removed from pipelines list",
)
pipelinesTB.assertPipelinesSentToOpampClient(getPipelinesResp.Pipelines)
pipelinesTB.assertNewAgentGetsPipelinesOnConnection(getPipelinesResp.Pipelines)
}
type IntegrationsTestBed struct {
t *testing.T
testUser *model.User
@ -232,11 +415,11 @@ func (tb *IntegrationsTestBed) mockLogQueryResponse(logsInResponse []model.Signo
addLogsQueryExpectation(tb.mockClickhouse, logsInResponse)
}
func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed {
testDB, testDBFilePath := integrations.NewTestSqliteDB(t)
// TODO(Raj): This should not require passing in the DB file path
dao.InitDao("sqlite", testDBFilePath)
// testDB can be injected for sharing a DB across multiple integration testbeds.
func NewIntegrationsTestBed(t *testing.T, testDB *sqlx.DB) *IntegrationsTestBed {
if testDB == nil {
testDB = utils.NewQueryServiceDBForTests(t)
}
controller, err := integrations.NewController(testDB)
if err != nil {
@ -272,3 +455,30 @@ func NewIntegrationsTestBed(t *testing.T) *IntegrationsTestBed {
mockClickhouse: mockClickhouse,
}
}
func postableFromPipelines(pipelines []logparsingpipeline.Pipeline) logparsingpipeline.PostablePipelines {
result := logparsingpipeline.PostablePipelines{}
for _, p := range pipelines {
postable := logparsingpipeline.PostablePipeline{
Id: p.Id,
OrderId: p.OrderId,
Name: p.Name,
Alias: p.Alias,
Enabled: p.Enabled,
Config: p.Config,
}
if p.Description != nil {
postable.Description = *p.Description
}
if p.Filter != nil {
postable.Filter = p.Filter
}
result.Pipelines = append(result.Pipelines, postable)
}
return result
}

View File

@ -1,7 +1,12 @@
package tests
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
@ -12,6 +17,9 @@ import (
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
@ -131,3 +139,62 @@ func makeTestSignozLog(
return testLog
}
func createTestUser() (*model.User, *model.ApiError) {
// Create a test user for auth
ctx := context.Background()
org, apiErr := dao.DB().CreateOrg(ctx, &model.Organization{
Name: "test",
})
if apiErr != nil {
return nil, apiErr
}
group, apiErr := dao.DB().GetGroupByName(ctx, constants.AdminGroup)
if apiErr != nil {
return nil, apiErr
}
auth.InitAuthCache(ctx)
userId := uuid.NewString()
return dao.DB().CreateUser(
ctx,
&model.User{
Id: userId,
Name: "test",
Email: userId[:8] + "test@test.com",
Password: "test",
OrgId: org.Id,
GroupId: group.Id,
},
true,
)
}
func NewAuthenticatedTestRequest(
user *model.User,
path string,
postData interface{},
) (*http.Request, error) {
userJwt, err := auth.GenerateJWTForUser(user)
if err != nil {
return nil, err
}
var req *http.Request
if postData != nil {
var body bytes.Buffer
err = json.NewEncoder(&body).Encode(postData)
if err != nil {
return nil, err
}
req = httptest.NewRequest(http.MethodPost, path, &body)
} else {
req = httptest.NewRequest(http.MethodGet, path, nil)
}
req.Header.Add("Authorization", "Bearer "+userJwt.AccessJwt)
return req, nil
}

View File

@ -0,0 +1,29 @@
package utils
// Map as in map-reduce.
func MapSlice[Slice ~[]Elem, Elem any, Output any](
slice Slice, mapper func(Elem) Output,
) []Output {
result := []Output{}
for _, item := range slice {
mapped := mapper(item)
result = append(result, mapped)
}
return result
}
func FilterSlice[Slice ~[]Elem, Elem any](
slice Slice, filterFn func(Elem) bool,
) Slice {
result := Slice{}
for _, item := range slice {
if filterFn(item) {
result = append(result, item)
}
}
return result
}

View File

@ -0,0 +1,29 @@
package utils
import (
"os"
"testing"
"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/dao"
)
func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB {
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
t.Fatalf("could not create temp file for test db: %v", err)
}
testDBFilePath := testDBFile.Name()
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
testDB, err := sqlx.Open("sqlite3", testDBFilePath)
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
}
// TODO(Raj): This should not require passing in the DB file path
dao.InitDao("sqlite", testDBFilePath)
return testDB
}