package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"go.opentelemetry.io/otel/sdk/resource"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
	"go.signoz.io/signoz/ee/query-service/app"
	"go.signoz.io/signoz/pkg/query-service/auth"
	"go.signoz.io/signoz/pkg/query-service/constants"
	baseconst "go.signoz.io/signoz/pkg/query-service/constants"
	"go.signoz.io/signoz/pkg/query-service/version"
	"google.golang.org/grpc"

	zapotlpencoder "github.com/SigNoz/zap_otlp/zap_otlp_encoder"
	zapotlpsync "github.com/SigNoz/zap_otlp/zap_otlp_sync"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// initZapLog builds the query-service logger. It always writes to stdout and,
// when OTLP export is enabled, additionally ships logs to the collector over gRPC.
func initZapLog(enableQueryServiceLogOTLPExport bool) *zap.Logger {
	config := zap.NewDevelopmentConfig()
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	config.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder
	otlpEncoder := zapotlpencoder.NewOTLPEncoder(config.EncoderConfig)
	consoleEncoder := zapcore.NewConsoleEncoder(config.EncoderConfig)
	defaultLogLevel := zapcore.DebugLevel

	config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
	config.EncoderConfig.TimeKey = "timestamp"
	config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder

	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("query-service"),
	)

	core := zapcore.NewTee(
		zapcore.NewCore(consoleEncoder, os.Stdout, defaultLogLevel),
	)

	if enableQueryServiceLogOTLPExport {
		conn, err := grpc.DialContext(ctx, constants.OTLPTarget, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithTimeout(time.Second*30))
		if err != nil {
			// Fall back to console-only logging if the collector is unreachable.
			log.Println("failed to connect to otlp collector to export query service logs with error:", err)
		} else {
			logExportBatchSizeInt, err := strconv.Atoi(baseconst.LogExportBatchSize)
			if err != nil {
				logExportBatchSizeInt = 1000
			}
			ws := zapcore.AddSync(zapotlpsync.NewOtlpSyncer(conn, zapotlpsync.Options{
				BatchSize:      logExportBatchSizeInt,
				ResourceSchema: semconv.SchemaURL,
				Resource:       res,
			}))
			core = zapcore.NewTee(
				zapcore.NewCore(consoleEncoder, os.Stdout, defaultLogLevel),
				zapcore.NewCore(otlpEncoder, zapcore.NewMultiWriteSyncer(ws), defaultLogLevel),
			)
		}
	}
	logger := zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))

	return logger
}

func main() {
	var promConfigPath, skipTopLvlOpsPath string

	// disables rule execution but allows changes to the rule definition
	var disableRules bool

	// the url used to build links in the alert messages in slack and other systems
	var ruleRepoURL string
	var cluster string

	var cacheConfigPath, fluxInterval string
	var enableQueryServiceLogOTLPExport bool
	var preferDelta bool
	var preferSpanMetrics bool
	var maxIdleConns int
	var maxOpenConns int
	var dialTimeout time.Duration

	flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
	flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)")
	flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)")
	flag.BoolVar(&preferDelta, "prefer-delta", false, "(prefer delta over cumulative metrics)")
	flag.BoolVar(&preferSpanMetrics, "prefer-span-metrics", false, "(prefer span metrics for service level metrics)")
	flag.IntVar(&maxIdleConns, "max-idle-conns", 50, "(number of connections to maintain in the pool.)")
	flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
	flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
	flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
	flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
	flag.StringVar(&fluxInterval, "flux-interval", "5m", "(flux interval)")
	flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
	flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
	flag.Parse()

	loggerMgr := initZapLog(enableQueryServiceLogOTLPExport)
	zap.ReplaceGlobals(loggerMgr)
	defer loggerMgr.Sync() // flushes buffer, if any

	logger := loggerMgr.Sugar()
	version.PrintVersion()

	serverOptions := &app.ServerOptions{
		HTTPHostPort:      baseconst.HTTPHostPort,
		PromConfigPath:    promConfigPath,
		SkipTopLvlOpsPath: skipTopLvlOpsPath,
		PreferDelta:       preferDelta,
		PreferSpanMetrics: preferSpanMetrics,
		PrivateHostPort:   baseconst.PrivateHostPort,
		DisableRules:      disableRules,
		RuleRepoURL:       ruleRepoURL,
		MaxIdleConns:      maxIdleConns,
		MaxOpenConns:      maxOpenConns,
		DialTimeout:       dialTimeout,
		CacheConfigPath:   cacheConfigPath,
		FluxInterval:      fluxInterval,
		Cluster:           cluster,
	}

	// Read the jwt secret key
	auth.JwtSecret = os.Getenv("SIGNOZ_JWT_SECRET")

	if len(auth.JwtSecret) == 0 {
		zap.S().Warn("No JWT secret key is specified.")
	} else {
		zap.S().Info("JWT secret key set successfully.")
	}

	server, err := app.NewServer(serverOptions)
	if err != nil {
		logger.Fatal("Failed to create server", zap.Error(err))
	}

	if err := server.Start(); err != nil {
		logger.Fatal("Could not start servers", zap.Error(err))
	}

	if err := auth.InitAuthCache(context.Background()); err != nil {
		logger.Fatal("Failed to initialize auth cache", zap.Error(err))
	}

	signalsChannel := make(chan os.Signal, 1)
	signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

	// Block on health-check updates and OS signals until asked to shut down.
	for {
		select {
		case status := <-server.HealthCheckStatus():
			logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
		case <-signalsChannel:
			logger.Info("Received OS Interrupt Signal ...")
			// Stop the server before returning; Fatal would exit the process
			// before Stop (and the deferred log flush) could run.
			server.Stop()
			return
		}
	}
}