mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 02:48:59 +08:00
HTTP listener for internal services (#1238)
* feat: added private http server to handle internal service requests * feat: added private port default to constants
This commit is contained in:
parent
b7d52b8fba
commit
5ae4e05c96
@ -30,7 +30,7 @@ services:
|
||||
condition: service_healthy
|
||||
restart: on-failure
|
||||
command:
|
||||
- --queryService.url=http://query-service:8080
|
||||
- --queryService.url=http://query-service:8085
|
||||
- --storage.path=/data
|
||||
|
||||
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
|
||||
|
@ -277,6 +277,11 @@ func AdminAccess(f func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterPrivateRoutes registers routes for this handler on the given router
|
||||
func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
// RegisterRoutes registers routes for this handler on the given router
|
||||
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/query_range", ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
|
||||
|
@ -25,23 +25,24 @@ import (
|
||||
)
|
||||
|
||||
type ServerOptions struct {
|
||||
HTTPHostPort string
|
||||
HTTPHostPort string
|
||||
PrivateHostPort string
|
||||
}
|
||||
|
||||
// Server runs HTTP, Mux and a grpc server
|
||||
type Server struct {
|
||||
// logger *zap.Logger
|
||||
// querySvc *querysvc.QueryService
|
||||
// queryOptions *QueryOptions
|
||||
|
||||
// tracer opentracing.Tracer // TODO make part of flags.Service
|
||||
serverOptions *ServerOptions
|
||||
conn net.Listener
|
||||
// grpcConn net.Listener
|
||||
httpConn net.Listener
|
||||
// grpcServer *grpc.Server
|
||||
httpServer *http.Server
|
||||
separatePorts bool
|
||||
|
||||
// public http router
|
||||
httpConn net.Listener
|
||||
httpServer *http.Server
|
||||
|
||||
// private http
|
||||
privateConn net.Listener
|
||||
privateHTTP *http.Server
|
||||
|
||||
unavailableChannel chan healthcheck.Status
|
||||
}
|
||||
|
||||
@ -51,59 +52,20 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
|
||||
}
|
||||
|
||||
// NewServer creates and initializes Server
|
||||
// func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
|
||||
func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
// _, httpPort, err := net.SplitHostPort(serverOptions.HTTPHostPort)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// _, grpcPort, err := net.SplitHostPort(options.GRPCHostPort)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
if err := dao.InitDao("sqlite", constants.RELATIONAL_DATASOURCE_PATH); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
// logger: logger,
|
||||
// querySvc: querySvc,
|
||||
// queryOptions: options,
|
||||
// tracer: tracer,
|
||||
// grpcServer: grpcServer,
|
||||
serverOptions: serverOptions,
|
||||
separatePorts: true,
|
||||
// separatePorts: grpcPort != httpPort,
|
||||
unavailableChannel: make(chan healthcheck.Status),
|
||||
}
|
||||
httpServer, err := s.createHTTPServer()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.httpServer = httpServer
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Server) createHTTPServer() (*http.Server, error) {
|
||||
|
||||
localDB, err := dashboards.InitDB(constants.RELATIONAL_DATASOURCE_PATH)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
localDB.SetMaxOpenConns(10)
|
||||
|
||||
var reader Reader
|
||||
|
||||
storage := os.Getenv("STORAGE")
|
||||
if storage == "clickhouse" {
|
||||
zap.S().Info("Using ClickHouse as datastore ...")
|
||||
@ -119,24 +81,75 @@ func (s *Server) createHTTPServer() (*http.Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
// logger: logger,
|
||||
// tracer: tracer,
|
||||
serverOptions: serverOptions,
|
||||
unavailableChannel: make(chan healthcheck.Status),
|
||||
}
|
||||
|
||||
httpServer, err := s.createPublicServer(apiHandler)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.httpServer = httpServer
|
||||
|
||||
privateServer, err := s.createPrivateServer(apiHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.privateHTTP = privateServer
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
|
||||
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(setTimeoutMiddleware)
|
||||
r.Use(s.analyticsMiddleware)
|
||||
r.Use(loggingMiddlewarePrivate)
|
||||
|
||||
api.RegisterPrivateRoutes(r)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
//todo(amol): find out a way to add exact domain or
|
||||
// ip here for alert manager
|
||||
AllowedOrigins: []string{"*"},
|
||||
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"},
|
||||
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
|
||||
})
|
||||
|
||||
handler := c.Handler(r)
|
||||
handler = handlers.CompressHandler(handler)
|
||||
|
||||
return &http.Server{
|
||||
Handler: handler,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
|
||||
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(setTimeoutMiddleware)
|
||||
r.Use(s.analyticsMiddleware)
|
||||
r.Use(loggingMiddleware)
|
||||
|
||||
apiHandler.RegisterRoutes(r)
|
||||
apiHandler.RegisterMetricsRoutes(r)
|
||||
api.RegisterRoutes(r)
|
||||
api.RegisterMetricsRoutes(r)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
// AllowCredentials: true,
|
||||
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"},
|
||||
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
|
||||
})
|
||||
|
||||
handler := c.Handler(r)
|
||||
// var handler http.Handler = r
|
||||
|
||||
handler = handlers.CompressHandler(handler)
|
||||
|
||||
@ -145,6 +158,7 @@ func (s *Server) createHTTPServer() (*http.Server, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// loggingMiddleware is used for logging public api calls
|
||||
func loggingMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
route := mux.CurrentRoute(r)
|
||||
@ -155,6 +169,18 @@ func loggingMiddleware(next http.Handler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// loggingMiddlewarePrivate is used for logging private api calls
|
||||
// from internal services like alert manager
|
||||
func loggingMiddlewarePrivate(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
route := mux.CurrentRoute(r)
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.S().Info(path, "\tprivatePort: true", "\ttimeTaken: ", time.Now().Sub(startTime))
|
||||
})
|
||||
}
|
||||
|
||||
type loggingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
@ -198,61 +224,42 @@ func setTimeoutMiddleware(next http.Handler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// initListener initialises listeners of the server
|
||||
func (s *Server) initListener() (cmux.CMux, error) {
|
||||
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
|
||||
var err error
|
||||
// s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPCHostPort)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
s.httpConn, err = net.Listen("tcp", s.serverOptions.HTTPHostPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
zap.S().Info("Query server started ...")
|
||||
return nil, nil
|
||||
// initListeners initialises listeners of the server
|
||||
func (s *Server) initListeners() error {
|
||||
// listen on public port
|
||||
var err error
|
||||
publicHostPort := s.serverOptions.HTTPHostPort
|
||||
if publicHostPort == "" {
|
||||
return fmt.Errorf("constants.HTTPHostPort is required")
|
||||
}
|
||||
|
||||
// // old behavior using cmux
|
||||
// conn, err := net.Listen("tcp", s.queryOptions.HostPort)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// s.conn = conn
|
||||
s.httpConn, err = net.Listen("tcp", publicHostPort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// var tcpPort int
|
||||
// if port, err := netutils
|
||||
zap.S().Info(fmt.Sprintf("Query server started listening on %s...", s.serverOptions.HTTPHostPort))
|
||||
|
||||
// utils.GetPort(s.conn.Addr()); err == nil {
|
||||
// tcpPort = port
|
||||
// }
|
||||
// listen on private port to support internal services
|
||||
privateHostPort := s.serverOptions.PrivateHostPort
|
||||
|
||||
// zap.S().Info(
|
||||
// "Query server started",
|
||||
// zap.Int("port", tcpPort),
|
||||
// zap.String("addr", s.queryOptions.HostPort))
|
||||
if privateHostPort == "" {
|
||||
return fmt.Errorf("constants.PrivateHostPort is required")
|
||||
}
|
||||
|
||||
// // cmux server acts as a reverse-proxy between HTTP and GRPC backends.
|
||||
// cmuxServer := cmux.New(s.conn)
|
||||
|
||||
// s.grpcConn = cmuxServer.MatchWithWriters(
|
||||
// cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
|
||||
// cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
|
||||
// )
|
||||
// s.httpConn = cmuxServer.Match(cmux.Any())
|
||||
// s.queryOptions.HTTPHostPort = s.queryOptions.HostPort
|
||||
// s.queryOptions.GRPCHostPort = s.queryOptions.HostPort
|
||||
|
||||
return nil, nil
|
||||
s.privateConn, err = net.Listen("tcp", privateHostPort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
zap.S().Info(fmt.Sprintf("Query server started listening on private port %s...", s.serverOptions.PrivateHostPort))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start http, GRPC and cmux servers concurrently
|
||||
// Start listening on http and private http port concurrently
|
||||
func (s *Server) Start() error {
|
||||
|
||||
_, err := s.initListener()
|
||||
err := s.initListeners()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -283,5 +290,25 @@ func (s *Server) Start() error {
|
||||
}
|
||||
}()
|
||||
|
||||
var privatePort int
|
||||
if port, err := utils.GetPort(s.privateConn.Addr()); err == nil {
|
||||
privatePort = port
|
||||
}
|
||||
fmt.Println("starting private http")
|
||||
go func() {
|
||||
zap.S().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.serverOptions.PrivateHostPort))
|
||||
|
||||
switch err := s.privateHTTP.Serve(s.privateConn); err {
|
||||
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
|
||||
// normal exit, nothing to do
|
||||
zap.S().Info("private http server closed")
|
||||
default:
|
||||
zap.S().Error("Could not start private HTTP server", zap.Error(err))
|
||||
}
|
||||
|
||||
s.unavailableChannel <- healthcheck.Unavailable
|
||||
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -6,8 +6,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
HTTPHostPort = "0.0.0.0:8080" // Address to serve http (query service)
|
||||
DebugHttpPort = "0.0.0.0:6060" // Address to serve http (pprof)
|
||||
HTTPHostPort = "0.0.0.0:8080" // Address to serve http (query service)
|
||||
PrivateHostPort = "0.0.0.0:8085" // Address to server internal services like alert manager
|
||||
DebugHttpPort = "0.0.0.0:6060" // Address to serve http (pprof)
|
||||
)
|
||||
|
||||
var DEFAULT_TELEMETRY_ANONYMOUS = false
|
||||
@ -37,29 +38,29 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout
|
||||
var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db")
|
||||
|
||||
const (
|
||||
ServiceName = "serviceName"
|
||||
HttpRoute = "httpRoute"
|
||||
HttpCode = "httpCode"
|
||||
HttpHost = "httpHost"
|
||||
HttpUrl = "httpUrl"
|
||||
HttpMethod = "httpMethod"
|
||||
Component = "component"
|
||||
OperationDB = "name"
|
||||
OperationRequest = "operation"
|
||||
Status = "status"
|
||||
Duration = "duration"
|
||||
DBName = "dbName"
|
||||
DBOperation = "dbOperation"
|
||||
DBSystem = "dbSystem"
|
||||
MsgSystem = "msgSystem"
|
||||
MsgOperation = "msgOperation"
|
||||
Timestamp = "timestamp"
|
||||
Descending = "descending"
|
||||
Ascending = "ascending"
|
||||
ContextTimeout = 60 // seconds
|
||||
StatusPending = "pending"
|
||||
StatusFailed = "failed"
|
||||
StatusSuccess = "success"
|
||||
ServiceName = "serviceName"
|
||||
HttpRoute = "httpRoute"
|
||||
HttpCode = "httpCode"
|
||||
HttpHost = "httpHost"
|
||||
HttpUrl = "httpUrl"
|
||||
HttpMethod = "httpMethod"
|
||||
Component = "component"
|
||||
OperationDB = "name"
|
||||
OperationRequest = "operation"
|
||||
Status = "status"
|
||||
Duration = "duration"
|
||||
DBName = "dbName"
|
||||
DBOperation = "dbOperation"
|
||||
DBSystem = "dbSystem"
|
||||
MsgSystem = "msgSystem"
|
||||
MsgOperation = "msgOperation"
|
||||
Timestamp = "timestamp"
|
||||
Descending = "descending"
|
||||
Ascending = "ascending"
|
||||
ContextTimeout = 60 // seconds
|
||||
StatusPending = "pending"
|
||||
StatusFailed = "failed"
|
||||
StatusSuccess = "success"
|
||||
)
|
||||
|
||||
func GetOrDefaultEnv(key string, fallback string) string {
|
||||
|
@ -34,7 +34,8 @@ func main() {
|
||||
version.PrintVersion()
|
||||
|
||||
serverOptions := &app.ServerOptions{
|
||||
HTTPHostPort: constants.HTTPHostPort,
|
||||
HTTPHostPort: constants.HTTPHostPort,
|
||||
PrivateHostPort: constants.PrivateHostPort,
|
||||
}
|
||||
|
||||
// Read the jwt secret key
|
||||
|
Loading…
x
Reference in New Issue
Block a user