From 8bfca9b5644a30370fcddea91efb2a135fe702c0 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Mon, 17 Feb 2025 14:38:13 +0530 Subject: [PATCH] fix: analytics middleware fixed (#7133) --- ee/query-service/app/server.go | 176 ------------------------------ pkg/http/middleware/analytics.go | 6 +- pkg/query-service/app/server.go | 177 ------------------------------- 3 files changed, 3 insertions(+), 356 deletions(-) diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 57212cd33d..a8addbc177 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -1,21 +1,15 @@ package app import ( - "bufio" - "bytes" "context" - "encoding/json" "errors" "fmt" - "io" "net" "net/http" _ "net/http/pprof" // http profiler - "regexp" "time" "github.com/gorilla/handlers" - "github.com/gorilla/mux" "github.com/jmoiron/sqlx" "github.com/rs/cors" @@ -29,8 +23,6 @@ import ( "go.signoz.io/signoz/ee/query-service/interfaces" "go.signoz.io/signoz/ee/query-service/rules" "go.signoz.io/signoz/pkg/http/middleware" - baseauth "go.signoz.io/signoz/pkg/query-service/auth" - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/signoz" "go.signoz.io/signoz/pkg/web" @@ -395,174 +387,6 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h }, nil } -// TODO(remove): Implemented at pkg/http/middleware/logging.go -type loggingResponseWriter struct { - http.ResponseWriter - statusCode int -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter { - // WriteHeader(int) is not called if our response implicitly returns 200 OK, so - // we default to that status code. - return &loggingResponseWriter{w, http.StatusOK} -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -func (lrw *loggingResponseWriter) WriteHeader(code int) { - lrw.statusCode = code - lrw.ResponseWriter.WriteHeader(code) -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -// Flush implements the http.Flush interface. -func (lrw *loggingResponseWriter) Flush() { - lrw.ResponseWriter.(http.Flusher).Flush() -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -// Support websockets -func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { - h, ok := lrw.ResponseWriter.(http.Hijacker) - if !ok { - return nil, nil, errors.New("hijack not supported") - } - return h.Hijack() -} - -func extractQueryRangeData(path string, r *http.Request) (map[string]interface{}, bool) { - pathToExtractBodyFromV3 := "/api/v3/query_range" - pathToExtractBodyFromV4 := "/api/v4/query_range" - - data := map[string]interface{}{} - var postData *v3.QueryRangeParamsV3 - - if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) { - if r.Body != nil { - bodyBytes, err := io.ReadAll(r.Body) - if err != nil { - return nil, false - } - r.Body.Close() // must close - r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) - json.Unmarshal(bodyBytes, &postData) - - } else { - return nil, false - } - - } else { - return nil, false - } - - referrer := r.Header.Get("Referer") - - dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the referrer", zap.Error(err)) - } - alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the alert: ", zap.Error(err)) - } - logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the logs explorer: ", zap.Error(err)) - } - traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the trace explorer: ", zap.Error(err)) - } - - queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData) - - if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) { - if queryInfoResult.MetricsUsed { - telemetry.GetInstance().AddActiveMetricsUser() - } - if queryInfoResult.LogsUsed { - telemetry.GetInstance().AddActiveLogsUser() - } - if queryInfoResult.TracesUsed { - telemetry.GetInstance().AddActiveTracesUser() - } - data["metricsUsed"] = queryInfoResult.MetricsUsed - data["logsUsed"] = queryInfoResult.LogsUsed - data["tracesUsed"] = queryInfoResult.TracesUsed - data["filterApplied"] = queryInfoResult.FilterApplied - data["groupByApplied"] = queryInfoResult.GroupByApplied - data["aggregateOperator"] = queryInfoResult.AggregateOperator - data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey - data["numberOfQueries"] = queryInfoResult.NumberOfQueries - data["queryType"] = queryInfoResult.QueryType - data["panelType"] = queryInfoResult.PanelType - - userEmail, err := baseauth.GetEmailFromJwt(r.Context()) - if err == nil { - // switch case to set data["screen"] based on the referrer - switch { - case dashboardMatched: - data["screen"] = "panel" - case alertMatched: - data["screen"] = "alert" - case logsExplorerMatched: - data["screen"] = "logs-explorer" - case traceExplorerMatched: - data["screen"] = "traces-explorer" - default: - data["screen"] = "unknown" - return data, true - } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false) - } - } - return data, true -} - -func getActiveLogs(path string, r *http.Request) { - // if path == "/api/v1/dashboards/{uuid}" { - // telemetry.GetInstance().AddActiveMetricsUser() - // } - if path == "/api/v1/logs" { - hasFilters := len(r.URL.Query().Get("q")) - if hasFilters > 0 { - telemetry.GetInstance().AddActiveLogsUser() - } - - } - -} - -func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := baseauth.AttachJwtToContext(r.Context(), r) - r = r.WithContext(ctx) - route := mux.CurrentRoute(r) - path, _ := route.GetPathTemplate() - - queryRangeData, metadataExists := extractQueryRangeData(path, r) - getActiveLogs(path, r) - - lrw := NewLoggingResponseWriter(w) - next.ServeHTTP(lrw, r) - - data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode} - if metadataExists { - for key, value := range queryRangeData { - data[key] = value - } - } - - if _, ok := telemetry.EnabledPaths()[path]; ok { - userEmail, err := baseauth.GetEmailFromJwt(r.Context()) - if err == nil { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false) - } - } - - }) -} - // initListeners initialises listeners of the server func (s *Server) initListeners() error { // listen on public port diff --git a/pkg/http/middleware/analytics.go b/pkg/http/middleware/analytics.go index c08d73cc9c..f9922d878f 100644 --- a/pkg/http/middleware/analytics.go +++ b/pkg/http/middleware/analytics.go @@ -35,13 +35,13 @@ func (a *Analytics) Wrap(next http.Handler) http.Handler { route := mux.CurrentRoute(r) path, _ := route.GetPathTemplate() + queryRangeData, metadataExists := a.extractQueryRangeData(path, r) + a.getActiveLogs(path, r) + badResponseBuffer := new(bytes.Buffer) writer := newBadResponseLoggingWriter(w, badResponseBuffer) next.ServeHTTP(writer, r) - queryRangeData, metadataExists := a.extractQueryRangeData(path, r) - a.getActiveLogs(path, r) - data := map[string]interface{}{"path": path, "statusCode": writer.StatusCode()} if metadataExists { for key, value := range queryRangeData { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 328bfd8976..6509c161bb 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -1,21 +1,15 @@ package app import ( - "bufio" - "bytes" "context" - "encoding/json" "errors" "fmt" - "io" "net" "net/http" _ "net/http/pprof" // http profiler - "regexp" "time" "github.com/gorilla/handlers" - "github.com/gorilla/mux" "github.com/jmoiron/sqlx" "github.com/rs/cors" @@ -30,7 +24,6 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/opamp" opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/app/preferences" - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/signoz" "go.signoz.io/signoz/pkg/web" @@ -338,176 +331,6 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, }, nil } -// TODO(remove): Implemented at pkg/http/middleware/logging.go -type loggingResponseWriter struct { - http.ResponseWriter - statusCode int -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter { - // WriteHeader(int) is not called if our response implicitly returns 200 OK, so - // we default to that status code. - return &loggingResponseWriter{w, http.StatusOK} -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -func (lrw *loggingResponseWriter) WriteHeader(code int) { - lrw.statusCode = code - lrw.ResponseWriter.WriteHeader(code) -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -// Flush implements the http.Flush interface. -func (lrw *loggingResponseWriter) Flush() { - lrw.ResponseWriter.(http.Flusher).Flush() -} - -// TODO(remove): Implemented at pkg/http/middleware/logging.go -// Support websockets -func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { - h, ok := lrw.ResponseWriter.(http.Hijacker) - if !ok { - return nil, nil, errors.New("hijack not supported") - } - return h.Hijack() -} - -func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface{}, bool) { - pathToExtractBodyFromV3 := "/api/v3/query_range" - pathToExtractBodyFromV4 := "/api/v4/query_range" - - data := map[string]interface{}{} - var postData *v3.QueryRangeParamsV3 - - if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) { - if r.Body != nil { - bodyBytes, err := io.ReadAll(r.Body) - if err != nil { - return nil, false - } - r.Body.Close() // must close - r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) - json.Unmarshal(bodyBytes, &postData) - - } else { - return nil, false - } - - } else { - return nil, false - } - - referrer := r.Header.Get("Referer") - - dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the referrer", zap.Error(err)) - } - alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the alert: ", zap.Error(err)) - } - logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the logs explorer: ", zap.Error(err)) - } - traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer) - if err != nil { - zap.L().Error("error while matching the trace explorer: ", zap.Error(err)) - } - - queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData) - - if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) { - if queryInfoResult.MetricsUsed { - telemetry.GetInstance().AddActiveMetricsUser() - } - if queryInfoResult.LogsUsed { - telemetry.GetInstance().AddActiveLogsUser() - } - if queryInfoResult.TracesUsed { - telemetry.GetInstance().AddActiveTracesUser() - } - data["metricsUsed"] = queryInfoResult.MetricsUsed - data["logsUsed"] = queryInfoResult.LogsUsed - data["tracesUsed"] = queryInfoResult.TracesUsed - data["filterApplied"] = queryInfoResult.FilterApplied - data["groupByApplied"] = queryInfoResult.GroupByApplied - data["aggregateOperator"] = queryInfoResult.AggregateOperator - data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey - data["numberOfQueries"] = queryInfoResult.NumberOfQueries - data["queryType"] = queryInfoResult.QueryType - data["panelType"] = queryInfoResult.PanelType - - userEmail, err := auth.GetEmailFromJwt(r.Context()) - if err == nil { - // switch case to set data["screen"] based on the referrer - switch { - case dashboardMatched: - data["screen"] = "panel" - case alertMatched: - data["screen"] = "alert" - case logsExplorerMatched: - data["screen"] = "logs-explorer" - case traceExplorerMatched: - data["screen"] = "traces-explorer" - default: - data["screen"] = "unknown" - return data, true - } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false) - } - } - return data, true -} - -func getActiveLogs(path string, r *http.Request) { - // if path == "/api/v1/dashboards/{uuid}" { - // telemetry.GetInstance().AddActiveMetricsUser() - // } - if path == "/api/v1/logs" { - hasFilters := len(r.URL.Query().Get("q")) - if hasFilters > 0 { - telemetry.GetInstance().AddActiveLogsUser() - } - - } - -} - -func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := auth.AttachJwtToContext(r.Context(), r) - r = r.WithContext(ctx) - route := mux.CurrentRoute(r) - path, _ := route.GetPathTemplate() - - queryRangeV3data, metadataExists := extractQueryRangeV3Data(path, r) - getActiveLogs(path, r) - - lrw := NewLoggingResponseWriter(w) - next.ServeHTTP(lrw, r) - - data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode} - if metadataExists { - for key, value := range queryRangeV3data { - data[key] = value - } - } - - // if telemetry.GetInstance().IsSampled() { - if _, ok := telemetry.EnabledPaths()[path]; ok { - userEmail, err := auth.GetEmailFromJwt(r.Context()) - if err == nil { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false) - } - } - // } - - }) -} - // initListeners initialises listeners of the server func (s *Server) initListeners() error { // listen on public port