package postgressqlstore import ( "context" "database/sql" "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/jmoiron/sqlx" "github.com/uptrace/bun" "github.com/uptrace/bun/dialect/pgdialect" ) type provider struct { settings factory.ScopedProviderSettings sqldb *sql.DB bundb *sqlstore.BunDB sqlxdb *sqlx.DB dialect *dialect } func NewFactory(hookFactories ...factory.ProviderFactory[sqlstore.SQLStoreHook, sqlstore.Config]) factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config] { return factory.NewProviderFactory(factory.MustNewName("postgres"), func(ctx context.Context, providerSettings factory.ProviderSettings, config sqlstore.Config) (sqlstore.SQLStore, error) { hooks := make([]sqlstore.SQLStoreHook, len(hookFactories)) for i, hookFactory := range hookFactories { hook, err := hookFactory.New(ctx, providerSettings, config) if err != nil { return nil, err } hooks[i] = hook } return New(ctx, providerSettings, config, hooks...) }) } func New(ctx context.Context, providerSettings factory.ProviderSettings, config sqlstore.Config, hooks ...sqlstore.SQLStoreHook) (sqlstore.SQLStore, error) { settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/sqlstore/postgressqlstore") pgConfig, err := pgxpool.ParseConfig(config.Postgres.DSN) if err != nil { return nil, err } // Set the maximum number of open connections pgConfig.MaxConns = int32(config.Connection.MaxOpenConns) // Use pgxpool to create a connection pool pool, err := pgxpool.NewWithConfig(ctx, pgConfig) if err != nil { return nil, err } sqldb := stdlib.OpenDBFromPool(pool) return &provider{ settings: settings, sqldb: sqldb, bundb: sqlstore.NewBunDB(settings, sqldb, pgdialect.New(), hooks), sqlxdb: sqlx.NewDb(sqldb, "postgres"), dialect: new(dialect), }, nil } func (provider *provider) BunDB() *bun.DB { return provider.bundb.DB } func (provider *provider) SQLDB() *sql.DB { return provider.sqldb } func (provider *provider) SQLxDB() *sqlx.DB { return provider.sqlxdb } func (provider *provider) Dialect() sqlstore.SQLDialect { return provider.dialect } func (provider *provider) BunDBCtx(ctx context.Context) bun.IDB { return provider.bundb.BunDBCtx(ctx) } func (provider *provider) RunInTxCtx(ctx context.Context, opts *sql.TxOptions, cb func(ctx context.Context) error) error { return provider.bundb.RunInTxCtx(ctx, opts, cb) } func (provider *provider) WrapNotFoundErrf(err error, code errors.Code, format string, args ...any) error { if err == sql.ErrNoRows { return errors.Wrapf(err, errors.TypeNotFound, code, format, args...) } return err } func (provider *provider) WrapAlreadyExistsErrf(err error, code errors.Code, format string, args ...any) error { var pgErr *pgconn.PgError if errors.As(err, &pgErr) && pgErr.Code == "23505" { return errors.Wrapf(err, errors.TypeAlreadyExists, code, format, args...) } return err }