fix: lowercase filter operator and increase default conn settings (#3310)

This commit is contained in:
Srikanth Chekuri 2023-08-10 17:20:34 +05:30 committed by GitHub
parent c35f2ec0aa
commit abb8b2b122
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 112 additions and 8 deletions

View File

@ -2,6 +2,7 @@ package api
import (
"net/http"
"time"
"github.com/gorilla/mux"
"go.signoz.io/signoz/ee/query-service/dao"
@ -20,6 +21,9 @@ type APIHandlerOptions struct {
SkipConfig *basemodel.SkipConfig
PreferDelta bool
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
AppDao dao.ModelDao
RulesManager *rules.Manager
FeatureFlags baseint.FeatureLookup
@ -40,6 +44,9 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
SkipConfig: opts.SkipConfig,
PerferDelta: opts.PreferDelta,
PreferSpanMetrics: opts.PreferSpanMetrics,
MaxIdleConns: opts.MaxIdleConns,
MaxOpenConns: opts.MaxOpenConns,
DialTimeout: opts.DialTimeout,
AppDao: opts.AppDao,
RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags,

View File

@ -1,6 +1,8 @@
package db
import (
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/jmoiron/sqlx"
@ -15,8 +17,15 @@ type ClickhouseReader struct {
*basechr.ClickHouseReader
}
func NewDataConnector(localDB *sqlx.DB, promConfigPath string, lm interfaces.FeatureLookup) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm)
func NewDataConnector(
localDB *sqlx.DB,
promConfigPath string,
lm interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,

View File

@ -59,6 +59,9 @@ type ServerOptions struct {
RuleRepoURL string
PreferDelta bool
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
}
// Server runs HTTP api service
@ -122,7 +125,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
storage := os.Getenv("STORAGE")
if storage == "clickhouse" {
zap.S().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector(localDB, serverOptions.PromConfigPath, lm)
qb := db.NewDataConnector(
localDB,
serverOptions.PromConfigPath,
lm,
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
)
go qb.Start(readerReady)
reader = qb
} else {
@ -184,6 +194,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
SkipConfig: skipConfig,
PreferDelta: serverOptions.PreferDelta,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
MaxIdleConns: serverOptions.MaxIdleConns,
MaxOpenConns: serverOptions.MaxOpenConns,
DialTimeout: serverOptions.DialTimeout,
AppDao: modelDao,
RulesManager: rm,
FeatureFlags: lm,

View File

@ -86,11 +86,18 @@ func main() {
var preferDelta bool
var preferSpanMetrics bool
var maxIdleConns int
var maxOpenConns int
var dialTimeout time.Duration
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over cumulative metrics)")
flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)")
flag.IntVar(&maxIdleConns, "max-idle-conns", 50, "(number of connections to maintain in the pool.)")
flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
flag.Parse()
@ -111,6 +118,9 @@ func main() {
PrivateHostPort: baseconst.PrivateHostPort,
DisableRules: disableRules,
RuleRepoURL: ruleRepoURL,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
}
// Read the jwt secret key

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"go.uber.org/zap"
)
type Encoding string
@ -58,6 +59,9 @@ type namespaceConfig struct {
namespace string
Enabled bool
Datasource string
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
TraceDB string
OperationsTable string
IndexTable string
@ -88,8 +92,14 @@ type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)
func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
ctx := context.Background()
dsnURL, err := url.Parse(cfg.Datasource)
if err != nil {
return nil, err
}
options := &clickhouse.Options{
Addr: []string{dsnURL.Host},
Addr: []string{dsnURL.Host},
MaxOpenConns: cfg.MaxOpenConns,
MaxIdleConns: cfg.MaxIdleConns,
DialTimeout: cfg.DialTimeout,
}
if dsnURL.Query().Get("username") != "" {
auth := clickhouse.Auth{
@ -98,6 +108,7 @@ func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
}
options.Auth = auth
}
zap.S().Infof("Connecting to Clickhouse at %s, MaxIdleConns: %d, MaxOpenConns: %d, DialTimeout: %s", dsnURL.Host, options.MaxIdleConns, options.MaxOpenConns, options.DialTimeout)
db, err := clickhouse.Open(options)
if err != nil {
return nil, err
@ -118,7 +129,14 @@ type Options struct {
}
// NewOptions creates a new Options struct.
func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...string) *Options {
func NewOptions(
datasource string,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
primaryNamespace string,
otherNamespaces ...string,
) *Options {
if datasource == "" {
datasource = defaultDatasource
@ -129,6 +147,9 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
namespace: primaryNamespace,
Enabled: true,
Datasource: datasource,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
TraceDB: defaultTraceDB,
OperationsTable: defaultOperationsTable,
IndexTable: defaultIndexTable,

View File

@ -119,10 +119,17 @@ type ClickHouseReader struct {
}
// NewTraceReader returns a TraceReader for the database
func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.FeatureLookup) *ClickHouseReader {
func NewReader(
localDB *sqlx.DB,
configFile string,
featureFlag interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
options := NewOptions(datasource, primaryNamespace, archiveNamespace)
options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace)
db, err := initialize(options)
if err != nil {

View File

@ -77,6 +77,10 @@ type APIHandler struct {
preferDelta bool
preferSpanMetrics bool
maxIdleConns int
maxOpenConns int
dialTimeout time.Duration
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
// SetupCompleted indicates if SigNoz is ready for general use.
@ -94,6 +98,11 @@ type APIHandlerOpts struct {
PerferDelta bool
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
// dao layer to perform crud on app objects like dashboard, alerts etc
AppDao dao.ModelDao
@ -121,6 +130,9 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
skipConfig: opts.SkipConfig,
preferDelta: opts.PerferDelta,
preferSpanMetrics: opts.PreferSpanMetrics,
maxIdleConns: opts.MaxIdleConns,
maxOpenConns: opts.MaxOpenConns,
dialTimeout: opts.DialTimeout,
alertManager: alertManager,
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,

View File

@ -51,6 +51,9 @@ type ServerOptions struct {
RuleRepoURL string
PreferDelta bool
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
}
// Server runs HTTP, Mux and a grpc server
@ -103,7 +106,14 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
storage := os.Getenv("STORAGE")
if storage == "clickhouse" {
zap.S().Info("Using ClickHouse as datastore ...")
clickhouseReader := clickhouseReader.NewReader(localDB, serverOptions.PromConfigPath, fm)
clickhouseReader := clickhouseReader.NewReader(
localDB,
serverOptions.PromConfigPath,
fm,
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader
} else {
@ -136,6 +146,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
SkipConfig: skipConfig,
PerferDelta: serverOptions.PreferDelta,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
MaxIdleConns: serverOptions.MaxIdleConns,
MaxOpenConns: serverOptions.MaxOpenConns,
DialTimeout: serverOptions.DialTimeout,
AppDao: dao.DB(),
RuleManager: rm,
FeatureFlags: fm,

View File

@ -153,6 +153,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet, keys map[string]v3.AttributeKey) (
columnName := getColumnName(item.Key, keys)
var fmtVal string
key := enrichKeyWithMetadata(item.Key, keys)
item.Operator = v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if item.Operator != v3.FilterOperatorExists && item.Operator != v3.FilterOperatorNotExists {
var err error
val, err = utils.ValidateAndCastValue(val, key.DataType)

View File

@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
"go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/auth"
@ -37,11 +38,18 @@ func main() {
var preferDelta bool
var preferSpanMetrics bool
var maxIdleConns int
var maxOpenConns int
var dialTimeout time.Duration
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over cumulative metrics)")
flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)")
flag.IntVar(&maxIdleConns, "max-idle-conns", 50, "(number of connections to maintain in the pool.)")
flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.Parse()
@ -61,6 +69,9 @@ func main() {
PrivateHostPort: constants.PrivateHostPort,
DisableRules: disableRules,
RuleRepoURL: ruleRepoURL,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
}
// Read the jwt secret key