feat(sqlstore): open a connection to sql once (#6867)

This commit is contained in:
Vibhu Pandey 2025-01-22 12:49:38 +05:30 committed by GitHub
parent 0baf0e9453
commit 837f434fe9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 118 additions and 158 deletions

View File

@ -111,25 +111,22 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
// NewServer creates and initializes Server // NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) { func NewServer(serverOptions *ServerOptions) (*Server, error) {
modelDao, err := dao.InitDao(serverOptions.SigNoz.SQLStore.SQLxDB())
modelDao, err := dao.InitDao("sqlite", baseconst.RELATIONAL_DATASOURCE_PATH)
if err != nil { if err != nil {
return nil, err return nil, err
} }
baseexplorer.InitWithDSN(baseconst.RELATIONAL_DATASOURCE_PATH) if err := baseexplorer.InitWithDSN(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
if err := preferences.InitDB(baseconst.RELATIONAL_DATASOURCE_PATH); err != nil {
return nil, err return nil, err
} }
localDB, err := dashboards.InitDB(baseconst.RELATIONAL_DATASOURCE_PATH) if err := preferences.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
if err != nil {
return nil, err return nil, err
} }
localDB.SetMaxOpenConns(10) if err := dashboards.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err
}
gatewayProxy, err := gateway.NewProxy(serverOptions.GatewayUrl, gateway.RoutePrefix) gatewayProxy, err := gateway.NewProxy(serverOptions.GatewayUrl, gateway.RoutePrefix)
if err != nil { if err != nil {
@ -137,7 +134,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
} }
// initiate license manager // initiate license manager
lm, err := licensepkg.StartManager("sqlite", localDB) lm, err := licensepkg.StartManager(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -151,7 +148,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
if storage == "clickhouse" { if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...") zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector( qb := db.NewDataConnector(
localDB, serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.PromConfigPath, serverOptions.PromConfigPath,
lm, lm,
serverOptions.MaxIdleConns, serverOptions.MaxIdleConns,
@ -187,7 +184,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
rm, err := makeRulesManager(serverOptions.PromConfigPath, rm, err := makeRulesManager(serverOptions.PromConfigPath,
baseconst.GetAlertManagerApiPrefix(), baseconst.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, serverOptions.RuleRepoURL,
localDB, serverOptions.SigNoz.SQLStore.SQLxDB(),
reader, reader,
c, c,
serverOptions.DisableRules, serverOptions.DisableRules,
@ -201,19 +198,19 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
} }
// initiate opamp // initiate opamp
_, err = opAmpModel.InitDB(localDB) _, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
integrationsController, err := integrations.NewController(localDB) integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"couldn't create integrations controller: %w", err, "couldn't create integrations controller: %w", err,
) )
} }
cloudIntegrationsController, err := cloudintegrations.NewController(localDB) cloudIntegrationsController, err := cloudintegrations.NewController(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err, "couldn't create cloud provider integrations controller: %w", err,
@ -222,7 +219,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// ingestion pipelines manager // ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations, serverOptions.SigNoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -230,8 +227,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// initiate agent config handler // initiate agent config handler
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: localDB, DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
DBEngine: AppDbEngine,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController}, AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
}) })
if err != nil { if err != nil {
@ -239,7 +235,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
} }
// start the usagemanager // start the usagemanager
usageManager, err := usage.New("sqlite", modelDao, lm.GetRepo(), reader.GetConn()) usageManager, err := usage.New(modelDao, lm.GetRepo(), reader.GetConn())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,18 +1,10 @@
package dao package dao
import ( import (
"fmt" "github.com/jmoiron/sqlx"
"go.signoz.io/signoz/ee/query-service/dao/sqlite" "go.signoz.io/signoz/ee/query-service/dao/sqlite"
) )
func InitDao(engine, path string) (ModelDao, error) { func InitDao(inputDB *sqlx.DB) (ModelDao, error) {
return sqlite.InitDB(inputDB)
switch engine {
case "sqlite":
return sqlite.InitDB(path)
default:
return nil, fmt.Errorf("qsdb type: %s is not supported in query service", engine)
}
} }

View File

