mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 04:45:57 +08:00
feat: ability to configure noisy top level operations to discard (#2978)
This commit is contained in:
parent
0f998a4845
commit
20687d5184
@ -16,6 +16,7 @@ import (
|
||||
|
||||
type APIHandlerOptions struct {
|
||||
DataConnector interfaces.DataConnector
|
||||
SkipConfig *basemodel.SkipConfig
|
||||
AppDao dao.ModelDao
|
||||
RulesManager *rules.Manager
|
||||
FeatureFlags baseint.FeatureLookup
|
||||
@ -32,6 +33,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
|
||||
|
||||
baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
|
||||
Reader: opts.DataConnector,
|
||||
SkipConfig: opts.SkipConfig,
|
||||
AppDao: opts.AppDao,
|
||||
RuleManager: opts.RulesManager,
|
||||
FeatureFlags: opts.FeatureFlags})
|
||||
|
@ -49,9 +49,10 @@ import (
|
||||
const AppDbEngine = "sqlite"
|
||||
|
||||
type ServerOptions struct {
|
||||
PromConfigPath string
|
||||
HTTPHostPort string
|
||||
PrivateHostPort string
|
||||
PromConfigPath string
|
||||
SkipTopLvlOpsPath string
|
||||
HTTPHostPort string
|
||||
PrivateHostPort string
|
||||
// alert specific params
|
||||
DisableRules bool
|
||||
RuleRepoURL string
|
||||
@ -119,7 +120,15 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
go qb.Start(readerReady)
|
||||
reader = qb
|
||||
} else {
|
||||
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
|
||||
return nil, fmt.Errorf("storage type: %s is not supported in query service", storage)
|
||||
}
|
||||
skipConfig := &basemodel.SkipConfig{}
|
||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||
// read skip config
|
||||
skipConfig, err = basemodel.ReadSkipConfig(serverOptions.SkipTopLvlOpsPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
<-readerReady
|
||||
@ -160,6 +169,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
apiOpts := api.APIHandlerOptions{
|
||||
DataConnector: reader,
|
||||
SkipConfig: skipConfig,
|
||||
AppDao: modelDao,
|
||||
RulesManager: rm,
|
||||
FeatureFlags: lm,
|
||||
|
@ -74,7 +74,7 @@ func initZapLog(enableQueryServiceLogOTLPExport bool) *zap.Logger {
|
||||
}
|
||||
|
||||
func main() {
|
||||
var promConfigPath string
|
||||
var promConfigPath, skipTopLvlOpsPath string
|
||||
|
||||
// disables rule execution but allows change to the rule definition
|
||||
var disableRules bool
|
||||
@ -85,6 +85,7 @@ func main() {
|
||||
var enableQueryServiceLogOTLPExport bool
|
||||
|
||||
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
||||
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
||||
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
||||
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
||||
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
|
||||
@ -98,11 +99,12 @@ func main() {
|
||||
version.PrintVersion()
|
||||
|
||||
serverOptions := &app.ServerOptions{
|
||||
HTTPHostPort: baseconst.HTTPHostPort,
|
||||
PromConfigPath: promConfigPath,
|
||||
PrivateHostPort: baseconst.PrivateHostPort,
|
||||
DisableRules: disableRules,
|
||||
RuleRepoURL: ruleRepoURL,
|
||||
HTTPHostPort: baseconst.HTTPHostPort,
|
||||
PromConfigPath: promConfigPath,
|
||||
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
||||
PrivateHostPort: baseconst.PrivateHostPort,
|
||||
DisableRules: disableRules,
|
||||
RuleRepoURL: ruleRepoURL,
|
||||
}
|
||||
|
||||
// Read the jwt secret key
|
||||
|
@ -716,7 +716,7 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro
|
||||
return &services, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context) (*map[string][]string, *model.ApiError) {
|
||||
func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig) (*map[string][]string, *model.ApiError) {
|
||||
|
||||
operations := map[string][]string{}
|
||||
query := fmt.Sprintf(`SELECT DISTINCT name, serviceName FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable)
|
||||
@ -737,18 +737,21 @@ func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context) (*map[stri
|
||||
if _, ok := operations[serviceName]; !ok {
|
||||
operations[serviceName] = []string{}
|
||||
}
|
||||
if skipConfig.ShouldSkip(serviceName, name) {
|
||||
continue
|
||||
}
|
||||
operations[serviceName] = append(operations[serviceName], name)
|
||||
}
|
||||
return &operations, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) {
|
||||
|
||||
if r.indexTable == "" {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
||||
}
|
||||
|
||||
topLevelOps, apiErr := r.GetTopLevelOperations(ctx)
|
||||
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig)
|
||||
if apiErr != nil {
|
||||
return nil, apiErr
|
||||
}
|
||||
@ -839,9 +842,9 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
||||
return &serviceItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, *model.ApiError) {
|
||||
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) {
|
||||
|
||||
topLevelOps, apiErr := r.GetTopLevelOperations(ctx)
|
||||
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig)
|
||||
if apiErr != nil {
|
||||
return nil, apiErr
|
||||
}
|
||||
|
@ -63,6 +63,7 @@ type APIHandler struct {
|
||||
basePath string
|
||||
apiPrefix string
|
||||
reader interfaces.Reader
|
||||
skipConfig *model.SkipConfig
|
||||
appDao dao.ModelDao
|
||||
alertManager am.Manager
|
||||
ruleManager *rules.Manager
|
||||
@ -81,6 +82,7 @@ type APIHandlerOpts struct {
|
||||
// business data reader e.g. clickhouse
|
||||
Reader interfaces.Reader
|
||||
|
||||
SkipConfig *model.SkipConfig
|
||||
// dao layer to perform crud on app objects like dashboard, alerts etc
|
||||
AppDao dao.ModelDao
|
||||
|
||||
@ -102,6 +104,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
aH := &APIHandler{
|
||||
reader: opts.Reader,
|
||||
appDao: opts.AppDao,
|
||||
skipConfig: opts.SkipConfig,
|
||||
alertManager: alertManager,
|
||||
ruleManager: opts.RuleManager,
|
||||
featureFlags: opts.FeatureFlags,
|
||||
@ -1316,7 +1319,7 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetServiceOverview(r.Context(), query)
|
||||
result, apiErr := aH.reader.GetServiceOverview(r.Context(), query, aH.skipConfig)
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
@ -1327,7 +1330,7 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
result, apiErr := aH.reader.GetTopLevelOperations(r.Context())
|
||||
result, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig)
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
@ -1343,7 +1346,7 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetServices(r.Context(), query)
|
||||
result, apiErr := aH.reader.GetServices(r.Context(), query, aH.skipConfig)
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
@ -41,9 +41,10 @@ import (
|
||||
)
|
||||
|
||||
type ServerOptions struct {
|
||||
PromConfigPath string
|
||||
HTTPHostPort string
|
||||
PrivateHostPort string
|
||||
PromConfigPath string
|
||||
SkipTopLvlOpsPath string
|
||||
HTTPHostPort string
|
||||
PrivateHostPort string
|
||||
// alert specific params
|
||||
DisableRules bool
|
||||
RuleRepoURL string
|
||||
@ -105,6 +106,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
} else {
|
||||
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
|
||||
}
|
||||
var skipConfig *model.SkipConfig
|
||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||
// read skip config
|
||||
skipConfig, err = model.ReadSkipConfig(serverOptions.SkipTopLvlOpsPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
<-readerReady
|
||||
rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm)
|
||||
@ -115,6 +124,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
telemetry.GetInstance().SetReader(reader)
|
||||
apiHandler, err := NewAPIHandler(APIHandlerOpts{
|
||||
Reader: reader,
|
||||
SkipConfig: skipConfig,
|
||||
AppDao: dao.DB(),
|
||||
RuleManager: rm,
|
||||
FeatureFlags: fm,
|
||||
|
@ -21,9 +21,9 @@ type Reader interface {
|
||||
|
||||
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
|
||||
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
|
||||
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, *model.ApiError)
|
||||
GetTopLevelOperations(ctx context.Context) (*map[string][]string, *model.ApiError)
|
||||
GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError)
|
||||
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError)
|
||||
GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig) (*map[string][]string, *model.ApiError)
|
||||
GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError)
|
||||
GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError)
|
||||
GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)
|
||||
GetServicesList(ctx context.Context) (*[]string, error)
|
||||
|
@ -26,7 +26,7 @@ func initZapLog() *zap.Logger {
|
||||
}
|
||||
|
||||
func main() {
|
||||
var promConfigPath string
|
||||
var promConfigPath, skipTopLvlOpsPath string
|
||||
|
||||
// disables rule execution but allows change to the rule definition
|
||||
var disableRules bool
|
||||
@ -35,6 +35,7 @@ func main() {
|
||||
var ruleRepoURL string
|
||||
|
||||
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
|
||||
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
|
||||
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
|
||||
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
|
||||
flag.Parse()
|
||||
@ -47,11 +48,12 @@ func main() {
|
||||
version.PrintVersion()
|
||||
|
||||
serverOptions := &app.ServerOptions{
|
||||
HTTPHostPort: constants.HTTPHostPort,
|
||||
PromConfigPath: promConfigPath,
|
||||
PrivateHostPort: constants.PrivateHostPort,
|
||||
DisableRules: disableRules,
|
||||
RuleRepoURL: ruleRepoURL,
|
||||
HTTPHostPort: constants.HTTPHostPort,
|
||||
PromConfigPath: promConfigPath,
|
||||
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
||||
PrivateHostPort: constants.PrivateHostPort,
|
||||
DisableRules: disableRules,
|
||||
RuleRepoURL: ruleRepoURL,
|
||||
}
|
||||
|
||||
// Read the jwt secret key
|
||||
|
57
pkg/query-service/model/config.go
Normal file
57
pkg/query-service/model/config.go
Normal file
@ -0,0 +1,57 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
type SkipConfig struct {
|
||||
Services []ServiceSkipConfig `yaml:"services"`
|
||||
}
|
||||
|
||||
type ServiceSkipConfig struct {
|
||||
Name string `yaml:"name"`
|
||||
Operations []string `yaml:"operations"`
|
||||
}
|
||||
|
||||
func (s *SkipConfig) ShouldSkip(serviceName, name string) bool {
|
||||
for _, service := range s.Services {
|
||||
if service.Name == serviceName {
|
||||
for _, operation := range service.Operations {
|
||||
if name == operation {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func ReadYaml(path string, v interface{}) error {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
decoder := yaml.NewDecoder(f)
|
||||
err = decoder.Decode(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadSkipConfig(path string) (*SkipConfig, error) {
|
||||
if path == "" {
|
||||
return &SkipConfig{}, nil
|
||||
}
|
||||
|
||||
skipConfig := &SkipConfig{}
|
||||
err := ReadYaml(path, skipConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return skipConfig, nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user