feat(sqlmigration): consolidate all sqlmigrations into one package (#7018)

* feat(sqlmigration): add sqlmigrations

* feat(sqlmigration): test sqlmigrations

* feat(sqlmigration): add remaining factories

* feat(sqlmigration): consolidate into single package

* fix(telemetrystore): remove existing env variables

* fix(telemetrystore): fix DSN
This commit is contained in:
Vibhu Pandey 2025-02-04 14:53:36 +05:30 committed by GitHub
parent e414215786
commit dc15ee8176
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 383 additions and 903 deletions

View File

@ -11,7 +11,6 @@ import (
"net"
"net/http"
_ "net/http/pprof" // http profiler
"os"
"regexp"
"time"
@ -149,9 +148,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
var reader interfaces.DataConnector
storage := os.Getenv("STORAGE")
if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
@ -165,9 +161,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
)
go qb.Start(readerReady)
reader = qb
} else {
return nil, fmt.Errorf("storage type: %s is not supported in query service", storage)
}
skipConfig := &basemodel.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" {
// read skip config

View File

@ -7,7 +7,6 @@ import (
basedao "go.signoz.io/signoz/pkg/query-service/dao"
basedsql "go.signoz.io/signoz/pkg/query-service/dao/sqlite"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
"go.uber.org/zap"
)
type modelDao struct {
@ -29,41 +28,6 @@ func (m *modelDao) checkFeature(key string) error {
return m.flags.CheckFeature(key)
}
func columnExists(db *sqlx.DB, tableName, columnName string) bool {
query := fmt.Sprintf("PRAGMA table_info(%s);", tableName)
rows, err := db.Query(query)
if err != nil {
zap.L().Error("Failed to query table info", zap.Error(err))
return false
}
defer rows.Close()
var (
cid int
name string
ctype string
notnull int
dflt_value *string
pk int
)
for rows.Next() {
err := rows.Scan(&cid, &name, &ctype, &notnull, &dflt_value, &pk)
if err != nil {
zap.L().Error("Failed to scan table info", zap.Error(err))
return false
}
if name == columnName {
return true
}
}
err = rows.Err()
if err != nil {
zap.L().Error("Failed to scan table info", zap.Error(err))
return false
}
return false
}
// InitDB creates and extends base model DB repository
func InitDB(inputDB *sqlx.DB) (*modelDao, error) {
dao, err := basedsql.InitDB(inputDB)
@ -73,69 +37,6 @@ func InitDB(inputDB *sqlx.DB) (*modelDao, error) {
// set package variable so dependent base methods (e.g. AuthCache) will work
basedao.SetDB(dao)
m := &modelDao{ModelDaoSqlite: dao}
table_schema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS org_domains(
id TEXT PRIMARY KEY,
org_id TEXT NOT NULL,
name VARCHAR(50) NOT NULL UNIQUE,
created_at INTEGER NOT NULL,
updated_at INTEGER,
data TEXT NOT NULL,
FOREIGN KEY(org_id) REFERENCES organizations(id)
);
CREATE TABLE IF NOT EXISTS personal_access_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
user_id TEXT NOT NULL,
token TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_used INTEGER NOT NULL,
revoked BOOLEAN NOT NULL,
updated_by_user_id TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);
`
_, err = m.DB().Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error())
}
if !columnExists(m.DB(), "personal_access_tokens", "role") {
_, err = m.DB().Exec("ALTER TABLE personal_access_tokens ADD COLUMN role TEXT NOT NULL DEFAULT 'ADMIN';")
if err != nil {
return nil, fmt.Errorf("error in adding column: %v", err.Error())
}
}
if !columnExists(m.DB(), "personal_access_tokens", "updated_at") {
_, err = m.DB().Exec("ALTER TABLE personal_access_tokens ADD COLUMN updated_at INTEGER NOT NULL DEFAULT 0;")
if err != nil {
return nil, fmt.Errorf("error in adding column: %v", err.Error())
}
}
if !columnExists(m.DB(), "personal_access_tokens", "last_used") {
_, err = m.DB().Exec("ALTER TABLE personal_access_tokens ADD COLUMN last_used INTEGER NOT NULL DEFAULT 0;")
if err != nil {
return nil, fmt.Errorf("error in adding column: %v", err.Error())
}
}
if !columnExists(m.DB(), "personal_access_tokens", "revoked") {
_, err = m.DB().Exec("ALTER TABLE personal_access_tokens ADD COLUMN revoked BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return nil, fmt.Errorf("error in adding column: %v", err.Error())
}
}
if !columnExists(m.DB(), "personal_access_tokens", "updated_by_user_id") {
_, err = m.DB().Exec("ALTER TABLE personal_access_tokens ADD COLUMN updated_by_user_id TEXT NOT NULL DEFAULT '';")
if err != nil {
return nil, fmt.Errorf("error in adding column: %v", err.Error())
}
}
return m, nil
}

View File

@ -10,7 +10,6 @@ import (
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
"go.signoz.io/signoz/ee/query-service/license/sqlite"
"go.signoz.io/signoz/ee/query-service/model"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
@ -28,10 +27,6 @@ func NewLicenseRepo(db *sqlx.DB) Repo {
}
}
func (r *Repo) InitDB(inputDB *sqlx.DB) error {
return sqlite.InitDB(inputDB)
}
func (r *Repo) GetLicensesV3(ctx context.Context) ([]*model.LicenseV3, error) {
licensesData := []model.LicenseDB{}
licenseV3Data := []*model.LicenseV3{}

View File

@ -2,7 +2,6 @@ package license
import (
"context"
"fmt"
"sync/atomic"
"time"
@ -50,11 +49,6 @@ func StartManager(db *sqlx.DB, features ...basemodel.Feature) (*Manager, error)
}
repo := NewLicenseRepo(db)
err := repo.InitDB(db)
if err != nil {
return nil, fmt.Errorf("failed to initiate license repo: %v", err)
}
m := &Manager{
repo: &repo,
}

View File

@ -1,63 +0,0 @@
package sqlite
import (
"fmt"
"github.com/jmoiron/sqlx"
)
func InitDB(db *sqlx.DB) error {
var err error
if db == nil {
return fmt.Errorf("invalid db connection")
}
table_schema := `CREATE TABLE IF NOT EXISTS licenses(
key TEXT PRIMARY KEY,
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
planDetails TEXT,
activationId TEXT,
validationMessage TEXT,
lastValidated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS sites(
uuid TEXT PRIMARY KEY,
alias VARCHAR(180) DEFAULT 'PROD',
url VARCHAR(300),
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating licenses table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS feature_status (
name TEXT PRIMARY KEY,
active bool,
usage INTEGER DEFAULT 0,
usage_limit INTEGER DEFAULT 0,
route TEXT
);`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating feature_status table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS licenses_v3 (
id TEXT PRIMARY KEY,
key TEXT NOT NULL UNIQUE,
data TEXT
);`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating licenses_v3 table: %s", err.Error())
}
return nil
}

View File

@ -18,7 +18,6 @@ import (
"go.signoz.io/signoz/pkg/config/fileprovider"
"go.signoz.io/signoz/pkg/query-service/auth"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/migrate"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
"google.golang.org/grpc"
@ -183,12 +182,6 @@ func main() {
zap.L().Info("JWT secret key set successfully.")
}
if err := migrate.Migrate(signoz.SQLStore.SQLxDB()); err != nil {
zap.L().Error("Failed to migrate", zap.Error(err))
} else {
zap.L().Info("Migration successful")
}
server, err := app.NewServer(serverOptions)
if err != nil {
zap.L().Fatal("Failed to create server", zap.Error(err))

2
go.mod
View File

@ -32,7 +32,7 @@ require (
github.com/knadh/koanf v1.5.0
github.com/knadh/koanf/v2 v2.1.1
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/mattn/go-sqlite3 v1.14.24
github.com/oklog/oklog v0.3.2
github.com/open-telemetry/opamp-go v0.5.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.111.0

4
go.sum
View File

@ -521,8 +521,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=

View File

@ -0,0 +1,13 @@
package configtest
import (
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/config/envprovider"
)
func NewResolverConfig() config.ResolverConfig {
return config.ResolverConfig{
Uris: []string{"env:"},
ProviderFactories: []config.ProviderFactory{envprovider.NewFactory()},
}
}

View File

@ -10,7 +10,20 @@ import (
func NewLogger(config Config, wrappers ...loghandler.Wrapper) *slog.Logger {
logger := slog.New(
loghandler.New(
slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: config.Logs.Level, AddSource: true}),
slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: config.Logs.Level, AddSource: true, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
// This is more in line with OpenTelemetry semantic conventions
if a.Key == slog.SourceKey {
a.Key = "code"
return a
}
if a.Key == slog.TimeKey {
a.Key = "timestamp"
return a
}
return a
}}),
wrappers...,
),
)

View File

@ -8,7 +8,6 @@ import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/agentConf/sqlite"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
"golang.org/x/exp/slices"
@ -19,10 +18,6 @@ type Repo struct {
db *sqlx.DB
}
func (r *Repo) initDB(inputDB *sqlx.DB) error {
return sqlite.InitDB(inputDB)
}
func (r *Repo) GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {

View File

@ -65,10 +65,6 @@ func Initiate(options *ManagerOptions) (*Manager, error) {
configSubscribers: map[string]func(){},
}
err := m.initDB(options.DB)
if err != nil {
return nil, errors.Wrap(err, "could not init agentConf db")
}
return m, nil
}

View File

@ -1,65 +0,0 @@
package sqlite
import (
"fmt"
"github.com/pkg/errors"
"github.com/jmoiron/sqlx"
)
func InitDB(db *sqlx.DB) error {
var err error
if db == nil {
return fmt.Errorf("invalid db connection")
}
table_schema := `CREATE TABLE IF NOT EXISTS agent_config_versions(
id TEXT PRIMARY KEY,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
version INTEGER DEFAULT 1,
active int,
is_valid int,
disabled int,
element_type VARCHAR(120) NOT NULL,
deploy_status VARCHAR(80) NOT NULL DEFAULT 'DIRTY',
deploy_sequence INTEGER,
deploy_result TEXT,
last_hash TEXT,
last_config TEXT,
UNIQUE(element_type, version)
);
CREATE UNIQUE INDEX IF NOT EXISTS agent_config_versions_u1
ON agent_config_versions(element_type, version);
CREATE INDEX IF NOT EXISTS agent_config_versions_nu1
ON agent_config_versions(last_hash);
CREATE TABLE IF NOT EXISTS agent_config_elements(
id TEXT PRIMARY KEY,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
element_id TEXT NOT NULL,
element_type VARCHAR(120) NOT NULL,
version_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS agent_config_elements_u1
ON agent_config_elements(version_id, element_id, element_type);
`
_, err = db.Exec(table_schema)
if err != nil {
return errors.Wrap(err, "Error in creating agent config tables")
}
return nil
}

View File

@ -37,42 +37,11 @@ type cloudProviderAccountsRepository interface {
func newCloudProviderAccountsRepository(db *sqlx.DB) (
*cloudProviderAccountsSQLRepository, error,
) {
if err := initAccountsSqliteDBIfNeeded(db); err != nil {
return nil, fmt.Errorf("could not init sqlite DB for cloudintegrations accounts: %w", err)
}
return &cloudProviderAccountsSQLRepository{
db: db,
}, nil
}
func initAccountsSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTablesStatements := `
CREATE TABLE IF NOT EXISTS cloud_integrations_accounts(
cloud_provider TEXT NOT NULL,
id TEXT NOT NULL,
config_json TEXT,
cloud_account_id TEXT,
last_agent_report_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
removed_at TIMESTAMP,
UNIQUE(cloud_provider, id)
)
`
_, err := db.Exec(createTablesStatements)
if err != nil {
return fmt.Errorf(
"could not ensure cloud provider accounts schema in sqlite DB: %w", err,
)
}
return nil
}
type cloudProviderAccountsSQLRepository struct {
db *sqlx.DB
}

View File

@ -38,44 +38,11 @@ type serviceConfigRepository interface {
func newServiceConfigRepository(db *sqlx.DB) (
*serviceConfigSQLRepository, error,
) {
if err := initServiceConfigSqliteDBIfNeeded(db); err != nil {
return nil, fmt.Errorf(
"could not init sqlite DB for cloudintegrations service configs: %w", err,
)
}
return &serviceConfigSQLRepository{
db: db,
}, nil
}
func initServiceConfigSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTableStatement := `
CREATE TABLE IF NOT EXISTS cloud_integrations_service_configs(
cloud_provider TEXT NOT NULL,
cloud_account_id TEXT NOT NULL,
service_id TEXT NOT NULL,
config_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
UNIQUE(cloud_provider, cloud_account_id, service_id)
)
`
_, err := db.Exec(createTableStatement)
if err != nil {
return fmt.Errorf(
"could not ensure cloud provider service configs schema in sqlite DB: %w", err,
)
}
return nil
}
type serviceConfigSQLRepository struct {
db *sqlx.DB
}

View File

@ -37,115 +37,6 @@ var (
// InitDB sets up setting up the connection pool global variable.
func InitDB(inputDB *sqlx.DB) error {
db = inputDB
table_schema := `CREATE TABLE IF NOT EXISTS dashboards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
data TEXT NOT NULL
);`
_, err := db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating dashboard table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
updated_at datetime NOT NULL,
deleted INTEGER DEFAULT 0,
data TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating rules table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS notification_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
name TEXT NOT NULL UNIQUE,
type TEXT NOT NULL,
deleted INTEGER DEFAULT 0,
data TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating notification_channles table: %s", err.Error())
}
tableSchema := `CREATE TABLE IF NOT EXISTS planned_maintenance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
alert_ids TEXT,
schedule TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT NOT NULL,
updated_at datetime NOT NULL,
updated_by TEXT NOT NULL
);`
_, err = db.Exec(tableSchema)
if err != nil {
return fmt.Errorf("error in creating planned_maintenance table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS ttl_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT NOT NULL,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
table_name TEXT NOT NULL,
ttl INTEGER DEFAULT 0,
cold_storage_ttl INTEGER DEFAULT 0,
status TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating ttl_status table: %s", err.Error())
}
// sqlite does not support "IF NOT EXISTS"
createdAt := `ALTER TABLE rules ADD COLUMN created_at datetime;`
_, err = db.Exec(createdAt)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("error in adding column created_at to rules table: %s", err.Error())
}
createdBy := `ALTER TABLE rules ADD COLUMN created_by TEXT;`
_, err = db.Exec(createdBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("error in adding column created_by to rules table: %s", err.Error())
}
updatedBy := `ALTER TABLE rules ADD COLUMN updated_by TEXT;`
_, err = db.Exec(updatedBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("error in adding column updated_by to rules table: %s", err.Error())
}
createdBy = `ALTER TABLE dashboards ADD COLUMN created_by TEXT;`
_, err = db.Exec(createdBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("error in adding column created_by to dashboards table: %s", err.Error())
}
updatedBy = `ALTER TABLE dashboards ADD COLUMN updated_by TEXT;`
_, err = db.Exec(updatedBy)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("error in adding column updated_by to dashboards table: %s", err.Error())
}
locked := `ALTER TABLE dashboards ADD COLUMN locked INTEGER DEFAULT 0;`
_, err = db.Exec(locked)
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
return fmt.Errorf("error in adding column locked to dashboards table: %s", err.Error())
}
telemetry.GetInstance().SetDashboardsInfoCallback(GetDashboardsInfo)
return nil

View File

@ -36,26 +36,6 @@ type SavedView struct {
// InitWithDSN sets up setting up the connection pool global variable.
func InitWithDSN(inputDB *sqlx.DB) error {
db = inputDB
tableSchema := `CREATE TABLE IF NOT EXISTS saved_views (
uuid TEXT PRIMARY KEY,
name TEXT NOT NULL,
category TEXT NOT NULL,
created_at datetime NOT NULL,
created_by TEXT,
updated_at datetime NOT NULL,
updated_by TEXT,
source_page TEXT NOT NULL,
tags TEXT,
data TEXT NOT NULL,
extra_data TEXT
);`
_, err := db.Exec(tableSchema)
if err != nil {
return fmt.Errorf("error in creating saved views table: %s", err.Error())
}
telemetry.GetInstance().SetSavedViewsInfoCallback(GetSavedViewsInfo)
return nil

View File

@ -56,7 +56,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/dao"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/integrations/signozio"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/rules"
@ -543,7 +542,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost)
@ -1972,16 +1970,6 @@ func (aH *APIHandler) CheckFeature(f string) bool {
return err == nil
}
func (aH *APIHandler) getConfigs(w http.ResponseWriter, r *http.Request) {
configs, err := signozio.FetchDynamicConfigs()
if err != nil {
aH.HandleError(w, err, http.StatusInternalServerError)
return
}
aH.Respond(w, configs)
}
// getHealth is used to check the health of the service.
// 'live' query param can be used to check liveliness of
// the service by checking the database connection.

View File

@ -9,28 +9,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
)
func InitSqliteDBIfNeeded(db *sqlx.DB) error {
if db == nil {
return fmt.Errorf("db is required")
}
createTablesStatements := `
CREATE TABLE IF NOT EXISTS integrations_installed(
integration_id TEXT PRIMARY KEY,
config_json TEXT,
installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
_, err := db.Exec(createTablesStatements)
if err != nil {
return fmt.Errorf(
"could not ensure integrations schema in sqlite DB: %w", err,
)
}
return nil
}
type InstalledIntegrationsSqliteRepo struct {
db *sqlx.DB
}
@ -38,13 +16,6 @@ type InstalledIntegrationsSqliteRepo struct {
func NewInstalledIntegrationsSqliteRepo(db *sqlx.DB) (
*InstalledIntegrationsSqliteRepo, error,
) {
err := InitSqliteDBIfNeeded(db)
if err != nil {
return nil, fmt.Errorf(
"couldn't ensure sqlite schema for installed integrations: %w", err,
)
}
return &InstalledIntegrationsSqliteRepo{
db: db,
}, nil

View File

@ -30,11 +30,10 @@ func NewLogParsingPipelinesController(
getIntegrationPipelines func(context.Context) ([]Pipeline, *model.ApiError),
) (*LogParsingPipelineController, error) {
repo := NewRepo(db)
err := repo.InitDB(db)
return &LogParsingPipelineController{
Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines,
}, err
}, nil
}
// PipelinesResponse is used to prepare http response for pipelines config related requests

View File

@ -9,7 +9,6 @@ import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/sqlite"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
@ -29,10 +28,6 @@ func NewRepo(db *sqlx.DB) Repo {
}
}
func (r *Repo) InitDB(inputDB *sqlx.DB) error {
return sqlite.InitDB(inputDB)
}
// insertPipeline stores a given postable pipeline to database
func (r *Repo) insertPipeline(
ctx context.Context, postable *PostablePipeline,

View File

@ -1,35 +0,0 @@
package sqlite
import (
"fmt"
"github.com/pkg/errors"
"github.com/jmoiron/sqlx"
)
func InitDB(db *sqlx.DB) error {
var err error
if db == nil {
return fmt.Errorf("invalid db connection")
}
table_schema := `CREATE TABLE IF NOT EXISTS pipelines(
id TEXT PRIMARY KEY,
order_id INTEGER,
enabled BOOLEAN,
created_by TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
name VARCHAR(400) NOT NULL,
alias VARCHAR(20) NOT NULL,
description TEXT,
filter TEXT NOT NULL,
config_json TEXT
);
`
_, err = db.Exec(table_schema)
if err != nil {
return errors.Wrap(err, "Error in creating pipelines table")
}
return nil
}

View File

@ -33,19 +33,6 @@ func (a *Agents) Count() int {
func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
db = qsDB
tableSchema := `CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT PRIMARY KEY UNIQUE,
started_at datetime NOT NULL,
terminated_at datetime,
current_status TEXT NOT NULL,
effective_config TEXT NOT NULL
);`
_, err := db.Exec(tableSchema)
if err != nil {
return nil, fmt.Errorf("error in creating agents table: %s", err.Error())
}
AllAgents = Agents{
agentsById: make(map[string]*Agent),
connections: make(map[types.Connection]map[string]bool),

View File

@ -205,44 +205,6 @@ var db *sqlx.DB
func InitDB(inputDB *sqlx.DB) error {
db = inputDB
// create the user preference table
tableSchema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS user_preference(
preference_id TEXT NOT NULL,
preference_value TEXT,
user_id TEXT NOT NULL,
PRIMARY KEY (preference_id,user_id),
FOREIGN KEY (user_id)
REFERENCES users(id)
ON UPDATE CASCADE
ON DELETE CASCADE
);`
_, err := db.Exec(tableSchema)
if err != nil {
return fmt.Errorf("error in creating user_preference table: %s", err.Error())
}
// create the org preference table
tableSchema = `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS org_preference(
preference_id TEXT NOT NULL,
preference_value TEXT,
org_id TEXT NOT NULL,
PRIMARY KEY (preference_id,org_id),
FOREIGN KEY (org_id)
REFERENCES organizations(id)
ON UPDATE CASCADE
ON DELETE CASCADE
);`
_, err = db.Exec(tableSchema)
if err != nil {
return fmt.Errorf("error in creating org_preference table: %s", err.Error())
}
return nil
}