@ -65,8 +65,8 @@ func columnExists(db *sqlx.DB, tableName, columnName string) bool {
} }
// InitDB creates and extends base model DB repository // InitDB creates and extends base model DB repository
func InitDB(dataSourceName string) (*modelDao, error) { func InitDB(inputDB *sqlx.DB) (*modelDao, error) {
dao, err := basedsql.InitDB(dataSourceName) dao, err := basedsql.InitDB(inputDB)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -28,13 +28,8 @@ func NewLicenseRepo(db *sqlx.DB) Repo {
} }
} }
func (r *Repo) InitDB(engine string) error { func (r *Repo) InitDB(inputDB *sqlx.DB) error {
switch engine { return sqlite.InitDB(inputDB)
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
} }
func (r *Repo) GetLicenses(ctx context.Context) ([]model.License, error) { func (r *Repo) GetLicenses(ctx context.Context) ([]model.License, error) {

View File

@ -51,13 +51,13 @@ type Manager struct {
activeFeatures basemodel.FeatureSet activeFeatures basemodel.FeatureSet
} }
func StartManager(dbType string, db *sqlx.DB, features ...basemodel.Feature) (*Manager, error) { func StartManager(db *sqlx.DB, features ...basemodel.Feature) (*Manager, error) {
if LM != nil { if LM != nil {
return LM, nil return LM, nil
} }
repo := NewLicenseRepo(db) repo := NewLicenseRepo(db)
err := repo.InitDB(dbType) err := repo.InitDB(db)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initiate license repo: %v", err) return nil, fmt.Errorf("failed to initiate license repo: %v", err)

View File

@ -179,7 +179,7 @@ func main() {
zap.L().Info("JWT secret key set successfully.") zap.L().Info("JWT secret key set successfully.")
} }
if err := migrate.Migrate(baseconst.RELATIONAL_DATASOURCE_PATH); err != nil { if err := migrate.Migrate(signoz.SQLStore.SQLxDB()); err != nil {
zap.L().Error("Failed to migrate", zap.Error(err)) zap.L().Error("Failed to migrate", zap.Error(err))
} else { } else {
zap.L().Info("Migration successful") zap.L().Info("Migration successful")

View File

@ -46,7 +46,7 @@ type Manager struct {
tenantID string tenantID string
} }
func New(dbType string, modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`) hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl")) hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl"))

View File

@ -19,13 +19,8 @@ type Repo struct {
db *sqlx.DB db *sqlx.DB
} }
func (r *Repo) initDB(engine string) error { func (r *Repo) initDB(inputDB *sqlx.DB) error {
switch engine { return sqlite.InitDB(inputDB)
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
} }
func (r *Repo) GetConfigHistory( func (r *Repo) GetConfigHistory(

View File

@ -39,8 +39,7 @@ type Manager struct {
} }
type ManagerOptions struct { type ManagerOptions struct {
DB *sqlx.DB DB *sqlx.DB
DBEngine string
// When acting as opamp.AgentConfigProvider, agent conf recommendations are // When acting as opamp.AgentConfigProvider, agent conf recommendations are
// applied to the base conf in the order the features have been specified here. // applied to the base conf in the order the features have been specified here.
@ -66,7 +65,7 @@ func Initiate(options *ManagerOptions) (*Manager, error) {
configSubscribers: map[string]func(){}, configSubscribers: map[string]func(){},
} }
err := m.initDB(options.DBEngine) err := m.initDB(options.DB)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not init agentConf db") return nil, errors.Wrap(err, "could not init agentConf db")
} }

View File

@ -35,14 +35,8 @@ var (
) )
// InitDB sets up setting up the connection pool global variable. // InitDB sets up setting up the connection pool global variable.
func InitDB(dataSourceName string) (*sqlx.DB, error) { func InitDB(inputDB *sqlx.DB) error {
var err error db = inputDB
db, err = sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, err
}
table_schema := `CREATE TABLE IF NOT EXISTS dashboards ( table_schema := `CREATE TABLE IF NOT EXISTS dashboards (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE, uuid TEXT NOT NULL UNIQUE,
@ -51,9 +45,9 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
data TEXT NOT NULL data TEXT NOT NULL
);` );`
_, err = db.Exec(table_schema) _, err := db.Exec(table_schema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating dashboard table: %s", err.Error()) return fmt.Errorf("error in creating dashboard table: %s", err.Error())
} }
table_schema = `CREATE TABLE IF NOT EXISTS rules ( table_schema = `CREATE TABLE IF NOT EXISTS rules (
@ -65,7 +59,7 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
_, err = db.Exec(table_schema) _, err = db.Exec(table_schema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating rules table: %s", err.Error()) return fmt.Errorf("error in creating rules table: %s", err.Error())
} }
table_schema = `CREATE TABLE IF NOT EXISTS notification_channels ( table_schema = `CREATE TABLE IF NOT EXISTS notification_channels (
@ -80,7 +74,7 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
_, err = db.Exec(table_schema) _, err = db.Exec(table_schema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating notification_channles table: %s", err.Error()) return fmt.Errorf("error in creating notification_channles table: %s", err.Error())
} }
tableSchema := `CREATE TABLE IF NOT EXISTS planned_maintenance ( tableSchema := `CREATE TABLE IF NOT EXISTS planned_maintenance (
@ -96,7 +90,7 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
);` );`
_, err = db.Exec(tableSchema) _, err = db.Exec(tableSchema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating planned_maintenance table: %s", err.Error()) return fmt.Errorf("error in creating planned_maintenance table: %s", err.Error())
} }
table_schema = `CREATE TABLE IF NOT EXISTS ttl_status ( table_schema = `CREATE TABLE IF NOT EXISTS ttl_status (
@ -112,49 +106,49 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
_, err = db.Exec(table_schema) _, err = db.Exec(table_schema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating ttl_status table: %s", err.Error()) return fmt.Errorf("error in creating ttl_status table: %s", err.Error())
} }
// sqlite does not support "IF NOT EXISTS" // sqlite does not support "IF NOT EXISTS"
createdAt := `ALTER TABLE rules ADD COLUMN created_at datetime;` createdAt := `ALTER TABLE rules ADD COLUMN created_at datetime;`
_, err = db.Exec(createdAt) _, err = db.Exec(createdAt)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") { if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column created_at to rules table: %s", err.Error()) return fmt.Errorf("error in adding column created_at to rules table: %s", err.Error())
} }
createdBy := `ALTER TABLE rules ADD COLUMN created_by TEXT;` createdBy := `ALTER TABLE rules ADD COLUMN created_by TEXT;`
_, err = db.Exec(createdBy) _, err = db.Exec(createdBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") { if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column created_by to rules table: %s", err.Error()) return fmt.Errorf("error in adding column created_by to rules table: %s", err.Error())
} }
updatedBy := `ALTER TABLE rules ADD COLUMN updated_by TEXT;` updatedBy := `ALTER TABLE rules ADD COLUMN updated_by TEXT;`
_, err = db.Exec(updatedBy) _, err = db.Exec(updatedBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") { if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column updated_by to rules table: %s", err.Error()) return fmt.Errorf("error in adding column updated_by to rules table: %s", err.Error())
} }
createdBy = `ALTER TABLE dashboards ADD COLUMN created_by TEXT;` createdBy = `ALTER TABLE dashboards ADD COLUMN created_by TEXT;`
_, err = db.Exec(createdBy) _, err = db.Exec(createdBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") { if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column created_by to dashboards table: %s", err.Error()) return fmt.Errorf("error in adding column created_by to dashboards table: %s", err.Error())
} }
updatedBy = `ALTER TABLE dashboards ADD COLUMN updated_by TEXT;` updatedBy = `ALTER TABLE dashboards ADD COLUMN updated_by TEXT;`
_, err = db.Exec(updatedBy) _, err = db.Exec(updatedBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") { if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column updated_by to dashboards table: %s", err.Error()) return fmt.Errorf("error in adding column updated_by to dashboards table: %s", err.Error())
} }
locked := `ALTER TABLE dashboards ADD COLUMN locked INTEGER DEFAULT 0;` locked := `ALTER TABLE dashboards ADD COLUMN locked INTEGER DEFAULT 0;`
_, err = db.Exec(locked) _, err = db.Exec(locked)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") { if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return nil, fmt.Errorf("error in adding column locked to dashboards table: %s", err.Error()) return fmt.Errorf("error in adding column locked to dashboards table: %s", err.Error())
} }
telemetry.GetInstance().SetDashboardsInfoCallback(GetDashboardsInfo) telemetry.GetInstance().SetDashboardsInfoCallback(GetDashboardsInfo)
return db, nil return nil
} }
type Dashboard struct { type Dashboard struct {
@ -288,7 +282,7 @@ func GetDashboard(ctx context.Context, uuid string) (*Dashboard, *model.ApiError
if err != nil { if err != nil {
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no dashboard found with uuid: %s", uuid)} return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("no dashboard found with uuid: %s", uuid)}
} }
return &dashboard, nil return &dashboard, nil
} }

View File

@ -34,13 +34,8 @@ type SavedView struct {
} }
// InitWithDSN sets up setting up the connection pool global variable. // InitWithDSN sets up setting up the connection pool global variable.
func InitWithDSN(dataSourceName string) (*sqlx.DB, error) { func InitWithDSN(inputDB *sqlx.DB) error {
var err error db = inputDB
db, err = sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, err
}
tableSchema := `CREATE TABLE IF NOT EXISTS saved_views ( tableSchema := `CREATE TABLE IF NOT EXISTS saved_views (
uuid TEXT PRIMARY KEY, uuid TEXT PRIMARY KEY,
@ -56,14 +51,14 @@ func InitWithDSN(dataSourceName string) (*sqlx.DB, error) {
extra_data TEXT extra_data TEXT
);` );`
_, err = db.Exec(tableSchema) _, err := db.Exec(tableSchema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating saved views table: %s", err.Error()) return fmt.Errorf("error in creating saved views table: %s", err.Error())
} }
telemetry.GetInstance().SetSavedViewsInfoCallback(GetSavedViewsInfo) telemetry.GetInstance().SetSavedViewsInfoCallback(GetSavedViewsInfo)
return db, nil return nil
} }
func InitWithDB(sqlDB *sqlx.DB) { func InitWithDB(sqlDB *sqlx.DB) {

View File

@ -27,11 +27,10 @@ type LogParsingPipelineController struct {
func NewLogParsingPipelinesController( func NewLogParsingPipelinesController(
db *sqlx.DB, db *sqlx.DB,
engine string,
getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError), getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError),
) (*LogParsingPipelineController, error) { ) (*LogParsingPipelineController, error) {
repo := NewRepo(db) repo := NewRepo(db)
err := repo.InitDB(engine) err := repo.InitDB(db)
return &LogParsingPipelineController{ return &LogParsingPipelineController{
Repo: repo, Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines, GetIntegrationPipelines: getIntegrationPipelines,

View File

@ -29,13 +29,8 @@ func NewRepo(db *sqlx.DB) Repo {
} }
} }
func (r *Repo) InitDB(engine string) error { func (r *Repo) InitDB(inputDB *sqlx.DB) error {
switch engine { return sqlite.InitDB(inputDB)
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
} }
// insertPipeline stores a given postable pipeline to database // insertPipeline stores a given postable pipeline to database

View File

@ -203,14 +203,8 @@ type UpdatePreference struct {
var db *sqlx.DB var db *sqlx.DB
func InitDB(datasourceName string) error { func InitDB(inputDB *sqlx.DB) error {
var err error db = inputDB
db, err = sqlx.Open("sqlite3", datasourceName)
if err != nil {
return err
}
// create the user preference table // create the user preference table
tableSchema := ` tableSchema := `
PRAGMA foreign_keys = ON; PRAGMA foreign_keys = ON;
@ -225,7 +219,7 @@ func InitDB(datasourceName string) error {
ON DELETE CASCADE ON DELETE CASCADE
);` );`
_, err = db.Exec(tableSchema) _, err := db.Exec(tableSchema)
if err != nil { if err != nil {
return fmt.Errorf("error in creating user_preference table: %s", err.Error()) return fmt.Errorf("error in creating user_preference table: %s", err.Error())
} }

View File

@ -99,23 +99,22 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
// NewServer creates and initializes Server // NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) { func NewServer(serverOptions *ServerOptions) (*Server, error) {
var err error
if err := dao.InitDao("sqlite", constants.RELATIONAL_DATASOURCE_PATH); err != nil { if err := dao.InitDao(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err return nil, err
} }
if err := preferences.InitDB(constants.RELATIONAL_DATASOURCE_PATH); err != nil { if err := preferences.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err return nil, err
} }
localDB, err := dashboards.InitDB(constants.RELATIONAL_DATASOURCE_PATH) if err := dashboards.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
explorer.InitWithDSN(constants.RELATIONAL_DATASOURCE_PATH)
if err != nil {
return nil, err return nil, err
} }
localDB.SetMaxOpenConns(10) if err := explorer.InitWithDSN(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err
}
// initiate feature manager // initiate feature manager
fm := featureManager.StartManager() fm := featureManager.StartManager()
@ -127,7 +126,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
if storage == "clickhouse" { if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...") zap.L().Info("Using ClickHouse as datastore ...")
clickhouseReader := clickhouseReader.NewReader( clickhouseReader := clickhouseReader.NewReader(
localDB, serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.PromConfigPath, serverOptions.PromConfigPath,
fm, fm,
serverOptions.MaxIdleConns, serverOptions.MaxIdleConns,
@ -163,7 +162,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
rm, err := makeRulesManager( rm, err := makeRulesManager(
serverOptions.PromConfigPath, serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(), constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema) serverOptions.RuleRepoURL, serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -173,18 +172,18 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err return nil, err
} }
integrationsController, err := integrations.NewController(localDB) integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't create integrations controller: %w", err) return nil, fmt.Errorf("couldn't create integrations controller: %w", err)
} }
cloudIntegrationsController, err := cloudintegrations.NewController(localDB) cloudIntegrationsController, err := cloudintegrations.NewController(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider integrations controller: %w", err) return nil, fmt.Errorf("couldn't create cloud provider integrations controller: %w", err)
} }
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController( logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
localDB, "sqlite", integrationsController.GetPipelinesForInstalledIntegrations, serverOptions.SigNoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -236,14 +235,13 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer s.privateHTTP = privateServer
_, err = opAmpModel.InitDB(localDB) _, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: localDB, DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
DBEngine: "sqlite",
AgentFeatures: []agentConf.AgentFeature{ AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController, logParsingPipelineController,
}, },

View File

@ -76,6 +76,7 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout
var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "") var OTLPTarget = GetOrDefaultEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512") var LogExportBatchSize = GetOrDefaultEnv("OTEL_BLRP_MAX_EXPORT_BATCH_SIZE", "512")
// [Deprecated] SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.
var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db") var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db")
var DurationSortFeature = GetOrDefaultEnv("DURATION_SORT_FEATURE", "true") var DurationSortFeature = GetOrDefaultEnv("DURATION_SORT_FEATURE", "true")

View File

@ -1,26 +1,19 @@
package dao package dao
import ( import (
"fmt" "github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/dao/sqlite" "go.signoz.io/signoz/pkg/query-service/dao/sqlite"
) )
var db ModelDao var db ModelDao
func InitDao(engine, path string) error { func InitDao(inputDB *sqlx.DB) error {
var err error var err error
db, err = sqlite.InitDB(inputDB)
switch engine { if err != nil {
case "sqlite": return err
db, err = sqlite.InitDB(path)
if err != nil {
return errors.Wrap(err, "failed to initialize DB")
}
default:
return fmt.Errorf("RelationalDB type: %s is not supported in query service", engine)
} }
return nil return nil
} }

View File

@ -17,15 +17,7 @@ type ModelDaoSqlite struct {
} }
// InitDB sets up setting up the connection pool global variable. // InitDB sets up setting up the connection pool global variable.
func InitDB(dataSourceName string) (*ModelDaoSqlite, error) { func InitDB(db *sqlx.DB) (*ModelDaoSqlite, error) {
var err error
db, err := sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return nil, errors.Wrap(err, "failed to Open sqlite3 DB")
}
db.SetMaxOpenConns(10)
table_schema := ` table_schema := `
PRAGMA foreign_keys = ON; PRAGMA foreign_keys = ON;
@ -88,7 +80,7 @@ func InitDB(dataSourceName string) (*ModelDaoSqlite, error) {
); );
` `
_, err = db.Exec(table_schema) _, err := db.Exec(table_schema)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error()) return nil, fmt.Errorf("error in creating tables: %v", err.Error())
} }

View File

@ -122,7 +122,7 @@ func main() {
zap.L().Info("JWT secret key set successfully.") zap.L().Info("JWT secret key set successfully.")
} }
if err := migrate.Migrate(constants.RELATIONAL_DATASOURCE_PATH); err != nil { if err := migrate.Migrate(signoz.SQLStore.SQLxDB()); err != nil {
zap.L().Error("Failed to migrate", zap.Error(err)) zap.L().Error("Failed to migrate", zap.Error(err))
} else { } else {
zap.L().Info("Migration successful") zap.L().Info("Migration successful")

View File

@ -41,11 +41,7 @@ func getMigrationVersion(conn *sqlx.DB, version string) (*DataMigration, error)
return &migration, nil return &migration, nil
} }
func Migrate(dsn string) error { func Migrate(conn *sqlx.DB) error {
conn, err := sqlx.Connect("sqlite3", dsn)
if err != nil {
return err
}
if err := initSchema(conn); err != nil { if err := initSchema(conn); err != nil {
return err return err
} }

View File

@ -461,7 +461,7 @@ func NewTestbedWithoutOpamp(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed
} }
controller, err := logparsingpipeline.NewLogParsingPipelinesController( controller, err := logparsingpipeline.NewLogParsingPipelinesController(
testDB, "sqlite", ic.GetPipelinesForInstalledIntegrations, testDB, ic.GetPipelinesForInstalledIntegrations,
) )
if err != nil { if err != nil {
t.Fatalf("could not create a logparsingpipelines controller: %v", err) t.Fatalf("could not create a logparsingpipelines controller: %v", err)
@ -485,8 +485,7 @@ func NewTestbedWithoutOpamp(t *testing.T, testDB *sqlx.DB) *LogPipelinesTestBed
require.Nil(t, err, "failed to init opamp model") require.Nil(t, err, "failed to init opamp model")
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB, DB: testDB,
DBEngine: "sqlite",
AgentFeatures: []agentConf.AgentFeature{ AgentFeatures: []agentConf.AgentFeature{
apiHandler.LogsParsingPipelineController, apiHandler.LogsParsingPipelineController,
}}) }})

View File

@ -28,11 +28,11 @@ func NewTestSqliteDB(t *testing.T) (testDB *sqlx.DB, testDBFilePath string) {
} }
func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB { func NewQueryServiceDBForTests(t *testing.T) *sqlx.DB {
testDB, testDBFilePath := NewTestSqliteDB(t) testDB, _ := NewTestSqliteDB(t)
// TODO(Raj): This should not require passing in the DB file path // TODO(Raj): This should not require passing in the DB file path
dao.InitDao("sqlite", testDBFilePath) dao.InitDao(testDB)
dashboards.InitDB(testDBFilePath) dashboards.InitDB(testDB)
return testDB return testDB
} }

View File

@ -2,6 +2,8 @@ package signoz
import ( import (
"context" "context"
"fmt"
"os"
"go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/config" "go.signoz.io/signoz/pkg/config"
@ -49,5 +51,15 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Confi
return Config{}, err return Config{}, err
} }
mergeAndEnsureBackwardCompatibility(&config)
return config, nil return config, nil
} }
func mergeAndEnsureBackwardCompatibility(config *Config) {
// SIGNOZ_LOCAL_DB_PATH
if os.Getenv("SIGNOZ_LOCAL_DB_PATH") != "" {
fmt.Println("[Deprecated] env SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.")
config.SQLStore.Sqlite.Path = os.Getenv("SIGNOZ_LOCAL_DB_PATH")
}
}

View File

@ -6,14 +6,16 @@ import (
"go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/version" "go.signoz.io/signoz/pkg/version"
"go.signoz.io/signoz/pkg/web" "go.signoz.io/signoz/pkg/web"
) )
type SigNoz struct { type SigNoz struct {
Cache cache.Cache Cache cache.Cache
Web web.Web Web web.Web
SQLStore sqlstore.SQLStore
} }
func New( func New(
@ -54,8 +56,21 @@ func New(
return nil, err return nil, err
} }
// Initialize sqlstore from the available sqlstore provider factories
sqlstore, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.SQLStore,
providerConfig.SQLStoreProviderFactories,
config.SQLStore.Provider,
)
if err != nil {
return nil, err
}
return &SigNoz{ return &SigNoz{
Cache: cache, Cache: cache,
Web: web, Web: web,
SQLStore: sqlstore,
}, nil }, nil
} }