feat: use new schema flag (#5930)

This commit is contained in:
Nityananda Gohain 2024-09-12 10:58:07 +05:30 committed by GitHub
parent 6e7f04b492
commit 10ebd0cad6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 109 additions and 51 deletions

View File

@ -39,6 +39,8 @@ type APIHandlerOptions struct {
Gateway *httputil.ReverseProxy
// Querier Influx Interval
FluxInterval time.Duration
UseLogsNewSchema bool
}
type APIHandler struct {
@ -63,6 +65,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
})
if err != nil {

View File

@ -25,8 +25,9 @@ func NewDataConnector(
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster)
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,

View File

@ -77,6 +77,7 @@ type ServerOptions struct {
FluxInterval string
Cluster string
GatewayUrl string
UseLogsNewSchema bool
}
// Server runs HTTP api service
@ -154,6 +155,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
)
go qb.Start(readerReady)
reader = qb
@ -176,7 +178,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
localDB,
reader,
serverOptions.DisableRules,
lm)
lm,
serverOptions.UseLogsNewSchema,
)
if err != nil {
return nil, err
@ -265,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
Cache: c,
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
}
apiHandler, err := api.NewAPIHandler(apiOpts)
@ -728,7 +733,8 @@ func makeRulesManager(
db *sqlx.DB,
ch baseint.Reader,
disableRules bool,
fm baseint.FeatureLookup) (*baserules.Manager, error) {
fm baseint.FeatureLookup,
useLogsNewSchema bool) (*baserules.Manager, error) {
// create engine
pqle, err := pqle.FromConfigPath(promConfigPath)
@ -756,7 +762,8 @@ func makeRulesManager(
Reader: ch,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
}
// create Manager

View File

@ -87,6 +87,7 @@ func main() {
var ruleRepoURL string
var cluster string
var useLogsNewSchema bool
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
var preferSpanMetrics bool
@ -96,6 +97,7 @@ func main() {
var dialTimeout time.Duration
var gatewayUrl string
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@ -134,6 +136,7 @@ func main() {
FluxInterval: fluxInterval,
Cluster: cluster,
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
}
// Read the jwt secret key

View File

@ -20,6 +20,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule,
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
)

View File

@ -132,6 +132,8 @@ type ClickHouseReader struct {
liveTailRefreshSeconds int
cluster string
useLogsNewSchema bool
}
// NewTraceReader returns a TraceReader for the database
@ -143,6 +145,7 @@ func NewReader(
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
@ -153,7 +156,7 @@ func NewReader(
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema)
}
func NewReaderFromClickhouseConnection(
@ -163,6 +166,7 @@ func NewReaderFromClickhouseConnection(
configFile string,
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New("")
if err != nil {
@ -219,6 +223,7 @@ func NewReaderFromClickhouseConnection(
featureFlags: featureFlag,
cluster: cluster,
queryProgressTracker: queryprogress.NewQueryProgressTracker(),
useLogsNewSchema: useLogsNewSchema,
}
}

View File

@ -105,6 +105,8 @@ type APIHandler struct {
// Websocket connection upgrader
Upgrader *websocket.Upgrader
UseLogsNewSchema bool
}
type APIHandlerOpts struct {
@ -140,6 +142,9 @@ type APIHandlerOpts struct {
// Querier Influx Interval
FluxInterval time.Duration
// Use new schema
UseLogsNewSchema bool
}
// NewAPIHandler returns an APIHandler
@ -151,19 +156,21 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
}
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
UseLogsNewSchema: opts.UseLogsNewSchema,
}
querier := querier.NewQuerier(querierOpts)
@ -185,6 +192,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
UseLogsNewSchema: opts.UseLogsNewSchema,
}
builderOpts := queryBuilder.QueryBuilderOptions{

View File

@ -54,6 +54,8 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
}
type QuerierOptions struct {
@ -64,9 +66,10 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {

View File

@ -54,6 +54,8 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
}
type QuerierOptions struct {
@ -64,9 +66,10 @@ type QuerierOptions struct {
FeatureLookup interfaces.FeatureLookup
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {

View File

@ -66,6 +66,7 @@ type ServerOptions struct {
CacheConfigPath string
FluxInterval string
Cluster string
UseLogsNewSchema bool
}
// Server runs HTTP, Mux and a grpc server
@ -128,6 +129,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
@ -144,7 +146,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
<-readerReady
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm)
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema)
if err != nil {
return nil, err
}
@ -197,6 +199,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
})
if err != nil {
return nil, err
@ -713,7 +716,8 @@ func makeRulesManager(
db *sqlx.DB,
ch interfaces.Reader,
disableRules bool,
fm interfaces.FeatureLookup) (*rules.Manager, error) {
fm interfaces.FeatureLookup,
useLogsNewSchema bool) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
@ -730,16 +734,17 @@ func makeRulesManager(
// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: nil,
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
EvalDelay: constants.GetEvalDelay(),
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: nil,
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
}
// create Manager

View File

@ -33,6 +33,7 @@ func main() {
// disables rule execution but allows change to the rule definition
var disableRules bool
var useLogsNewSchema bool
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL, cacheConfigPath, fluxInterval string
var cluster string
@ -43,6 +44,7 @@ func main() {
var maxOpenConns int
var dialTimeout time.Duration
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
@ -79,6 +81,7 @@ func main() {
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
Cluster: cluster,
UseLogsNewSchema: useLogsNewSchema,
}
// Read the jwt secret key

View File

@ -35,6 +35,8 @@ type PrepareTaskOptions struct {
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
UseLogsNewSchema bool
}
const taskNamesuffix = "webAppEditor"
@ -75,6 +77,8 @@ type ManagerOptions struct {
EvalDelay time.Duration
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool
}
// The Manager manages recording and alerting rules.
@ -96,6 +100,8 @@ type Manager struct {
reader interfaces.Reader
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool
}
func defaultOptions(o *ManagerOptions) *ManagerOptions {
@ -130,6 +136,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Rule,
opts.FF,
opts.Reader,
opts.UseLogsNewSchema,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
)
@ -333,6 +340,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
UseLogsNewSchema: m.opts.UseLogsNewSchema,
})
if err != nil {
@ -452,6 +461,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
UseLogsNewSchema: m.opts.UseLogsNewSchema,
})
for _, r := range newTask.Rules() {
@ -794,6 +805,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
parsedRule,
m.featureFlags,
m.reader,
m.opts.UseLogsNewSchema,
WithSendAlways(),
WithSendUnmatched(),
)

View File

@ -60,6 +60,7 @@ func NewThresholdRule(
p *PostableRule,
featureFlags interfaces.FeatureLookup,
reader interfaces.Reader,
useLogsNewSchema bool,
opts ...RuleOption,
) (*ThresholdRule, error) {
@ -77,17 +78,19 @@ func NewThresholdRule(
}
querierOption := querier.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FeatureLookup: featureFlags,
UseLogsNewSchema: useLogsNewSchema,
}
t.querier = querier.NewQuerier(querierOption)

View File

@ -685,7 +685,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@ -774,7 +774,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
}
fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@ -816,7 +816,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
}
fm := featureManager.StartManager()
rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@ -892,7 +892,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@ -945,7 +945,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
fm := featureManager.StartManager()
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, fm, nil) // no eval delay
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay
if err != nil {
assert.NoError(t, err)
}
@ -994,7 +994,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
fm := featureManager.StartManager()
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@ -1135,9 +1135,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule.temporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@ -1234,9 +1234,9 @@ func TestThresholdRuleNoData(t *testing.T) {
}
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "")
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true)
rule, err := NewThresholdRule("69", &postableRule, fm, reader)
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
rule.temporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,

View File

@ -45,6 +45,7 @@ func NewMockClickhouseReader(
"",
featureFlags,
"",
true,
)
return reader, mockDB