From abb8b2b1222f5dc2a724dedc18da7a6afec3a0a1 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 10 Aug 2023 17:20:34 +0530 Subject: [PATCH] fix: lowercase filter operator and increase default conn settings (#3310) --- ee/query-service/app/api/api.go | 7 ++++++ ee/query-service/app/db/reader.go | 13 ++++++++-- ee/query-service/app/server.go | 15 ++++++++++- ee/query-service/main.go | 10 ++++++++ .../app/clickhouseReader/options.go | 25 +++++++++++++++++-- .../app/clickhouseReader/reader.go | 11 ++++++-- pkg/query-service/app/http_handler.go | 12 +++++++++ pkg/query-service/app/server.go | 15 ++++++++++- .../app/traces/v3/query_builder.go | 1 + pkg/query-service/main.go | 11 ++++++++ 10 files changed, 112 insertions(+), 8 deletions(-) diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 89a9ed24cb..9239be2d99 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -2,6 +2,7 @@ package api import ( "net/http" + "time" "github.com/gorilla/mux" "go.signoz.io/signoz/ee/query-service/dao" @@ -20,6 +21,9 @@ type APIHandlerOptions struct { SkipConfig *basemodel.SkipConfig PreferDelta bool PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration AppDao dao.ModelDao RulesManager *rules.Manager FeatureFlags baseint.FeatureLookup @@ -40,6 +44,9 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { SkipConfig: opts.SkipConfig, PerferDelta: opts.PreferDelta, PreferSpanMetrics: opts.PreferSpanMetrics, + MaxIdleConns: opts.MaxIdleConns, + MaxOpenConns: opts.MaxOpenConns, + DialTimeout: opts.DialTimeout, AppDao: opts.AppDao, RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags, diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index fc26ec3ce2..c0236548b1 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -1,6 +1,8 @@ package db import ( + "time" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/jmoiron/sqlx" @@ -15,8 +17,15 @@ type ClickhouseReader struct { *basechr.ClickHouseReader } -func NewDataConnector(localDB *sqlx.DB, promConfigPath string, lm interfaces.FeatureLookup) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm) +func NewDataConnector( + localDB *sqlx.DB, + promConfigPath string, + lm interfaces.FeatureLookup, + maxIdleConns int, + maxOpenConns int, + dialTimeout time.Duration, +) *ClickhouseReader { + ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 497100e573..9f3a08a394 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -59,6 +59,9 @@ type ServerOptions struct { RuleRepoURL string PreferDelta bool PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration } // Server runs HTTP api service @@ -122,7 +125,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") - qb := db.NewDataConnector(localDB, serverOptions.PromConfigPath, lm) + qb := db.NewDataConnector( + localDB, + serverOptions.PromConfigPath, + lm, + serverOptions.MaxIdleConns, + serverOptions.MaxOpenConns, + serverOptions.DialTimeout, + ) go qb.Start(readerReady) reader = qb } else { @@ -184,6 +194,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { SkipConfig: skipConfig, PreferDelta: serverOptions.PreferDelta, PreferSpanMetrics: serverOptions.PreferSpanMetrics, + MaxIdleConns: serverOptions.MaxIdleConns, + MaxOpenConns: serverOptions.MaxOpenConns, + DialTimeout: serverOptions.DialTimeout, AppDao: modelDao, RulesManager: rm, FeatureFlags: lm, diff --git a/ee/query-service/main.go b/ee/query-service/main.go index ffd439cd32..dcdedeb9db 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -86,11 +86,18 @@ func main() { var preferDelta bool var preferSpanMetrics bool + var maxIdleConns int + var maxOpenConns int + var dialTimeout time.Duration + flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over cumulative metrics)") flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)") + flag.IntVar(&maxIdleConns, "max-idle-conns", 50, "(number of connections to maintain in the pool.)") + flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)") + flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") flag.Parse() @@ -111,6 +118,9 @@ func main() { PrivateHostPort: baseconst.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, } // Read the jwt secret key diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 1f45e47115..f03da2505a 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ClickHouse/clickhouse-go/v2" + "go.uber.org/zap" ) type Encoding string @@ -58,6 +59,9 @@ type namespaceConfig struct { namespace string Enabled bool Datasource string + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration TraceDB string OperationsTable string IndexTable string @@ -88,8 +92,14 @@ type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error) func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) { ctx := context.Background() dsnURL, err := url.Parse(cfg.Datasource) + if err != nil { + return nil, err + } options := &clickhouse.Options{ - Addr: []string{dsnURL.Host}, + Addr: []string{dsnURL.Host}, + MaxOpenConns: cfg.MaxOpenConns, + MaxIdleConns: cfg.MaxIdleConns, + DialTimeout: cfg.DialTimeout, } if dsnURL.Query().Get("username") != "" { auth := clickhouse.Auth{ @@ -98,6 +108,7 @@ func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) { } options.Auth = auth } + zap.S().Infof("Connecting to Clickhouse at %s, MaxIdleConns: %d, MaxOpenConns: %d, DialTimeout: %s", dsnURL.Host, options.MaxIdleConns, options.MaxOpenConns, options.DialTimeout) db, err := clickhouse.Open(options) if err != nil { return nil, err @@ -118,7 +129,14 @@ type Options struct { } // NewOptions creates a new Options struct. -func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...string) *Options { +func NewOptions( + datasource string, + maxIdleConns int, + maxOpenConns int, + dialTimeout time.Duration, + primaryNamespace string, + otherNamespaces ...string, +) *Options { if datasource == "" { datasource = defaultDatasource @@ -129,6 +147,9 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s namespace: primaryNamespace, Enabled: true, Datasource: datasource, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, TraceDB: defaultTraceDB, OperationsTable: defaultOperationsTable, IndexTable: defaultIndexTable, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index bb3072df12..0965f4804f 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -119,10 +119,17 @@ type ClickHouseReader struct { } // NewTraceReader returns a TraceReader for the database -func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.FeatureLookup) *ClickHouseReader { +func NewReader( + localDB *sqlx.DB, + configFile string, + featureFlag interfaces.FeatureLookup, + maxIdleConns int, + maxOpenConns int, + dialTimeout time.Duration, +) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") - options := NewOptions(datasource, primaryNamespace, archiveNamespace) + options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace) db, err := initialize(options) if err != nil { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 46fcd00676..2e5b6aeca5 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -77,6 +77,10 @@ type APIHandler struct { preferDelta bool preferSpanMetrics bool + maxIdleConns int + maxOpenConns int + dialTimeout time.Duration + LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController // SetupCompleted indicates if SigNoz is ready for general use. @@ -94,6 +98,11 @@ type APIHandlerOpts struct { PerferDelta bool PreferSpanMetrics bool + + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration + // dao layer to perform crud on app objects like dashboard, alerts etc AppDao dao.ModelDao @@ -121,6 +130,9 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { skipConfig: opts.SkipConfig, preferDelta: opts.PerferDelta, preferSpanMetrics: opts.PreferSpanMetrics, + maxIdleConns: opts.MaxIdleConns, + maxOpenConns: opts.MaxOpenConns, + dialTimeout: opts.DialTimeout, alertManager: alertManager, ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index f01dd69760..45bb2ac91e 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -51,6 +51,9 @@ type ServerOptions struct { RuleRepoURL string PreferDelta bool PreferSpanMetrics bool + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration } // Server runs HTTP, Mux and a grpc server @@ -103,7 +106,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") - clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath, fm) + clickhouseReader := clickhouseReader.NewReader( + localDB, + serverOptions.PromConfigPath, + fm, + serverOptions.MaxIdleConns, + serverOptions.MaxOpenConns, + serverOptions.DialTimeout, + ) go clickhouseReader.Start(readerReady) reader = clickhouseReader } else { @@ -136,6 +146,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { SkipConfig: skipConfig, PerferDelta: serverOptions.PreferDelta, PreferSpanMetrics: serverOptions.PreferSpanMetrics, + MaxIdleConns: serverOptions.MaxIdleConns, + MaxOpenConns: serverOptions.MaxOpenConns, + DialTimeout: serverOptions.DialTimeout, AppDao: dao.DB(), RuleManager: rm, FeatureFlags: fm, diff --git a/pkg/query-service/app/traces/v3/query_builder.go b/pkg/query-service/app/traces/v3/query_builder.go index 1f9dfefbd9..b7ce646b9f 100644 --- a/pkg/query-service/app/traces/v3/query_builder.go +++ b/pkg/query-service/app/traces/v3/query_builder.go @@ -153,6 +153,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet, keys map[string]v3.AttributeKey) ( columnName := getColumnName(item.Key, keys) var fmtVal string key := enrichKeyWithMetadata(item.Key, keys) + item.Operator = v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) if item.Operator != v3.FilterOperatorExists && item.Operator != v3.FilterOperatorNotExists { var err error val, err = utils.ValidateAndCastValue(val, key.DataType) diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index bd401d6be7..c0fdc3d5d2 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "syscall" + "time" "go.signoz.io/signoz/pkg/query-service/app" "go.signoz.io/signoz/pkg/query-service/auth" @@ -37,11 +38,18 @@ func main() { var preferDelta bool var preferSpanMetrics bool + var maxIdleConns int + var maxOpenConns int + var dialTimeout time.Duration + flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over cumulative metrics)") flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)") + flag.IntVar(&maxIdleConns, "max-idle-conns", 50, "(number of connections to maintain in the pool.)") + flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)") + flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") flag.Parse() @@ -61,6 +69,9 @@ func main() { PrivateHostPort: constants.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, } // Read the jwt secret key