mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-11 17:09:01 +08:00
fix: reuse the query engine and storage for alerts pqlEngine (#1558)
This commit is contained in:
parent
8556c87d46
commit
ac86d840f9
@ -94,6 +94,7 @@ type ClickHouseReader struct {
|
||||
logsResourceKeys string
|
||||
queryEngine *promql.Engine
|
||||
remoteStorage *remote.Storage
|
||||
fanoutStorage *storage.Storage
|
||||
|
||||
promConfigFile string
|
||||
promConfig *config.Config
|
||||
@ -143,7 +144,7 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) Start() {
|
||||
func (r *ClickHouseReader) Start(readerReady chan bool) {
|
||||
logLevel := promlog.AllowedLevel{}
|
||||
logLevel.Set("debug")
|
||||
// allowedFormat := promlog.AllowedFormat{}
|
||||
@ -311,6 +312,8 @@ func (r *ClickHouseReader) Start() {
|
||||
}
|
||||
r.queryEngine = queryEngine
|
||||
r.remoteStorage = remoteStorage
|
||||
r.fanoutStorage = &fanoutStorage
|
||||
readerReady <- true
|
||||
|
||||
if err := g.Run(); err != nil {
|
||||
level.Error(logger).Log("err", err)
|
||||
@ -319,6 +322,14 @@ func (r *ClickHouseReader) Start() {
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetQueryEngine() *promql.Engine {
|
||||
return r.queryEngine
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage {
|
||||
return r.fanoutStorage
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
|
||||
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
|
||||
|
||||
|
@ -77,18 +77,20 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
}
|
||||
|
||||
localDB.SetMaxOpenConns(10)
|
||||
readerReady := make(chan bool)
|
||||
|
||||
var reader interfaces.Reader
|
||||
storage := os.Getenv("STORAGE")
|
||||
if storage == "clickhouse" {
|
||||
zap.S().Info("Using ClickHouse as datastore ...")
|
||||
clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath)
|
||||
go clickhouseReader.Start()
|
||||
go clickhouseReader.Start(readerReady)
|
||||
reader = clickhouseReader
|
||||
} else {
|
||||
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
|
||||
}
|
||||
|
||||
<-readerReady
|
||||
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -362,7 +364,7 @@ func makeRulesManager(
|
||||
disableRules bool) (*rules.Manager, error) {
|
||||
|
||||
// create engine
|
||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
||||
pqle, err := pqle.FromReader(ch)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pql engine : %v", err)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
am "go.signoz.io/query-service/integrations/alertManager"
|
||||
"go.signoz.io/query-service/model"
|
||||
@ -70,6 +71,8 @@ type Reader interface {
|
||||
|
||||
// Connection needed for rules, not ideal but required
|
||||
GetConn() clickhouse.Conn
|
||||
GetQueryEngine() *promql.Engine
|
||||
GetFanoutStorage() *storage.Storage
|
||||
|
||||
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
|
||||
}
|
||||
|
@ -3,6 +3,8 @@ package promql
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
pmodel "github.com/prometheus/common/model"
|
||||
plog "github.com/prometheus/common/promlog"
|
||||
@ -11,7 +13,7 @@ import (
|
||||
pql "github.com/prometheus/prometheus/promql"
|
||||
pstorage "github.com/prometheus/prometheus/storage"
|
||||
premote "github.com/prometheus/prometheus/storage/remote"
|
||||
"time"
|
||||
"go.signoz.io/query-service/interfaces"
|
||||
)
|
||||
|
||||
type PqlEngine struct {
|
||||
@ -29,6 +31,13 @@ func FromConfigPath(promConfigPath string) (*PqlEngine, error) {
|
||||
return NewPqlEngine(c)
|
||||
}
|
||||
|
||||
func FromReader(ch interfaces.Reader) (*PqlEngine, error) {
|
||||
return &PqlEngine{
|
||||
engine: ch.GetQueryEngine(),
|
||||
fanoutStorage: *ch.GetFanoutStorage(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
|
||||
|
||||
logLevel := plog.AllowedLevel{}
|
||||
|
Loading…
x
Reference in New Issue
Block a user