View File

@ -11,7 +11,6 @@ import (
"net"
"net/http"
_ "net/http/pprof" // http profiler
"os"
"regexp"
"time"
@ -123,10 +122,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
var reader interfaces.Reader
storage := os.Getenv("STORAGE")
if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...")
clickhouseReader := clickhouseReader.NewReader(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
@ -139,10 +134,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.Cache,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
} else {
return nil, fmt.Errorf("storage type: %s is not supported in query service", storage)
}
reader := clickhouseReader
skipConfig := &model.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" {
// read skip config

View File

@ -2,7 +2,6 @@ package sqlite
import (
"context"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
@ -18,73 +17,6 @@ type ModelDaoSqlite struct {
// InitDB sets up setting up the connection pool global variable.
func InitDB(db *sqlx.DB) (*ModelDaoSqlite, error) {
table_schema := `
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS invites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
token TEXT NOT NULL,
created_at INTEGER NOT NULL,
role TEXT NOT NULL,
org_id TEXT NOT NULL,
FOREIGN KEY(org_id) REFERENCES organizations(id)
);
CREATE TABLE IF NOT EXISTS organizations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
created_at INTEGER NOT NULL,
is_anonymous INTEGER NOT NULL DEFAULT 0 CHECK(is_anonymous IN (0,1)),
has_opted_updates INTEGER NOT NULL DEFAULT 1 CHECK(has_opted_updates IN (0,1))
);
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
created_at INTEGER NOT NULL,
profile_picture_url TEXT,
group_id TEXT NOT NULL,
org_id TEXT NOT NULL,
FOREIGN KEY(group_id) REFERENCES groups(id),
FOREIGN KEY(org_id) REFERENCES organizations(id)
);
CREATE TABLE IF NOT EXISTS groups (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS reset_password_request (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
token TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS user_flags (
user_id TEXT PRIMARY KEY,
flags TEXT,
FOREIGN KEY(user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS apdex_settings (
service_name TEXT PRIMARY KEY,
threshold FLOAT NOT NULL,
exclude_status_codes TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS ingestion_keys (
key_id TEXT PRIMARY KEY,
name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ingestion_key TEXT NOT NULL,
ingestion_url TEXT NOT NULL,
data_region TEXT NOT NULL
);
`
_, err := db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("error in creating tables: %v", err.Error())
}
mds := &ModelDaoSqlite{db: db}
ctx := context.Background()

View File

@ -1,75 +0,0 @@
package signozio
import (
"encoding/json"
"io"
"net/http"
"time"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
)
var C *Client
const (
POST = "POST"
APPLICATION_JSON = "application/json"
)
type Client struct {
Prefix string
}
func New() *Client {
return &Client{
Prefix: constants.ConfigSignozIo,
}
}
func init() {
C = New()
}
// FetchDynamicConfigs fetches configs from config server
func FetchDynamicConfigs() (map[string]Config, *model.ApiError) {
client := http.Client{Timeout: 5 * time.Second}
req, err := http.NewRequest(http.MethodGet, C.Prefix+"/configs", http.NoBody)
if err != nil {
return DefaultConfig, nil
}
req.SetBasicAuth("admin", "SigNoz@adm1n")
httpResponse, err := client.Do(req)
if err != nil {
return DefaultConfig, nil
}
defer httpResponse.Body.Close()
if err != nil {
return DefaultConfig, nil
}
httpBody, err := io.ReadAll(httpResponse.Body)
if err != nil {
return DefaultConfig, nil
}
// read api request result
result := ConfigResult{}
err = json.Unmarshal(httpBody, &result)
if err != nil {
return DefaultConfig, nil
}
switch httpResponse.StatusCode {
case 200, 201:
return result.Data, nil
case 400, 401:
return DefaultConfig, nil
default:
return DefaultConfig, nil
}
}

View File

@ -1,54 +0,0 @@
package signozio
type status string
type ConfigResult struct {
Status status `json:"status"`
Data map[string]Config `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}
type Config struct {
Enabled bool `json:"enabled"`
FrontendPositionId string `json:"frontendPositionId"`
Components []ComponentProps `json:"components"`
}
type ComponentProps struct {
Text string `json:"text"`
Position int `json:"position"`
DarkIcon string `json:"darkIcon"`
LightIcon string `json:"lightIcon"`
Href string `json:"href"`
}
var DefaultConfig = map[string]Config{
"helpConfig": {
Enabled: true,
FrontendPositionId: "tooltip",
Components: []ComponentProps{
{
Text: "How to use SigNoz in production",
Position: 1,
LightIcon: "RiseOutlined",
DarkIcon: "RiseOutlined",
Href: "https://signoz.io/docs/production-readiness",
},
{
Text: "Create an issue in GitHub",
Position: 2,
LightIcon: "GithubFilled",
DarkIcon: "GithubOutlined",
Href: "https://github.com/SigNoz/signoz/issues/new/choose",
},
{
Text: "Read the docs",
Position: 3,
LightIcon: "FileTextFilled",
DarkIcon: "FileTextOutlined",
Href: "https://signoz.io/docs",
},
},
},
}

View File

@ -15,7 +15,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/migrate"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
@ -126,12 +125,6 @@ func main() {
zap.L().Info("JWT secret key set successfully.")
}
if err := migrate.Migrate(signoz.SQLStore.SQLxDB()); err != nil {
zap.L().Error("Failed to migrate", zap.Error(err))
} else {
zap.L().Info("Migration successful")
}
server, err := app.NewServer(serverOptions)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))

View File

@ -1,50 +0,0 @@
package migrate
import (
"database/sql"
"github.com/jmoiron/sqlx"
)
type DataMigration struct {
ID int `db:"id"`
Version string `db:"version"`
CreatedAt string `db:"created_at"`
Succeeded bool `db:"succeeded"`
}
func initSchema(conn *sqlx.DB) error {
tableSchema := `
CREATE TABLE IF NOT EXISTS data_migrations (
id SERIAL PRIMARY KEY,
version VARCHAR(255) NOT NULL UNIQUE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
succeeded BOOLEAN NOT NULL DEFAULT FALSE
);
`
_, err := conn.Exec(tableSchema)
if err != nil {
return err
}
return nil
}
func getMigrationVersion(conn *sqlx.DB, version string) (*DataMigration, error) {
var migration DataMigration
err := conn.Get(&migration, "SELECT * FROM data_migrations WHERE version = $1", version)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
return &migration, nil
}
func Migrate(conn *sqlx.DB) error {
if err := initSchema(conn); err != nil {
return err
}
return nil
}

View File

@ -1,13 +1,20 @@
package utils
import (
"context"
"os"
"testing"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/factory/providertest"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/sqlmigration"
"go.signoz.io/signoz/pkg/sqlmigrator"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore"
)
func NewTestSqliteDB(t *testing.T) (testDB *sqlx.DB, testDBFilePath string) {
@ -19,11 +26,39 @@ func NewTestSqliteDB(t *testing.T) (testDB *sqlx.DB, testDBFilePath string) {
t.Cleanup(func() { os.Remove(testDBFilePath) })
testDBFile.Close()
testDB, err = sqlx.Open("sqlite3", testDBFilePath)
sqlstore, err := sqlitesqlstore.New(context.Background(), providertest.NewSettings(), sqlstore.Config{Provider: "sqlite", Sqlite: sqlstore.SqliteConfig{Path: testDBFilePath}})
if err != nil {
t.Fatalf("could not open test db sqlite file: %v", err)
t.Fatalf("could not create test db sqlite store: %v", err)
}
sqlmigrations, err := sqlmigration.New(
context.Background(),
providertest.NewSettings(),
sqlmigration.Config{},
factory.MustNewNamedMap(
sqlmigration.NewAddDataMigrationsFactory(),
sqlmigration.NewAddOrganizationFactory(),
sqlmigration.NewAddPreferencesFactory(),
sqlmigration.NewAddDashboardsFactory(),
sqlmigration.NewAddSavedViewsFactory(),
sqlmigration.NewAddAgentsFactory(),
sqlmigration.NewAddPipelinesFactory(),
sqlmigration.NewAddIntegrationsFactory(),
sqlmigration.NewAddLicensesFactory(),
sqlmigration.NewAddPatsFactory(),
),
)
if err != nil {
t.Fatalf("could not create test db sql migrations: %v", err)
}
err = sqlmigrator.New(context.Background(), providertest.NewSettings(), sqlstore, sqlmigrations, sqlmigrator.Config{}).Migrate(context.Background())
if err != nil {
t.Fatalf("could not migrate test db sql migrations: %v", err)
}
testDB = sqlstore.SQLxDB()
return testDB, testDBFilePath
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"time"
"go.signoz.io/signoz/pkg/apiserver"
@ -11,6 +12,7 @@ import (
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlmigration"
"go.signoz.io/signoz/pkg/sqlmigrator"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/telemetrystore"
@ -31,6 +33,9 @@ type Config struct {
// SQLStore config
SQLStore sqlstore.Config `mapstructure:"sqlstore"`
// SQLMigration config
SQLMigration sqlmigration.Config `mapstructure:"sqlmigration"`
// SQLMigrator config
SQLMigrator sqlmigrator.Config `mapstructure:"sqlmigrator"`
@ -72,15 +77,35 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec
mergeAndEnsureBackwardCompatibility(&config, deprecatedFlags)
if err := validateConfig(config); err != nil {
return Config{}, err
}
return config, nil
}
func validateConfig(config Config) error {
rvConfig := reflect.ValueOf(config)
for i := 0; i < rvConfig.NumField(); i++ {
factoryConfig, ok := rvConfig.Field(i).Interface().(factory.Config)
if !ok {
return fmt.Errorf("%q is not of type \"factory.Config\"", rvConfig.Type().Field(i).Name)
}
if err := factoryConfig.Validate(); err != nil {
return fmt.Errorf("failed to validate config %q: %w", rvConfig.Type().Field(i).Tag.Get("mapstructure"), err)
}
}
return nil
}
func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags DeprecatedFlags) {
// SIGNOZ_LOCAL_DB_PATH
if os.Getenv("SIGNOZ_LOCAL_DB_PATH") != "" {
fmt.Println("[Deprecated] env SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.")
config.SQLStore.Sqlite.Path = os.Getenv("SIGNOZ_LOCAL_DB_PATH")
}
if os.Getenv("CONTEXT_TIMEOUT") != "" {
fmt.Println("[Deprecated] env CONTEXT_TIMEOUT is deprecated and scheduled for removal. Please use SIGNOZ_APISERVER_TIMEOUT_DEFAULT instead.")
contextTimeoutDuration, err := time.ParseDuration(os.Getenv("CONTEXT_TIMEOUT") + "s")
@ -90,6 +115,7 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
fmt.Println("Error parsing CONTEXT_TIMEOUT, using default value of 60s")
}
}
if os.Getenv("CONTEXT_TIMEOUT_MAX_ALLOWED") != "" {
fmt.Println("[Deprecated] env CONTEXT_TIMEOUT_MAX_ALLOWED is deprecated and scheduled for removal. Please use SIGNOZ_APISERVER_TIMEOUT_MAX instead.")
@ -100,6 +126,12 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
fmt.Println("Error parsing CONTEXT_TIMEOUT_MAX_ALLOWED, using default value of 600s")
}
}
if os.Getenv("STORAGE") != "" {
fmt.Println("[Deprecated] env STORAGE is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_PROVIDER instead.")
config.TelemetryStore.Provider = os.Getenv("STORAGE")
}
if os.Getenv("ClickHouseUrl") != "" {
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl")
@ -109,10 +141,12 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
}
if deprecatedFlags.MaxOpenConns != 100 {
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
}
if deprecatedFlags.DialTimeout != 5*time.Second {
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout

16
pkg/signoz/config_test.go Normal file
View File

@ -0,0 +1,16 @@
package signoz
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.signoz.io/signoz/pkg/config/configtest"
)
// This is a test to ensure that all fields of config implement the factory.Config interface and are valid with
// their default values.
func TestValidateConfig(t *testing.T) {
_, err := NewConfig(context.Background(), configtest.NewResolverConfig(), DeprecatedFlags{})
assert.NoError(t, err)
}

View File

@ -56,6 +56,8 @@ func NewProviderConfig() ProviderConfig {
sqlmigration.NewAddAgentsFactory(),
sqlmigration.NewAddPipelinesFactory(),
sqlmigration.NewAddIntegrationsFactory(),
sqlmigration.NewAddLicensesFactory(),
sqlmigration.NewAddPatsFactory(),
),
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(hook),

View File

@ -6,6 +6,8 @@ import (
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlmigration"
"go.signoz.io/signoz/pkg/sqlmigrator"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/telemetrystore"
"go.signoz.io/signoz/pkg/version"
@ -31,6 +33,8 @@ func New(
return nil, err
}
instrumentation.Logger().InfoContext(ctx, "starting signoz", "config", config)
// Get the provider settings from instrumentation
providerSettings := instrumentation.ToProviderSettings()
@ -70,6 +74,7 @@ func New(
return nil, err
}
// Initialize telemetrystore from the available telemetrystore provider factories
telemetrystore, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
@ -81,6 +86,22 @@ func New(
return nil, err
}
// Run migrations on the sqlstore
sqlmigrations, err := sqlmigration.New(
ctx,
providerSettings,
config.SQLMigration,
providerConfig.SQLMigrationProviderFactories,
)
if err != nil {
return nil, err
}
err = sqlmigrator.New(ctx, providerSettings, sqlstore, sqlmigrations, config.SQLMigrator).Migrate(ctx)
if err != nil {
return nil, err
}
return &SigNoz{
Cache: cache,
Web: web,

View File

@ -48,6 +48,17 @@ func (migration *addIntegrations) Up(ctx context.Context, db *bun.DB) error {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS cloud_integrations_service_configs(
cloud_provider TEXT NOT NULL,
cloud_account_id TEXT NOT NULL,
service_id TEXT NOT NULL,
config_json TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
UNIQUE(cloud_provider, cloud_account_id, service_id)
)`); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,74 @@
package sqlmigration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
)
type addLicenses struct{}
func NewAddLicensesFactory() factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_licenses"), newAddLicenses)
}
func newAddLicenses(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addLicenses{}, nil
}
func (migration *addLicenses) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addLicenses) Up(ctx context.Context, db *bun.DB) error {
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS licenses(
key TEXT PRIMARY KEY,
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
planDetails TEXT,
activationId TEXT,
validationMessage TEXT,
lastValidated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS sites(
uuid TEXT PRIMARY KEY,
alias VARCHAR(180) DEFAULT 'PROD',
url VARCHAR(300),
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS feature_status (
name TEXT PRIMARY KEY,
active bool,
usage INTEGER DEFAULT 0,
usage_limit INTEGER DEFAULT 0,
route TEXT
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS licenses_v3 (
id TEXT PRIMARY KEY,
key TEXT NOT NULL UNIQUE,
data TEXT
);`); err != nil {
return err
}
return nil
}
func (migration *addLicenses) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -0,0 +1,110 @@
package sqlmigration
import (
"context"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
"go.signoz.io/signoz/pkg/factory"
)
type addPats struct{}
func NewAddPatsFactory() factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_pats"), newAddPats)
}
func newAddPats(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addPats{}, nil
}
func (migration *addPats) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addPats) Up(ctx context.Context, db *bun.DB) error {
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS org_domains(
id TEXT PRIMARY KEY,
org_id TEXT NOT NULL,
name VARCHAR(50) NOT NULL UNIQUE,
created_at INTEGER NOT NULL,
updated_at INTEGER,
data TEXT NOT NULL,
FOREIGN KEY(org_id) REFERENCES organizations(id)
);`); err != nil {
return err
}
if _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS personal_access_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
user_id TEXT NOT NULL,
token TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_used INTEGER NOT NULL,
revoked BOOLEAN NOT NULL,
updated_by_user_id TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);`); err != nil {
return err
}
// table:rules op:add column created_at
if _, err := db.
NewAddColumn().
Table("personal_access_tokens").
ColumnExpr("role test not null default 'ADMIN'").
Apply(WrapIfNotExists(ctx, db, "personal_access_tokens", "role")).
Exec(ctx); err != nil && err != ErrNoExecute {
return err
}
if _, err := db.
NewAddColumn().
Table("personal_access_tokens").
ColumnExpr("updated_at INTEGER NOT NULL DEFAULT 0").
Apply(WrapIfNotExists(ctx, db, "personal_access_tokens", "updated_at")).
Exec(ctx); err != nil && err != ErrNoExecute {
return err
}
if _, err := db.
NewAddColumn().
Table("personal_access_tokens").
ColumnExpr("last_used INTEGER NOT NULL DEFAULT 0").
Apply(WrapIfNotExists(ctx, db, "personal_access_tokens", "last_used")).
Exec(ctx); err != nil && err != ErrNoExecute {
return err
}
if _, err := db.
NewAddColumn().
Table("personal_access_tokens").
ColumnExpr("revoked BOOLEAN NOT NULL DEFAULT FALSE").
Apply(WrapIfNotExists(ctx, db, "personal_access_tokens", "revoked")).
Exec(ctx); err != nil && err != ErrNoExecute {
return err
}
if _, err := db.
NewAddColumn().
Table("personal_access_tokens").
ColumnExpr("updated_by_user_id TEXT NOT NULL DEFAULT ''").
Apply(WrapIfNotExists(ctx, db, "personal_access_tokens", "updated_by_user_id")).
Exec(ctx); err != nil && err != ErrNoExecute {
return err
}
return nil
}
func (migration *addPats) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@ -33,8 +33,8 @@ func newConfig() factory.Config {
}
func (c Config) Validate() error {
if c.Lock.Timeout < c.Lock.Interval {
return errors.New("lock_timeout must be greater than lock_interval")
if c.Lock.Timeout <= c.Lock.Interval {
return errors.New("lock::timeout must be greater than lock::interval")
}
return nil

View File

@ -93,13 +93,15 @@ func (migrator *migrator) Lock(ctx context.Context) error {
select {
case <-timer.C:
err := errors.New("timed out waiting for lock")
migrator.settings.Logger().ErrorContext(ctx, "cannot acquire lock", "error", err, "lock_timeout", migrator.config.Lock.Timeout, "dialect", migrator.dialect)
migrator.settings.Logger().ErrorContext(ctx, "cannot acquire lock", "error", err, "lock_timeout", migrator.config.Lock.Timeout.String(), "dialect", migrator.dialect)
return err
case <-ticker.C:
if err := migrator.migrator.Lock(ctx); err == nil {
var err error
if err = migrator.migrator.Lock(ctx); err == nil {
migrator.settings.Logger().InfoContext(ctx, "acquired migration lock", "dialect", migrator.dialect)
return nil
}
migrator.settings.Logger().ErrorContext(ctx, "attempt to acquire lock failed", "error", err, "lock_interval", migrator.config.Lock.Interval.String(), "dialect", migrator.dialect)
case <-ctx.Done():
return ctx.Err()
}

View File

@ -1,6 +1,7 @@
package telemetrystore
import (
"fmt"
"time"
"go.signoz.io/signoz/pkg/factory"
@ -50,13 +51,15 @@ func newConfig() factory.Config {
DialTimeout: 5 * time.Second,
},
ClickHouse: ClickHouseConfig{
DSN: "http://localhost:9000",
// No default query settings, as default's are set in ch config
DSN: "tcp://localhost:9000",
},
}
}
func (c Config) Validate() error {
if c.Provider != "clickhouse" {
return fmt.Errorf("provider: %q is not supported", c.Provider)
}
return nil
}

View File

@ -8,7 +8,6 @@ import (
)
type TelemetryStore interface {
// Returns the SigNoz Wrapper for Clickhouse
ClickHouseDB() clickhouse.Conn
}

View File

@ -3,8 +3,11 @@ package telemetrystoretest
import (
"github.com/ClickHouse/clickhouse-go/v2"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"go.signoz.io/signoz/pkg/telemetrystore"
)
var _ telemetrystore.TelemetryStore = (*Provider)(nil)
// Provider represents a mock telemetry store provider for testing
type Provider struct {
mock cmock.ClickConnMockCommon
@ -23,8 +26,8 @@ func New() (*Provider, error) {
}, nil
}
// Clickhouse returns the mock Clickhouse connection
func (p *Provider) Clickhouse() clickhouse.Conn {
// ClickhouseDB returns the mock Clickhouse connection
func (p *Provider) ClickHouseDB() clickhouse.Conn {
return p.mock.(clickhouse.Conn)
}

View File

@ -3,8 +3,6 @@ package telemetrystoretest
import (
"testing"
"github.com/ClickHouse/clickhouse-go/v2"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
)
@ -31,14 +29,7 @@ func TestNew(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, provider)
assert.NotNil(t, provider.Mock())
assert.NotNil(t, provider.Clickhouse())
// Verify the returned interfaces implement the expected types
_, ok := provider.Mock().(cmock.ClickConnMockCommon)
assert.True(t, ok, "Mock() should return cmock.ClickConnMockCommon")
_, ok = provider.Clickhouse().(clickhouse.Conn)
assert.True(t, ok, "Clickhouse() should return clickhouse.Conn")
assert.NotNil(t, provider.ClickHouseDB())
})
}
}