fix: analytics middleware fixed (#7133)

This commit is contained in:
Nityananda Gohain 2025-02-17 14:38:13 +05:30 committed by GitHub
parent 5a107f33f2
commit 8bfca9b564
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 3 additions and 356 deletions

View File

@ -1,21 +1,15 @@
package app
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof" // http profiler
"regexp"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
"github.com/rs/cors"
@ -29,8 +23,6 @@ import (
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/rules"
"go.signoz.io/signoz/pkg/http/middleware"
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/web"
@ -395,174 +387,6 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
}, nil
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
// we default to that status code.
return &loggingResponseWriter{w, http.StatusOK}
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Flush implements the http.Flush interface.
func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Support websockets
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := lrw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, errors.New("hijack not supported")
}
return h.Hijack()
}
func extractQueryRangeData(path string, r *http.Request) (map[string]interface{}, bool) {
pathToExtractBodyFromV3 := "/api/v3/query_range"
pathToExtractBodyFromV4 := "/api/v4/query_range"
data := map[string]interface{}{}
var postData *v3.QueryRangeParamsV3
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
if r.Body != nil {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
return nil, false
}
r.Body.Close() // must close
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
json.Unmarshal(bodyBytes, &postData)
} else {
return nil, false
}
} else {
return nil, false
}
referrer := r.Header.Get("Referer")
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the referrer", zap.Error(err))
}
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the alert: ", zap.Error(err))
}
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
}
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
}
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
if queryInfoResult.MetricsUsed {
telemetry.GetInstance().AddActiveMetricsUser()
}
if queryInfoResult.LogsUsed {
telemetry.GetInstance().AddActiveLogsUser()
}
if queryInfoResult.TracesUsed {
telemetry.GetInstance().AddActiveTracesUser()
}
data["metricsUsed"] = queryInfoResult.MetricsUsed
data["logsUsed"] = queryInfoResult.LogsUsed
data["tracesUsed"] = queryInfoResult.TracesUsed
data["filterApplied"] = queryInfoResult.FilterApplied
data["groupByApplied"] = queryInfoResult.GroupByApplied
data["aggregateOperator"] = queryInfoResult.AggregateOperator
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
data["queryType"] = queryInfoResult.QueryType
data["panelType"] = queryInfoResult.PanelType
userEmail, err := baseauth.GetEmailFromJwt(r.Context())
if err == nil {
// switch case to set data["screen"] based on the referrer
switch {
case dashboardMatched:
data["screen"] = "panel"
case alertMatched:
data["screen"] = "alert"
case logsExplorerMatched:
data["screen"] = "logs-explorer"
case traceExplorerMatched:
data["screen"] = "traces-explorer"
default:
data["screen"] = "unknown"
return data, true
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
}
}
return data, true
}
func getActiveLogs(path string, r *http.Request) {
// if path == "/api/v1/dashboards/{uuid}" {
// telemetry.GetInstance().AddActiveMetricsUser()
// }
if path == "/api/v1/logs" {
hasFilters := len(r.URL.Query().Get("q"))
if hasFilters > 0 {
telemetry.GetInstance().AddActiveLogsUser()
}
}
}
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := baseauth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
queryRangeData, metadataExists := extractQueryRangeData(path, r)
getActiveLogs(path, r)
lrw := NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, r)
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
if metadataExists {
for key, value := range queryRangeData {
data[key] = value
}
}
if _, ok := telemetry.EnabledPaths()[path]; ok {
userEmail, err := baseauth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
}
}
})
}
// initListeners initialises listeners of the server
func (s *Server) initListeners() error {
// listen on public port

View File

@ -35,13 +35,13 @@ func (a *Analytics) Wrap(next http.Handler) http.Handler {
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
queryRangeData, metadataExists := a.extractQueryRangeData(path, r)
a.getActiveLogs(path, r)
badResponseBuffer := new(bytes.Buffer)
writer := newBadResponseLoggingWriter(w, badResponseBuffer)
next.ServeHTTP(writer, r)
queryRangeData, metadataExists := a.extractQueryRangeData(path, r)
a.getActiveLogs(path, r)
data := map[string]interface{}{"path": path, "statusCode": writer.StatusCode()}
if metadataExists {
for key, value := range queryRangeData {

View File

@ -1,21 +1,15 @@
package app
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof" // http profiler
"regexp"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/jmoiron/sqlx"
"github.com/rs/cors"
@ -30,7 +24,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/preferences"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/web"
@ -338,176 +331,6 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
}, nil
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
// WriteHeader(int) is not called if our response implicitly returns 200 OK, so
// we default to that status code.
return &loggingResponseWriter{w, http.StatusOK}
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Flush implements the http.Flush interface.
func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}
// TODO(remove): Implemented at pkg/http/middleware/logging.go
// Support websockets
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := lrw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, errors.New("hijack not supported")
}
return h.Hijack()
}
func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface{}, bool) {
pathToExtractBodyFromV3 := "/api/v3/query_range"
pathToExtractBodyFromV4 := "/api/v4/query_range"
data := map[string]interface{}{}
var postData *v3.QueryRangeParamsV3
if (r.Method == "POST") && ((path == pathToExtractBodyFromV3) || (path == pathToExtractBodyFromV4)) {
if r.Body != nil {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
return nil, false
}
r.Body.Close() // must close
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
json.Unmarshal(bodyBytes, &postData)
} else {
return nil, false
}
} else {
return nil, false
}
referrer := r.Header.Get("Referer")
dashboardMatched, err := regexp.MatchString(`/dashboard/[a-zA-Z0-9\-]+/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the referrer", zap.Error(err))
}
alertMatched, err := regexp.MatchString(`/alerts/(new|edit)(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the alert: ", zap.Error(err))
}
logsExplorerMatched, err := regexp.MatchString(`/logs/logs-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the logs explorer: ", zap.Error(err))
}
traceExplorerMatched, err := regexp.MatchString(`/traces-explorer(?:\?.*)?$`, referrer)
if err != nil {
zap.L().Error("error while matching the trace explorer: ", zap.Error(err))
}
queryInfoResult := telemetry.GetInstance().CheckQueryInfo(postData)
if (queryInfoResult.MetricsUsed || queryInfoResult.LogsUsed || queryInfoResult.TracesUsed) && (queryInfoResult.FilterApplied) {
if queryInfoResult.MetricsUsed {
telemetry.GetInstance().AddActiveMetricsUser()
}
if queryInfoResult.LogsUsed {
telemetry.GetInstance().AddActiveLogsUser()
}
if queryInfoResult.TracesUsed {
telemetry.GetInstance().AddActiveTracesUser()
}
data["metricsUsed"] = queryInfoResult.MetricsUsed
data["logsUsed"] = queryInfoResult.LogsUsed
data["tracesUsed"] = queryInfoResult.TracesUsed
data["filterApplied"] = queryInfoResult.FilterApplied
data["groupByApplied"] = queryInfoResult.GroupByApplied
data["aggregateOperator"] = queryInfoResult.AggregateOperator
data["aggregateAttributeKey"] = queryInfoResult.AggregateAttributeKey
data["numberOfQueries"] = queryInfoResult.NumberOfQueries
data["queryType"] = queryInfoResult.QueryType
data["panelType"] = queryInfoResult.PanelType
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
// switch case to set data["screen"] based on the referrer
switch {
case dashboardMatched:
data["screen"] = "panel"
case alertMatched:
data["screen"] = "alert"
case logsExplorerMatched:
data["screen"] = "logs-explorer"
case traceExplorerMatched:
data["screen"] = "traces-explorer"
default:
data["screen"] = "unknown"
return data, true
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_API, data, userEmail, true, false)
}
}
return data, true
}
func getActiveLogs(path string, r *http.Request) {
// if path == "/api/v1/dashboards/{uuid}" {
// telemetry.GetInstance().AddActiveMetricsUser()
// }
if path == "/api/v1/logs" {
hasFilters := len(r.URL.Query().Get("q"))
if hasFilters > 0 {
telemetry.GetInstance().AddActiveLogsUser()
}
}
}
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := auth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
queryRangeV3data, metadataExists := extractQueryRangeV3Data(path, r)
getActiveLogs(path, r)
lrw := NewLoggingResponseWriter(w)
next.ServeHTTP(lrw, r)
data := map[string]interface{}{"path": path, "statusCode": lrw.statusCode}
if metadataExists {
for key, value := range queryRangeV3data {
data[key] = value
}
}
// if telemetry.GetInstance().IsSampled() {
if _, ok := telemetry.EnabledPaths()[path]; ok {
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail, true, false)
}
}
// }
})
}
// initListeners initialises listeners of the server
func (s *Server) initListeners() error {
// listen on public port