From 5ae4e05c9624a8b2abca11cdb4be9e8e6d25e2a5 Mon Sep 17 00:00:00 2001 From: Amol Umbark Date: Wed, 8 Jun 2022 12:22:25 +0530 Subject: [PATCH] HTTP listener for internal services (#1238) * feat: added private http server to handle internal service requests * feat: added private port default to constants --- .../clickhouse-setup/docker-compose.yaml | 2 +- pkg/query-service/app/http_handler.go | 5 + pkg/query-service/app/server.go | 225 ++++++++++-------- pkg/query-service/constants/constants.go | 51 ++-- pkg/query-service/main.go | 3 +- 5 files changed, 160 insertions(+), 126 deletions(-) diff --git a/deploy/docker/clickhouse-setup/docker-compose.yaml b/deploy/docker/clickhouse-setup/docker-compose.yaml index e18d584e1d..bfcace3144 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.yaml @@ -30,7 +30,7 @@ services: condition: service_healthy restart: on-failure command: - - --queryService.url=http://query-service:8080 + - --queryService.url=http://query-service:8085 - --storage.path=/data # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md` diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index cb7779c29e..ee4633ff1a 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -277,6 +277,11 @@ func AdminAccess(f func(http.ResponseWriter, *http.Request)) http.HandlerFunc { } } +// RegisterPrivateRoutes registers routes for this handler on the given router +func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) { + router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet) +} + // RegisterRoutes registers routes for this handler on the given router func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/query_range", ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index e157153dcc..7904e813e2 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -25,23 +25,24 @@ import ( ) type ServerOptions struct { - HTTPHostPort string + HTTPHostPort string + PrivateHostPort string } // Server runs HTTP, Mux and a grpc server type Server struct { // logger *zap.Logger - // querySvc *querysvc.QueryService - // queryOptions *QueryOptions - // tracer opentracing.Tracer // TODO make part of flags.Service serverOptions *ServerOptions - conn net.Listener - // grpcConn net.Listener - httpConn net.Listener - // grpcServer *grpc.Server - httpServer *http.Server - separatePorts bool + + // public http router + httpConn net.Listener + httpServer *http.Server + + // private http + privateConn net.Listener + privateHTTP *http.Server + unavailableChannel chan healthcheck.Status } @@ -51,59 +52,20 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status { } // NewServer creates and initializes Server -// func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) { func NewServer(serverOptions *ServerOptions) (*Server, error) { - // _, httpPort, err := net.SplitHostPort(serverOptions.HTTPHostPort) - // if err != nil { - // return nil, err - // } - - // _, grpcPort, err := net.SplitHostPort(options.GRPCHostPort) - // if err != nil { - // return nil, err - // } - - // grpcServer, err := createGRPCServer(querySvc, options, logger, tracer) - // if err != nil { - // return nil, err - // } - if err := dao.InitDao("sqlite", constants.RELATIONAL_DATASOURCE_PATH); err != nil { return nil, err } - - s := &Server{ - // logger: logger, - // querySvc: querySvc, - // queryOptions: options, - // tracer: tracer, - // grpcServer: grpcServer, - serverOptions: serverOptions, - separatePorts: true, - // separatePorts: grpcPort != httpPort, - unavailableChannel: make(chan healthcheck.Status), - } - httpServer, err := s.createHTTPServer() - - if err != nil { - return nil, err - } - s.httpServer = httpServer - - return s, nil -} - -func (s *Server) createHTTPServer() (*http.Server, error) { - localDB, err := dashboards.InitDB(constants.RELATIONAL_DATASOURCE_PATH) + if err != nil { return nil, err } + localDB.SetMaxOpenConns(10) var reader Reader - storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") @@ -119,24 +81,75 @@ func (s *Server) createHTTPServer() (*http.Server, error) { return nil, err } + s := &Server{ + // logger: logger, + // tracer: tracer, + serverOptions: serverOptions, + unavailableChannel: make(chan healthcheck.Status), + } + + httpServer, err := s.createPublicServer(apiHandler) + + if err != nil { + return nil, err + } + + s.httpServer = httpServer + + privateServer, err := s.createPrivateServer(apiHandler) + if err != nil { + return nil, err + } + + s.privateHTTP = privateServer + + return s, nil +} + +func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) { + + r := NewRouter() + + r.Use(setTimeoutMiddleware) + r.Use(s.analyticsMiddleware) + r.Use(loggingMiddlewarePrivate) + + api.RegisterPrivateRoutes(r) + + c := cors.New(cors.Options{ + //todo(amol): find out a way to add exact domain or + // ip here for alert manager + AllowedOrigins: []string{"*"}, + AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"}, + AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"}, + }) + + handler := c.Handler(r) + handler = handlers.CompressHandler(handler) + + return &http.Server{ + Handler: handler, + }, nil +} + +func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { + r := NewRouter() r.Use(setTimeoutMiddleware) r.Use(s.analyticsMiddleware) r.Use(loggingMiddleware) - apiHandler.RegisterRoutes(r) - apiHandler.RegisterMetricsRoutes(r) + api.RegisterRoutes(r) + api.RegisterMetricsRoutes(r) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, - // AllowCredentials: true, AllowedMethods: []string{"GET", "DELETE", "POST", "PUT"}, AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"}, }) handler := c.Handler(r) - // var handler http.Handler = r handler = handlers.CompressHandler(handler) @@ -145,6 +158,7 @@ func (s *Server) createHTTPServer() (*http.Server, error) { }, nil } +// loggingMiddleware is used for logging public api calls func loggingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { route := mux.CurrentRoute(r) @@ -155,6 +169,18 @@ func loggingMiddleware(next http.Handler) http.Handler { }) } +// loggingMiddlewarePrivate is used for logging private api calls +// from internal services like alert manager +func loggingMiddlewarePrivate(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + route := mux.CurrentRoute(r) + path, _ := route.GetPathTemplate() + startTime := time.Now() + next.ServeHTTP(w, r) + zap.S().Info(path, "\tprivatePort: true", "\ttimeTaken: ", time.Now().Sub(startTime)) + }) +} + type loggingResponseWriter struct { http.ResponseWriter statusCode int @@ -198,61 +224,42 @@ func setTimeoutMiddleware(next http.Handler) http.Handler { }) } -// initListener initialises listeners of the server -func (s *Server) initListener() (cmux.CMux, error) { - if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests - var err error - // s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPCHostPort) - // if err != nil { - // return nil, err - // } - - s.httpConn, err = net.Listen("tcp", s.serverOptions.HTTPHostPort) - if err != nil { - return nil, err - } - zap.S().Info("Query server started ...") - return nil, nil +// initListeners initialises listeners of the server +func (s *Server) initListeners() error { + // listen on public port + var err error + publicHostPort := s.serverOptions.HTTPHostPort + if publicHostPort == "" { + return fmt.Errorf("constants.HTTPHostPort is required") } - // // old behavior using cmux - // conn, err := net.Listen("tcp", s.queryOptions.HostPort) - // if err != nil { - // return nil, err - // } - // s.conn = conn + s.httpConn, err = net.Listen("tcp", publicHostPort) + if err != nil { + return err + } - // var tcpPort int - // if port, err := netutils + zap.S().Info(fmt.Sprintf("Query server started listening on %s...", s.serverOptions.HTTPHostPort)) - // utils.GetPort(s.conn.Addr()); err == nil { - // tcpPort = port - // } + // listen on private port to support internal services + privateHostPort := s.serverOptions.PrivateHostPort - // zap.S().Info( - // "Query server started", - // zap.Int("port", tcpPort), - // zap.String("addr", s.queryOptions.HostPort)) + if privateHostPort == "" { + return fmt.Errorf("constants.PrivateHostPort is required") + } - // // cmux server acts as a reverse-proxy between HTTP and GRPC backends. - // cmuxServer := cmux.New(s.conn) - - // s.grpcConn = cmuxServer.MatchWithWriters( - // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), - // cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), - // ) - // s.httpConn = cmuxServer.Match(cmux.Any()) - // s.queryOptions.HTTPHostPort = s.queryOptions.HostPort - // s.queryOptions.GRPCHostPort = s.queryOptions.HostPort - - return nil, nil + s.privateConn, err = net.Listen("tcp", privateHostPort) + if err != nil { + return err + } + zap.S().Info(fmt.Sprintf("Query server started listening on private port %s...", s.serverOptions.PrivateHostPort)) + return nil } -// Start http, GRPC and cmux servers concurrently +// Start listening on http and private http port concurrently func (s *Server) Start() error { - _, err := s.initListener() + err := s.initListeners() if err != nil { return err } @@ -283,5 +290,25 @@ func (s *Server) Start() error { } }() + var privatePort int + if port, err := utils.GetPort(s.privateConn.Addr()); err == nil { + privatePort = port + } + fmt.Println("starting private http") + go func() { + zap.S().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.serverOptions.PrivateHostPort)) + + switch err := s.privateHTTP.Serve(s.privateConn); err { + case nil, http.ErrServerClosed, cmux.ErrListenerClosed: + // normal exit, nothing to do + zap.S().Info("private http server closed") + default: + zap.S().Error("Could not start private HTTP server", zap.Error(err)) + } + + s.unavailableChannel <- healthcheck.Unavailable + + }() + return nil } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index f18737c434..2825ce31ea 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -6,8 +6,9 @@ import ( ) const ( - HTTPHostPort = "0.0.0.0:8080" // Address to serve http (query service) - DebugHttpPort = "0.0.0.0:6060" // Address to serve http (pprof) + HTTPHostPort = "0.0.0.0:8080" // Address to serve http (query service) + PrivateHostPort = "0.0.0.0:8085" // Address to server internal services like alert manager + DebugHttpPort = "0.0.0.0:6060" // Address to serve http (pprof) ) var DEFAULT_TELEMETRY_ANONYMOUS = false @@ -37,29 +38,29 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db") const ( - ServiceName = "serviceName" - HttpRoute = "httpRoute" - HttpCode = "httpCode" - HttpHost = "httpHost" - HttpUrl = "httpUrl" - HttpMethod = "httpMethod" - Component = "component" - OperationDB = "name" - OperationRequest = "operation" - Status = "status" - Duration = "duration" - DBName = "dbName" - DBOperation = "dbOperation" - DBSystem = "dbSystem" - MsgSystem = "msgSystem" - MsgOperation = "msgOperation" - Timestamp = "timestamp" - Descending = "descending" - Ascending = "ascending" - ContextTimeout = 60 // seconds - StatusPending = "pending" - StatusFailed = "failed" - StatusSuccess = "success" + ServiceName = "serviceName" + HttpRoute = "httpRoute" + HttpCode = "httpCode" + HttpHost = "httpHost" + HttpUrl = "httpUrl" + HttpMethod = "httpMethod" + Component = "component" + OperationDB = "name" + OperationRequest = "operation" + Status = "status" + Duration = "duration" + DBName = "dbName" + DBOperation = "dbOperation" + DBSystem = "dbSystem" + MsgSystem = "msgSystem" + MsgOperation = "msgOperation" + Timestamp = "timestamp" + Descending = "descending" + Ascending = "ascending" + ContextTimeout = 60 // seconds + StatusPending = "pending" + StatusFailed = "failed" + StatusSuccess = "success" ) func GetOrDefaultEnv(key string, fallback string) string { diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 8eb1c48ccd..b837560531 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -34,7 +34,8 @@ func main() { version.PrintVersion() serverOptions := &app.ServerOptions{ - HTTPHostPort: constants.HTTPHostPort, + HTTPHostPort: constants.HTTPHostPort, + PrivateHostPort: constants.PrivateHostPort, } // Read the jwt secret key