fix: telemetry store (#6923)

* fix: initial changes for telemetry store

* fix: add tests and use proper config for conn

* fix: add telemetry store test

* fix: add backward compatibility for old variables and update example conf

* fix: move wrapper to telemetry store

* fix: no need to pass query for settings

* fix: remove redundant config for ch conn

* fix: use clickhouse dsn instead

* fix: update example config

* fix: update backward compatibility code

* fix: use hooks in telemetrystore

* fix: address minor comments

---------

Co-authored-by: Vibhu Pandey <vibhupandey28@gmail.com>
Nityananda Gohain 2025-01-30 15:51:55 +05:30 committed by GitHub
parent ffd72cf406
commit d1e7cc128f
27 changed files with 578 additions and 305 deletions

View File

@ -78,3 +78,18 @@ apiserver:
logging: logging:
excluded_routes: excluded_routes:
- /api/v1/health - /api/v1/health
##################### TelemetryStore #####################
telemetrystore:
# specifies the telemetrystore provider to use.
provider: clickhouse
clickhouse:
# The DSN to use for ClickHouse.
dsn: http://localhost:9000
# Maximum number of idle connections in the connection pool.
max_idle_conns: 50
# Maximum number of open connections to the database.
max_open_conns: 100
# Maximum time to wait for a connection to be established.
dial_timeout: 5s
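
For context on how these values are consumed: the new clickhousetelemetrystore provider later in this diff parses the DSN and layers the pool settings on top of the parsed options. A minimal standalone sketch of that mapping, assuming the clickhouse-go v2 client used throughout the repository and a native-protocol (tcp://) DSN rather than the HTTP form shown above:

    package main

    import (
    	"log"
    	"time"

    	"github.com/ClickHouse/clickhouse-go/v2"
    )

    func main() {
    	// Values taken from the telemetrystore section above; tcp:// is the
    	// native-protocol equivalent of the documented default endpoint.
    	dsn := "tcp://localhost:9000"

    	options, err := clickhouse.ParseDSN(dsn)
    	if err != nil {
    		log.Fatal(err)
    	}
    	// The connection-pool settings sit next to the DSN in the config and
    	// are applied onto the parsed options, as the provider does.
    	options.MaxIdleConns = 50
    	options.MaxOpenConns = 100
    	options.DialTimeout = 5 * time.Second

    	// Requires a reachable ClickHouse instance at the DSN above.
    	conn, err := clickhouse.Open(options)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer conn.Close()
    	log.Println("connected:", conn.Stats())
    }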

View File

@ -26,9 +26,6 @@ type APIHandlerOptions struct {
DataConnector interfaces.DataConnector DataConnector interfaces.DataConnector
SkipConfig *basemodel.SkipConfig SkipConfig *basemodel.SkipConfig
PreferSpanMetrics bool PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
AppDao dao.ModelDao AppDao dao.ModelDao
RulesManager *rules.Manager RulesManager *rules.Manager
UsageManager *usage.Manager UsageManager *usage.Manager
@ -57,9 +54,6 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
Reader: opts.DataConnector, Reader: opts.DataConnector,
SkipConfig: opts.SkipConfig, SkipConfig: opts.SkipConfig,
PreferSpanMetrics: opts.PreferSpanMetrics, PreferSpanMetrics: opts.PreferSpanMetrics,
MaxIdleConns: opts.MaxIdleConns,
MaxOpenConns: opts.MaxOpenConns,
DialTimeout: opts.DialTimeout,
AppDao: opts.AppDao, AppDao: opts.AppDao,
RuleManager: opts.RulesManager, RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags, FeatureFlags: opts.FeatureFlags,

View File

@ -20,22 +20,20 @@ type ClickhouseReader struct {
func NewDataConnector( func NewDataConnector(
localDB *sqlx.DB, localDB *sqlx.DB,
ch clickhouse.Conn,
promConfigPath string, promConfigPath string,
lm interfaces.FeatureLookup, lm interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool, useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration, fluxIntervalForTraceDetail time.Duration,
cache cache.Cache, cache cache.Cache,
) *ClickhouseReader { ) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{ return &ClickhouseReader{
conn: ch.GetConn(), conn: ch,
appdb: localDB, appdb: localDB,
ClickHouseReader: ch, ClickHouseReader: chReader,
} }
} }

View File

@ -74,9 +74,6 @@ type ServerOptions struct {
DisableRules bool DisableRules bool
RuleRepoURL string RuleRepoURL string
PreferSpanMetrics bool PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
CacheConfigPath string CacheConfigPath string
FluxInterval string FluxInterval string
FluxIntervalForTraceDetail string FluxIntervalForTraceDetail string
@ -157,11 +154,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
zap.L().Info("Using ClickHouse as datastore ...") zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector( qb := db.NewDataConnector(
serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
serverOptions.PromConfigPath, serverOptions.PromConfigPath,
lm, lm,
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster, serverOptions.Cluster,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema, serverOptions.UseTraceNewSchema,
@ -245,7 +240,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
} }
// start the usagemanager // start the usagemanager
usageManager, err := usage.New(modelDao, lm.GetRepo(), reader.GetConn()) usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -266,9 +261,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
DataConnector: reader, DataConnector: reader,
SkipConfig: skipConfig, SkipConfig: skipConfig,
PreferSpanMetrics: serverOptions.PreferSpanMetrics, PreferSpanMetrics: serverOptions.PreferSpanMetrics,
MaxIdleConns: serverOptions.MaxIdleConns,
MaxOpenConns: serverOptions.MaxOpenConns,
DialTimeout: serverOptions.DialTimeout,
AppDao: modelDao, AppDao: modelDao,
RulesManager: rm, RulesManager: rm,
UsageManager: usageManager, UsageManager: usageManager,

View File

@ -141,6 +141,10 @@ func main() {
envprovider.NewFactory(), envprovider.NewFactory(),
fileprovider.NewFactory(), fileprovider.NewFactory(),
}, },
}, signoz.DeprecatedFlags{
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
}) })
if err != nil { if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err)) zap.L().Fatal("Failed to create config", zap.Error(err))
@ -161,9 +165,6 @@ func main() {
PrivateHostPort: baseconst.PrivateHostPort, PrivateHostPort: baseconst.PrivateHostPort,
DisableRules: disableRules, DisableRules: disableRules,
RuleRepoURL: ruleRepoURL, RuleRepoURL: ruleRepoURL,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath, CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval, FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os"
"regexp" "regexp"
"strings" "strings"
"sync/atomic" "sync/atomic"
@ -46,9 +45,9 @@ type Manager struct {
tenantID string tenantID string
} }
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, chUrl string) (*Manager, error) {
hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`) hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl")) hostNameRegexMatches := hostNameRegex.FindStringSubmatch(chUrl)
tenantID := "" tenantID := ""
if len(hostNameRegexMatches) == 2 { if len(hostNameRegexMatches) == 2 {
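
usage.New now derives the tenant hostname from the DSN passed in by the caller instead of reading the ClickHouseUrl environment variable. A small sketch of that extraction using the same regular expression; the DSN value and the final assignment to tenantID are illustrative, since the rest of the function is not shown in this hunk:

    package main

    import (
    	"fmt"
    	"regexp"
    )

    func main() {
    	// The usage manager now receives the ClickHouse URL as an argument.
    	chURL := "tcp://my-clickhouse:9000" // hypothetical DSN for illustration

    	// Same pattern as in usage.New: capture the hostname between
    	// "tcp://" and the port separator.
    	hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
    	matches := hostNameRegex.FindStringSubmatch(chURL)

    	tenantID := ""
    	if len(matches) == 2 {
    		tenantID = matches[1] // illustrative; the real code derives the tenant from this match
    	}
    	fmt.Println(tenantID)
    }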

View File

@ -1,11 +1,9 @@
package clickhouseReader package clickhouseReader
import ( import (
"context"
"time" "time"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"go.uber.org/zap"
) )
type Encoding string type Encoding string
@ -18,7 +16,6 @@ const (
) )
const ( const (
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces" defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "distributed_signoz_operations" defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2" defaultIndexTable string = "distributed_signoz_index_v2"
@ -58,9 +55,6 @@ type namespaceConfig struct {
namespace string namespace string
Enabled bool Enabled bool
Datasource string Datasource string
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
TraceDB string TraceDB string
OperationsTable string OperationsTable string
IndexTable string IndexTable string
@ -99,37 +93,6 @@ type namespaceConfig struct {
// Connecto defines how to connect to the database // Connecto defines how to connect to the database
type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error) type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)
func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
ctx := context.Background()
options, err := clickhouse.ParseDSN(cfg.Datasource)
if err != nil {
return nil, err
}
// Check if the DSN contained any of the following options, if not set from configuration
if options.MaxIdleConns == 0 {
options.MaxIdleConns = cfg.MaxIdleConns
}
if options.MaxOpenConns == 0 {
options.MaxOpenConns = cfg.MaxOpenConns
}
if options.DialTimeout == 0 {
options.DialTimeout = cfg.DialTimeout
}
zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]), zap.Int("MaxIdleConns", options.MaxIdleConns), zap.Int("MaxOpenConns", options.MaxOpenConns), zap.Duration("DialTimeout", options.DialTimeout))
db, err := clickhouse.Open(options)
if err != nil {
return nil, err
}
if err := db.Ping(ctx); err != nil {
return nil, err
}
return db, nil
}
// Options store storage plugin related configs // Options store storage plugin related configs
type Options struct { type Options struct {
primary *namespaceConfig primary *namespaceConfig
@ -139,26 +102,13 @@ type Options struct {
// NewOptions creates a new Options struct. // NewOptions creates a new Options struct.
func NewOptions( func NewOptions(
datasource string,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
primaryNamespace string, primaryNamespace string,
otherNamespaces ...string, otherNamespaces ...string,
) *Options { ) *Options {
if datasource == "" {
datasource = defaultDatasource
}
options := &Options{ options := &Options{
primary: &namespaceConfig{ primary: &namespaceConfig{
namespace: primaryNamespace, namespace: primaryNamespace,
Enabled: true, Enabled: true,
Datasource: datasource,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
TraceDB: defaultTraceDB, TraceDB: defaultTraceDB,
OperationsTable: defaultOperationsTable, OperationsTable: defaultOperationsTable,
IndexTable: defaultIndexTable, IndexTable: defaultIndexTable,
@ -181,7 +131,6 @@ func NewOptions(
WriteBatchDelay: defaultWriteBatchDelay, WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize, WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding, Encoding: defaultEncoding,
Connector: defaultConnector,
LogsTableV2: defaultLogsTableV2, LogsTableV2: defaultLogsTableV2,
LogsLocalTableV2: defaultLogsLocalTableV2, LogsLocalTableV2: defaultLogsLocalTableV2,
@ -200,7 +149,6 @@ func NewOptions(
if namespace == archiveNamespace { if namespace == archiveNamespace {
options.others[namespace] = &namespaceConfig{ options.others[namespace] = &namespaceConfig{
namespace: namespace, namespace: namespace,
Datasource: datasource,
TraceDB: "", TraceDB: "",
OperationsTable: "", OperationsTable: "",
IndexTable: "", IndexTable: "",
@ -214,7 +162,6 @@ func NewOptions(
WriteBatchDelay: defaultWriteBatchDelay, WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize, WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding, Encoding: defaultEncoding,
Connector: defaultConnector,
} }
} else { } else {
options.others[namespace] = &namespaceConfig{namespace: namespace} options.others[namespace] = &namespaceConfig{namespace: namespace}

View File

@ -166,26 +166,16 @@ type ClickHouseReader struct {
// NewTraceReader returns a TraceReader for the database // NewTraceReader returns a TraceReader for the database
func NewReader( func NewReader(
localDB *sqlx.DB, localDB *sqlx.DB,
db driver.Conn,
configFile string, configFile string,
featureFlag interfaces.FeatureLookup, featureFlag interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string, cluster string,
useLogsNewSchema bool, useLogsNewSchema bool,
useTraceNewSchema bool, useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration, fluxIntervalForTraceDetail time.Duration,
cache cache.Cache, cache cache.Cache,
) *ClickHouseReader { ) *ClickHouseReader {
options := NewOptions(primaryNamespace, archiveNamespace)
datasource := os.Getenv("ClickHouseUrl")
options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace)
db, err := initialize(options)
if err != nil {
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
} }
@ -208,29 +198,6 @@ func NewReaderFromClickhouseConnection(
os.Exit(1) os.Exit(1)
} }
regex := os.Getenv("ClickHouseOptimizeReadInOrderRegex")
var regexCompiled *regexp.Regexp
if regex != "" {
regexCompiled, err = regexp.Compile(regex)
if err != nil {
zap.L().Error("Incorrect regex for ClickHouseOptimizeReadInOrderRegex")
os.Exit(1)
}
}
wrap := clickhouseConnWrapper{
conn: db,
settings: ClickhouseQuerySettings{
MaxExecutionTime: os.Getenv("ClickHouseMaxExecutionTime"),
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
OptimizeReadInOrderRegex: os.Getenv("ClickHouseOptimizeReadInOrderRegex"),
OptimizeReadInOrderRegexCompiled: regexCompiled,
MaxResultRowsForCHQuery: constants.MaxResultRowsForCHQuery,
},
}
logsTableName := options.primary.LogsTable logsTableName := options.primary.LogsTable
logsLocalTableName := options.primary.LogsLocalTable logsLocalTableName := options.primary.LogsLocalTable
if useLogsNewSchema { if useLogsNewSchema {
@ -246,7 +213,7 @@ func NewReaderFromClickhouseConnection(
} }
return &ClickHouseReader{ return &ClickHouseReader{
db: wrap, db: db,
localDB: localDB, localDB: localDB,
TraceDB: options.primary.TraceDB, TraceDB: options.primary.TraceDB,
alertManager: alertManager, alertManager: alertManager,
@ -438,28 +405,6 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
return conf, nil return conf, nil
} }
func initialize(options *Options) (clickhouse.Conn, error) {
db, err := connect(options.getPrimary())
if err != nil {
return nil, fmt.Errorf("error connecting to primary db: %v", err)
}
return db, nil
}
func connect(cfg *namespaceConfig) (clickhouse.Conn, error) {
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto {
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
}
return cfg.Connector(cfg)
}
func (r *ClickHouseReader) GetConn() clickhouse.Conn {
return r.db
}
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) { func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time) qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
if err != nil { if err != nil {

View File

@ -1,124 +0,0 @@
package clickhouseReader
import (
"context"
"encoding/json"
"regexp"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.signoz.io/signoz/pkg/query-service/common"
)
type ClickhouseQuerySettings struct {
MaxExecutionTime string
MaxExecutionTimeLeaf string
TimeoutBeforeCheckingExecutionSpeed string
MaxBytesToRead string
OptimizeReadInOrderRegex string
OptimizeReadInOrderRegexCompiled *regexp.Regexp
MaxResultRowsForCHQuery int
}
type clickhouseConnWrapper struct {
conn clickhouse.Conn
settings ClickhouseQuerySettings
}
func (c clickhouseConnWrapper) Close() error {
return c.conn.Close()
}
func (c clickhouseConnWrapper) Ping(ctx context.Context) error {
return c.conn.Ping(ctx)
}
func (c clickhouseConnWrapper) Stats() driver.Stats {
return c.conn.Stats()
}
func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context {
settings := clickhouse.Settings{}
logComment := c.getLogComment(ctx)
if logComment != "" {
settings["log_comment"] = logComment
}
if ctx.Value("enforce_max_result_rows") != nil {
settings["max_result_rows"] = c.settings.MaxResultRowsForCHQuery
}
if c.settings.MaxBytesToRead != "" {
settings["max_bytes_to_read"] = c.settings.MaxBytesToRead
}
if c.settings.MaxExecutionTime != "" {
settings["max_execution_time"] = c.settings.MaxExecutionTime
}
if c.settings.MaxExecutionTimeLeaf != "" {
settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf
}
if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" {
settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed
}
// only list queries of
if c.settings.OptimizeReadInOrderRegex != "" && c.settings.OptimizeReadInOrderRegexCompiled.Match([]byte(query)) {
settings["optimize_read_in_order"] = 0
}
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}
func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment
kv := ctx.Value(common.LogCommentKey)
if kv == nil {
return ""
}
logCommentKVs, ok := kv.(map[string]string)
if !ok {
return ""
}
logComment, _ := json.Marshal(logCommentKVs)
return string(logComment)
}
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...)
}
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...)
}
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...)
}
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...)
}
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...)
}
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...)
}
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
return c.conn.ServerVersion()
}
func (c clickhouseConnWrapper) Contributors() []string {
return c.conn.Contributors()
}

View File

@ -97,10 +97,6 @@ type APIHandler struct {
temporalityMap map[string]map[v3.Temporality]bool temporalityMap map[string]map[v3.Temporality]bool
temporalityMux sync.Mutex temporalityMux sync.Mutex
maxIdleConns int
maxOpenConns int
dialTimeout time.Duration
IntegrationsController *integrations.Controller IntegrationsController *integrations.Controller
CloudIntegrationsController *cloudintegrations.Controller CloudIntegrationsController *cloudintegrations.Controller
@ -142,10 +138,6 @@ type APIHandlerOpts struct {
PreferSpanMetrics bool PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
// dao layer to perform crud on app objects like dashboard, alerts etc // dao layer to perform crud on app objects like dashboard, alerts etc
AppDao dao.ModelDao AppDao dao.ModelDao
@ -225,9 +217,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
skipConfig: opts.SkipConfig, skipConfig: opts.SkipConfig,
preferSpanMetrics: opts.PreferSpanMetrics, preferSpanMetrics: opts.PreferSpanMetrics,
temporalityMap: make(map[string]map[v3.Temporality]bool), temporalityMap: make(map[string]map[v3.Temporality]bool),
maxIdleConns: opts.MaxIdleConns,
maxOpenConns: opts.MaxOpenConns,
dialTimeout: opts.DialTimeout,
alertManager: alertManager, alertManager: alertManager,
ruleManager: opts.RuleManager, ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags, featureFlags: opts.FeatureFlags,

View File

@ -1352,7 +1352,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
} }
testName := "name" testName := "name"
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
// iterate over test data, create reader and run test // iterate over test data, create reader and run test
for _, tc := range testCases { for _, tc := range testCases {

View File

@ -1406,7 +1406,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
} }
testName := "name" testName := "name"
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
// iterate over test data, create reader and run test // iterate over test data, create reader and run test
for _, tc := range testCases { for _, tc := range testCases {

View File

@ -62,9 +62,6 @@ type ServerOptions struct {
DisableRules bool DisableRules bool
RuleRepoURL string RuleRepoURL string
PreferSpanMetrics bool PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
CacheConfigPath string CacheConfigPath string
FluxInterval string FluxInterval string
FluxIntervalForTraceDetail string FluxIntervalForTraceDetail string
@ -132,11 +129,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
zap.L().Info("Using ClickHouse as datastore ...") zap.L().Info("Using ClickHouse as datastore ...")
clickhouseReader := clickhouseReader.NewReader( clickhouseReader := clickhouseReader.NewReader(
serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
serverOptions.PromConfigPath, serverOptions.PromConfigPath,
fm, fm,
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster, serverOptions.Cluster,
serverOptions.UseLogsNewSchema, serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema, serverOptions.UseTraceNewSchema,
@ -202,9 +197,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
Reader: reader, Reader: reader,
SkipConfig: skipConfig, SkipConfig: skipConfig,
PreferSpanMetrics: serverOptions.PreferSpanMetrics, PreferSpanMetrics: serverOptions.PreferSpanMetrics,
MaxIdleConns: serverOptions.MaxIdleConns,
MaxOpenConns: serverOptions.MaxOpenConns,
DialTimeout: serverOptions.DialTimeout,
AppDao: dao.DB(), AppDao: dao.DB(),
RuleManager: rm, RuleManager: rm,
FeatureFlags: fm, FeatureFlags: fm,

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"time" "time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/stats"
@ -85,7 +84,6 @@ type Reader interface {
) (*v3.QBFilterSuggestionsResponse, *model.ApiError) ) (*v3.QBFilterSuggestionsResponse, *model.ApiError)
// Connection needed for rules, not ideal but required // Connection needed for rules, not ideal but required
GetConn() clickhouse.Conn
GetQueryEngine() *promql.Engine GetQueryEngine() *promql.Engine
GetFanoutStorage() *storage.Storage GetFanoutStorage() *storage.Storage

View File

@ -85,6 +85,10 @@ func main() {
envprovider.NewFactory(), envprovider.NewFactory(),
fileprovider.NewFactory(), fileprovider.NewFactory(),
}, },
}, signoz.DeprecatedFlags{
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
}) })
if err != nil { if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err)) zap.L().Fatal("Failed to create config", zap.Error(err))
@ -104,9 +108,6 @@ func main() {
PrivateHostPort: constants.PrivateHostPort, PrivateHostPort: constants.PrivateHostPort,
DisableRules: disableRules, DisableRules: disableRules,
RuleRepoURL: ruleRepoURL, RuleRepoURL: ruleRepoURL,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath, CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval, FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,

View File

@ -1240,7 +1240,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
@ -1339,7 +1339,7 @@ func TestThresholdRuleNoData(t *testing.T) {
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
@ -1447,7 +1447,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
@ -1572,7 +1572,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
} }
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)

View File

@ -40,7 +40,7 @@ func NewMockClickhouseReader(
require.Nil(t, err, "could not init mock clickhouse") require.Nil(t, err, "could not init mock clickhouse")
reader := clickhouseReader.NewReaderFromClickhouseConnection( reader := clickhouseReader.NewReaderFromClickhouseConnection(
mockDB, mockDB,
clickhouseReader.NewOptions("", 10, 10, 10*time.Second, ""), clickhouseReader.NewOptions("", ""),
testDB, testDB,
"", "",
featureFlags, featureFlags,

View File

@ -13,6 +13,7 @@ import (
"go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlmigrator" "go.signoz.io/signoz/pkg/sqlmigrator"
"go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/telemetrystore"
"go.signoz.io/signoz/pkg/web" "go.signoz.io/signoz/pkg/web"
) )
@ -35,9 +36,20 @@ type Config struct {
// API Server config // API Server config
APIServer apiserver.Config `mapstructure:"apiserver"` APIServer apiserver.Config `mapstructure:"apiserver"`
// TelemetryStore config
TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"`
} }
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Config, error) { // DeprecatedFlags are the flags that are deprecated and scheduled for removal.
// These flags are used to ensure backward compatibility with the old flags.
type DeprecatedFlags struct {
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
}
func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) {
configFactories := []factory.ConfigFactory{ configFactories := []factory.ConfigFactory{
instrumentation.NewConfigFactory(), instrumentation.NewConfigFactory(),
web.NewConfigFactory(), web.NewConfigFactory(),
@ -45,6 +57,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Confi
sqlstore.NewConfigFactory(), sqlstore.NewConfigFactory(),
sqlmigrator.NewConfigFactory(), sqlmigrator.NewConfigFactory(),
apiserver.NewConfigFactory(), apiserver.NewConfigFactory(),
telemetrystore.NewConfigFactory(),
} }
conf, err := config.New(ctx, resolverConfig, configFactories) conf, err := config.New(ctx, resolverConfig, configFactories)
@ -57,12 +70,12 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Confi
return Config{}, err return Config{}, err
} }
mergeAndEnsureBackwardCompatibility(&config) mergeAndEnsureBackwardCompatibility(&config, deprecatedFlags)
return config, nil return config, nil
} }
func mergeAndEnsureBackwardCompatibility(config *Config) { func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags DeprecatedFlags) {
// SIGNOZ_LOCAL_DB_PATH // SIGNOZ_LOCAL_DB_PATH
if os.Getenv("SIGNOZ_LOCAL_DB_PATH") != "" { if os.Getenv("SIGNOZ_LOCAL_DB_PATH") != "" {
fmt.Println("[Deprecated] env SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.") fmt.Println("[Deprecated] env SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.")
@ -87,4 +100,21 @@ func mergeAndEnsureBackwardCompatibility(config *Config) {
fmt.Println("Error parsing CONTEXT_TIMEOUT_MAX_ALLOWED, using default value of 600s") fmt.Println("Error parsing CONTEXT_TIMEOUT_MAX_ALLOWED, using default value of 600s")
} }
} }
if os.Getenv("ClickHouseUrl") != "" {
fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.")
config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl")
}
if deprecatedFlags.MaxIdleConns != 50 {
fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.")
config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns
}
if deprecatedFlags.MaxOpenConns != 100 {
fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.")
config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns
}
if deprecatedFlags.DialTimeout != 5*time.Second {
fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.")
config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout
}
} }
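
A hedged, same-package test sketch of the fallback above, assuming the config lives in the signoz package (as the signoz.DeprecatedFlags references in main.go suggest); the test name and DSN values are illustrative:

    package signoz

    import (
    	"testing"
    	"time"

    	"github.com/stretchr/testify/assert"
    	"go.signoz.io/signoz/pkg/telemetrystore"
    )

    // Illustrative test: the deprecated ClickHouseUrl env variable should still
    // end up in TelemetryStore.ClickHouse.DSN, while flags left at their old
    // defaults (50/100/5s) must not override the new config.
    func TestDeprecatedClickHouseUrl(t *testing.T) {
    	t.Setenv("ClickHouseUrl", "tcp://legacy-clickhouse:9000")

    	cfg := Config{
    		TelemetryStore: telemetrystore.Config{
    			ClickHouse: telemetrystore.ClickHouseConfig{DSN: "tcp://localhost:9000"},
    		},
    	}
    	mergeAndEnsureBackwardCompatibility(&cfg, DeprecatedFlags{
    		MaxIdleConns: 50,
    		MaxOpenConns: 100,
    		DialTimeout:  5 * time.Second,
    	})

    	assert.Equal(t, "tcp://legacy-clickhouse:9000", cfg.TelemetryStore.ClickHouse.DSN)
    }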

View File

@ -8,6 +8,9 @@ import (
"go.signoz.io/signoz/pkg/sqlmigration" "go.signoz.io/signoz/pkg/sqlmigration"
"go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore" "go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore"
"go.signoz.io/signoz/pkg/telemetrystore"
"go.signoz.io/signoz/pkg/telemetrystore/clickhousetelemetrystore"
"go.signoz.io/signoz/pkg/telemetrystore/telemetrystorehook"
"go.signoz.io/signoz/pkg/web" "go.signoz.io/signoz/pkg/web"
"go.signoz.io/signoz/pkg/web/noopweb" "go.signoz.io/signoz/pkg/web/noopweb"
"go.signoz.io/signoz/pkg/web/routerweb" "go.signoz.io/signoz/pkg/web/routerweb"
@ -25,9 +28,13 @@ type ProviderConfig struct {
// Map of all sql migration provider factories // Map of all sql migration provider factories
SQLMigrationProviderFactories factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]] SQLMigrationProviderFactories factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]]
// Map of all telemetrystore provider factories
TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]]
} }
func NewProviderConfig() ProviderConfig { func NewProviderConfig() ProviderConfig {
hook := telemetrystorehook.NewFactory()
return ProviderConfig{ return ProviderConfig{
CacheProviderFactories: factory.MustNewNamedMap( CacheProviderFactories: factory.MustNewNamedMap(
memorycache.NewFactory(), memorycache.NewFactory(),
@ -50,5 +57,8 @@ func NewProviderConfig() ProviderConfig {
sqlmigration.NewAddPipelinesFactory(), sqlmigration.NewAddPipelinesFactory(),
sqlmigration.NewAddIntegrationsFactory(), sqlmigration.NewAddIntegrationsFactory(),
), ),
TelemetryStoreProviderFactories: factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(hook),
),
} }
} }

View File

@ -7,6 +7,7 @@ import (
"go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/telemetrystore"
"go.signoz.io/signoz/pkg/version" "go.signoz.io/signoz/pkg/version"
"go.signoz.io/signoz/pkg/web" "go.signoz.io/signoz/pkg/web"
@ -16,6 +17,7 @@ type SigNoz struct {
Cache cache.Cache Cache cache.Cache
Web web.Web Web web.Web
SQLStore sqlstore.SQLStore SQLStore sqlstore.SQLStore
TelemetryStore telemetrystore.TelemetryStore
} }
func New( func New(
@ -68,9 +70,21 @@ func New(
return nil, err return nil, err
} }
telemetrystore, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.TelemetryStore,
providerConfig.TelemetryStoreProviderFactories,
config.TelemetryStore.Provider,
)
if err != nil {
return nil, err
}
return &SigNoz{ return &SigNoz{
Cache: cache, Cache: cache,
Web: web, Web: web,
SQLStore: sqlstore, SQLStore: sqlstore,
TelemetryStore: telemetrystore,
}, nil }, nil
} }

View File

@ -0,0 +1,120 @@
package clickhousetelemetrystore
import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/telemetrystore"
)
type provider struct {
settings factory.ScopedProviderSettings
clickHouseConn clickhouse.Conn
hooks []telemetrystore.TelemetryStoreHook
}
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
// we want to fail fast so we have hook registration errors before creating the telemetry store
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory.New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
hooks[i] = hook
}
return New(ctx, providerSettings, config, hooks...)
})
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/telemetrystore/clickhousetelemetrystore")
options, err := clickhouse.ParseDSN(config.ClickHouse.DSN)
if err != nil {
return nil, err
}
options.MaxIdleConns = config.Connection.MaxIdleConns
options.MaxOpenConns = config.Connection.MaxOpenConns
options.DialTimeout = config.Connection.DialTimeout
chConn, err := clickhouse.Open(options)
if err != nil {
return nil, err
}
return &provider{
settings: settings,
clickHouseConn: chConn,
hooks: hooks,
}, nil
}
func (p *provider) ClickHouseDB() clickhouse.Conn {
return p
}
func (p provider) Close() error {
return p.clickHouseConn.Close()
}
func (p provider) Ping(ctx context.Context) error {
return p.clickHouseConn.Ping(ctx)
}
func (p provider) Stats() driver.Stats {
return p.clickHouseConn.Stats()
}
func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
rows, err := p.clickHouseConn.Query(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err)
return rows, err
}
func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
row := p.clickHouseConn.QueryRow(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil)
return row
}
func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.Select(ctx, dest, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err
}
func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.Exec(ctx, query, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err
}
func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...)
err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return err
}
func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query)
batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...)
telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err)
return batch, err
}
func (p provider) ServerVersion() (*driver.ServerVersion, error) {
return p.clickHouseConn.ServerVersion()
}
func (p provider) Contributors() []string {
return p.clickHouseConn.Contributors()
}
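
A short usage sketch showing how callers reach ClickHouse after this change. The import path for the SigNoz struct and the query text are assumptions; the rest follows the interfaces in this diff. Every call on the returned conn passes through the registered hooks' BeforeQuery/AfterQuery.

    package example

    import (
    	"context"
    	"log"

    	"go.signoz.io/signoz/pkg/signoz" // assumed import path for the SigNoz struct
    )

    // QueryThroughTelemetryStore shows the intended call path: ClickHouseDB()
    // hands back the hook-wrapped clickhouse.Conn, so the clickhousesettings
    // hook can attach log_comment and the configured limits before the query
    // is sent to ClickHouse.
    func QueryThroughTelemetryStore(ctx context.Context, s *signoz.SigNoz) error {
    	conn := s.TelemetryStore.ClickHouseDB()

    	var count uint64
    	if err := conn.QueryRow(ctx, "SELECT count() FROM system.tables").Scan(&count); err != nil {
    		return err
    	}
    	log.Printf("tables: %d", count)
    	return nil
    }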

View File

@ -0,0 +1,62 @@
package telemetrystore
import (
"time"
"go.signoz.io/signoz/pkg/factory"
)
type Config struct {
// Provider is the provider to use
Provider string `mapstructure:"provider"`
// Connection is the connection configuration
Connection ConnectionConfig `mapstructure:",squash"`
// Clickhouse is the clickhouse configuration
ClickHouse ClickHouseConfig `mapstructure:"clickhouse"`
}
type ConnectionConfig struct {
// MaxOpenConns is the maximum number of open connections to the database.
MaxOpenConns int `mapstructure:"max_open_conns"`
MaxIdleConns int `mapstructure:"max_idle_conns"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
}
type ClickHouseQuerySettings struct {
MaxExecutionTime int `mapstructure:"max_execution_time"`
MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
MaxBytesToRead int `mapstructure:"max_bytes_to_read"`
MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"`
}
type ClickHouseConfig struct {
DSN string `mapstructure:"dsn"`
QuerySettings ClickHouseQuerySettings `mapstructure:"settings"`
}
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig)
}
func newConfig() factory.Config {
return Config{
Provider: "clickhouse",
Connection: ConnectionConfig{
MaxOpenConns: 100,
MaxIdleConns: 50,
DialTimeout: 5 * time.Second,
},
ClickHouse: ClickHouseConfig{
DSN: "http://localhost:9000",
// No default query settings, as default's are set in ch config
},
}
}
func (c Config) Validate() error {
return nil
}

View File

@ -0,0 +1,95 @@
package telemetrystore
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/config/envprovider"
"go.signoz.io/signoz/pkg/factory"
)
func TestNewWithEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60")
t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150")
t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DEBUG", "true")
conf, err := config.New(
context.Background(),
config.ResolverConfig{
Uris: []string{"env:"},
ProviderFactories: []config.ProviderFactory{
envprovider.NewFactory(),
},
},
[]factory.ConfigFactory{
NewConfigFactory(),
},
)
require.NoError(t, err)
actual := &Config{}
err = conf.Unmarshal("telemetrystore", actual)
require.NoError(t, err)
expected := &Config{
Provider: "clickhouse",
Connection: ConnectionConfig{
MaxOpenConns: 150,
MaxIdleConns: 60,
DialTimeout: 5 * time.Second,
},
ClickHouse: ClickHouseConfig{
DSN: "http://localhost:9000",
},
}
assert.Equal(t, expected, actual)
}
func TestNewWithEnvProviderWithQuerySettings(t *testing.T) {
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME", "10")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME__LEAF", "10")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_TIMEOUT__BEFORE__CHECKING__EXECUTION__SPEED", "10")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__BYTES__TO__READ", "1000000")
t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__RESULT__ROWS__FOR__CH__QUERY", "10000")
conf, err := config.New(
context.Background(),
config.ResolverConfig{
Uris: []string{"env:"},
ProviderFactories: []config.ProviderFactory{
envprovider.NewFactory(),
},
},
[]factory.ConfigFactory{
NewConfigFactory(),
},
)
require.NoError(t, err)
actual := &Config{}
err = conf.Unmarshal("telemetrystore", actual)
require.NoError(t, err)
expected := &Config{
ClickHouse: ClickHouseConfig{
QuerySettings: ClickHouseQuerySettings{
MaxExecutionTime: 10,
MaxExecutionTimeLeaf: 10,
TimeoutBeforeCheckingExecutionSpeed: 10,
MaxBytesToRead: 1000000,
MaxResultRowsForCHQuery: 10000,
},
},
}
assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings)
}

View File

@ -0,0 +1,32 @@
package telemetrystore
import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
type TelemetryStore interface {
// Returns the SigNoz Wrapper for Clickhouse
ClickHouseDB() clickhouse.Conn
}
type TelemetryStoreHook interface {
BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{})
AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error)
}
func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
for _, hook := range hooks {
ctx, query, args = hook.BeforeQuery(ctx, query, args...)
}
return ctx, query, args
}
// runAfterHooks executes all after hooks in order
func WrapAfterQuery(hooks []TelemetryStoreHook, ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
for _, hook := range hooks {
hook.AfterQuery(ctx, query, args, rows, err)
}
}
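
The hook interface above makes it possible to add cross-cutting behaviour without another connection wrapper. A sketch of a hypothetical query-duration logging hook (loghook is not part of this change); it could be registered by passing loghook.NewFactory() to clickhousetelemetrystore.NewFactory alongside telemetrystorehook.NewFactory():

    package loghook

    import (
    	"context"
    	"time"

    	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
    	"go.uber.org/zap"

    	"go.signoz.io/signoz/pkg/factory"
    	"go.signoz.io/signoz/pkg/telemetrystore"
    )

    type ctxKey struct{}

    // provider logs how long each ClickHouse query took. Illustrative only.
    type provider struct{}

    func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
    	return factory.NewProviderFactory(factory.MustNewName("loghook"), New)
    }

    func New(ctx context.Context, settings factory.ProviderSettings, cfg telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
    	return &provider{}, nil
    }

    func (p *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
    	// Stash the start time so AfterQuery can compute the duration.
    	return context.WithValue(ctx, ctxKey{}, time.Now()), query, args
    }

    func (p *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
    	start, ok := ctx.Value(ctxKey{}).(time.Time)
    	if !ok {
    		return
    	}
    	zap.L().Debug("clickhouse query", zap.String("query", query), zap.Duration("took", time.Since(start)), zap.Error(err))
    }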

View File

@ -0,0 +1,85 @@
package telemetrystorehook
import (
"context"
"encoding/json"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/telemetrystore"
)
type provider struct {
settings telemetrystore.ClickHouseQuerySettings
}
func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New)
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
return &provider{
settings: config.ClickHouse.QuerySettings,
}, nil
}
func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
return h.clickHouseSettings(ctx, query, args...)
}
func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) {
return
}
// clickHouseSettings adds clickhouse settings to queries
func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) {
settings := clickhouse.Settings{}
// Apply default settings
logComment := h.getLogComment(ctx)
if logComment != "" {
settings["log_comment"] = logComment
}
if ctx.Value("enforce_max_result_rows") != nil {
settings["max_result_rows"] = h.settings.MaxResultRowsForCHQuery
}
if h.settings.MaxBytesToRead != 0 {
settings["max_bytes_to_read"] = h.settings.MaxBytesToRead
}
if h.settings.MaxExecutionTime != 0 {
settings["max_execution_time"] = h.settings.MaxExecutionTime
}
if h.settings.MaxExecutionTimeLeaf != 0 {
settings["max_execution_time_leaf"] = h.settings.MaxExecutionTimeLeaf
}
if h.settings.TimeoutBeforeCheckingExecutionSpeed != 0 {
settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed
}
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx, query, args
}
func (h *provider) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment
kv := ctx.Value(common.LogCommentKey)
if kv == nil {
return ""
}
logCommentKVs, ok := kv.(map[string]string)
if !ok {
return ""
}
logComment, _ := json.Marshal(logCommentKVs)
return string(logComment)
}
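
Callers opt into the log_comment setting by putting a map on the context under common.LogCommentKey, which the hook above marshals to JSON. A minimal sketch, with illustrative key/value pairs and a hypothetical package name:

    package example

    import (
    	"context"

    	"go.signoz.io/signoz/pkg/query-service/common"
    )

    // WithLogComment shows how callers attach key/value pairs that the
    // clickhousesettings hook turns into the ClickHouse log_comment setting.
    func WithLogComment(ctx context.Context) context.Context {
    	return context.WithValue(ctx, common.LogCommentKey, map[string]string{
    		"source": "example", // illustrative values
    		"module": "docs",
    	})
    }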

View File

@ -0,0 +1,34 @@
package telemetrystoretest
import (
"github.com/ClickHouse/clickhouse-go/v2"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
)
// Provider represents a mock telemetry store provider for testing
type Provider struct {
mock cmock.ClickConnMockCommon
}
// New creates a new mock telemetry store provider
func New() (*Provider, error) {
options := &clickhouse.Options{} // Default options
mock, err := cmock.NewClickHouseNative(options)
if err != nil {
return nil, err
}
return &Provider{
mock: mock,
}, nil
}
// Clickhouse returns the mock Clickhouse connection
func (p *Provider) Clickhouse() clickhouse.Conn {
return p.mock.(clickhouse.Conn)
}
// Mock returns the underlying Clickhouse mock instance for setting expectations
func (p *Provider) Mock() cmock.ClickConnMockCommon {
return p.mock
}

View File

@ -0,0 +1,44 @@
package telemetrystoretest
import (
"testing"
"github.com/ClickHouse/clickhouse-go/v2"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
)
func TestNew(t *testing.T) {
tests := []struct {
name string
wantErr bool
}{
{
name: "should create new provider successfully",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider, err := New()
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, provider)
return
}
assert.NoError(t, err)
assert.NotNil(t, provider)
assert.NotNil(t, provider.Mock())
assert.NotNil(t, provider.Clickhouse())
// Verify the returned interfaces implement the expected types
_, ok := provider.Mock().(cmock.ClickConnMockCommon)
assert.True(t, ok, "Mock() should return cmock.ClickConnMockCommon")
_, ok = provider.Clickhouse().(clickhouse.Conn)
assert.True(t, ok, "Clickhouse() should return clickhouse.Conn")
})
}
}