From ac86d840f970e10f20733c5ee2633eea8b39502e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 12 Sep 2022 12:30:36 +0530 Subject: [PATCH] fix: reuse the query engine and storage for alerts pqlEngine (#1558) --- pkg/query-service/app/clickhouseReader/reader.go | 13 ++++++++++++- pkg/query-service/app/server.go | 6 ++++-- pkg/query-service/interfaces/interface.go | 3 +++ pkg/query-service/pqlEngine/engine.go | 11 ++++++++++- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 024cb1cd65..cabb84f3a4 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -94,6 +94,7 @@ type ClickHouseReader struct { logsResourceKeys string queryEngine *promql.Engine remoteStorage *remote.Storage + fanoutStorage *storage.Storage promConfigFile string promConfig *config.Config @@ -143,7 +144,7 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader { } } -func (r *ClickHouseReader) Start() { +func (r *ClickHouseReader) Start(readerReady chan bool) { logLevel := promlog.AllowedLevel{} logLevel.Set("debug") // allowedFormat := promlog.AllowedFormat{} @@ -311,6 +312,8 @@ func (r *ClickHouseReader) Start() { } r.queryEngine = queryEngine r.remoteStorage = remoteStorage + r.fanoutStorage = &fanoutStorage + readerReady <- true if err := g.Run(); err != nil { level.Error(logger).Log("err", err) @@ -319,6 +322,14 @@ func (r *ClickHouseReader) Start() { } +func (r *ClickHouseReader) GetQueryEngine() *promql.Engine { + return r.queryEngine +} + +func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage { + return r.fanoutStorage +} + func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) { level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index a34bb0b477..2a04302ef7 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -77,18 +77,20 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } localDB.SetMaxOpenConns(10) + readerReady := make(chan bool) var reader interfaces.Reader storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath) - go clickhouseReader.Start() + go clickhouseReader.Start(readerReady) reader = clickhouseReader } else { return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage) } + <-readerReady rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules) if err != nil { return nil, err @@ -362,7 +364,7 @@ func makeRulesManager( disableRules bool) (*rules.Manager, error) { // create engine - pqle, err := pqle.FromConfigPath(promConfigPath) + pqle, err := pqle.FromReader(ch) if err != nil { return nil, fmt.Errorf("failed to create pql engine : %v", err) } diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 9dc8b23b07..4f4da853c6 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -5,6 +5,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/stats" am "go.signoz.io/query-service/integrations/alertManager" "go.signoz.io/query-service/model" @@ -70,6 +71,8 @@ type Reader interface { // Connection needed for rules, not ideal but required GetConn() clickhouse.Conn + GetQueryEngine() *promql.Engine + GetFanoutStorage() *storage.Storage QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) } diff --git a/pkg/query-service/pqlEngine/engine.go b/pkg/query-service/pqlEngine/engine.go index e9a45ad542..47bde314ee 100644 --- a/pkg/query-service/pqlEngine/engine.go +++ b/pkg/query-service/pqlEngine/engine.go @@ -3,6 +3,8 @@ package promql import ( "context" "fmt" + "time" + "github.com/go-kit/log" pmodel "github.com/prometheus/common/model" plog "github.com/prometheus/common/promlog" @@ -11,7 +13,7 @@ import ( pql "github.com/prometheus/prometheus/promql" pstorage "github.com/prometheus/prometheus/storage" premote "github.com/prometheus/prometheus/storage/remote" - "time" + "go.signoz.io/query-service/interfaces" ) type PqlEngine struct { @@ -29,6 +31,13 @@ func FromConfigPath(promConfigPath string) (*PqlEngine, error) { return NewPqlEngine(c) } +func FromReader(ch interfaces.Reader) (*PqlEngine, error) { + return &PqlEngine{ + engine: ch.GetQueryEngine(), + fanoutStorage: *ch.GetFanoutStorage(), + }, nil +} + func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) { logLevel := plog.AllowedLevel{}