fix: improve user telemetry (#3972)

* fix: improve user telemetry
- move GetEmailFromJwt to common function
- update AttachJwtToContext() to use standard way of attaching value to context
- update userEmail in every possible sendEvent call
- send groupId in sendEvent call for SaaS operator analytics

* chore: added DEFAULT_CLOUD_EMAIL const

* chore: add AttachJwtToContext to analytics middleware

* test: added AttachJwtToContext to logs pipelines
This commit is contained in:
Vishal Sharma 2023-11-16 15:11:38 +05:30 committed by GitHub
parent d8a8430a5b
commit 5b419cb668
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 109 additions and 85 deletions

View File

@ -52,7 +52,6 @@ func (ah *APIHandler) listLicenses(w http.ResponseWriter, r *http.Request) {
}
func (ah *APIHandler) applyLicense(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
var l model.License
if err := json.NewDecoder(r.Body).Decode(&l); err != nil {
@ -64,8 +63,7 @@ func (ah *APIHandler) applyLicense(w http.ResponseWriter, r *http.Request) {
RespondError(w, model.BadRequest(fmt.Errorf("license key is required")), nil)
return
}
license, apiError := ah.LM().Activate(ctx, l.Key)
license, apiError := ah.LM().Activate(r.Context(), l.Key)
if apiError != nil {
RespondError(w, apiError, nil)
return

View File

@ -23,6 +23,7 @@ import (
"go.signoz.io/signoz/ee/query-service/constants"
"go.signoz.io/signoz/ee/query-service/dao"
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/auth"
baseInterface "go.signoz.io/signoz/pkg/query-service/interfaces"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@ -437,7 +438,10 @@ func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface
telemetry.GetInstance().AddActiveLogsUser()
}
data["dataSources"] = dataSources
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, true)
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, userEmail, true)
}
}
return data, true
}
@ -458,6 +462,8 @@ func getActiveLogs(path string, r *http.Request) {
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := auth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
@ -475,7 +481,10 @@ func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
}
if _, ok := telemetry.IgnoredPaths()[path]; !ok {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data)
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail)
}
}
})

View File

@ -10,6 +10,7 @@ import (
"sync"
"go.signoz.io/signoz/pkg/query-service/auth"
baseconstants "go.signoz.io/signoz/pkg/query-service/constants"
validate "go.signoz.io/signoz/ee/query-service/integrations/signozio"
@ -203,7 +204,7 @@ func (lm *Manager) Validate(ctx context.Context) (reterr error) {
zap.S().Errorf("License validation completed with error", reterr)
atomic.AddUint64(&lm.failedAttempts, 1)
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_CHECK_FAILED,
map[string]interface{}{"err": reterr.Error()})
map[string]interface{}{"err": reterr.Error()}, "")
} else {
zap.S().Info("License validation completed with no errors")
}
@ -259,8 +260,11 @@ func (lm *Manager) Validate(ctx context.Context) (reterr error) {
func (lm *Manager) Activate(ctx context.Context, key string) (licenseResponse *model.License, errResponse *model.ApiError) {
defer func() {
if errResponse != nil {
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_ACT_FAILED,
map[string]interface{}{"err": errResponse.Err.Error()})
map[string]interface{}{"err": errResponse.Err.Error()}, userEmail)
}
}
}()

View File

@ -45,6 +45,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/logs"
"go.signoz.io/signoz/pkg/query-service/app/services"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/constants"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
"go.signoz.io/signoz/pkg/query-service/interfaces"
@ -3606,7 +3607,10 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter
"lenFilters": lenFilters,
}
if lenFilters != 0 {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data)
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, userEmail)
}
}
query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)
@ -3646,7 +3650,10 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
"lenFilters": lenFilters,
}
if lenFilters != 0 {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data)
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, userEmail)
}
}
if err != nil {
@ -3736,7 +3743,10 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs
"lenFilters": lenFilters,
}
if lenFilters != 0 {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data)
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, userEmail)
}
}
query := ""

View File

@ -144,7 +144,7 @@ func CreateView(ctx context.Context, view v3.SavedView) (string, error) {
createdAt := time.Now()
updatedAt := time.Now()
email, err := getEmailFromJwt(ctx)
email, err := auth.GetEmailFromJwt(ctx)
if err != nil {
return "", err
}
@ -205,7 +205,7 @@ func UpdateView(ctx context.Context, uuid_ string, view v3.SavedView) error {
return fmt.Errorf("error in marshalling explorer query data: %s", err.Error())
}
email, err := getEmailFromJwt(ctx)
email, err := auth.GetEmailFromJwt(ctx)
if err != nil {
return err
}
@ -228,17 +228,3 @@ func DeleteView(uuid_ string) error {
}
return nil
}
func getEmailFromJwt(ctx context.Context) (string, error) {
jwt, err := auth.ExtractJwtFromContext(ctx)
if err != nil {
return "", err
}
claims, err := auth.ParseJWT(jwt)
if err != nil {
return "", err
}
return claims["email"].(string), nil
}

View File

@ -1397,8 +1397,10 @@ func (aH *APIHandler) submitFeedback(w http.ResponseWriter, r *http.Request) {
"email": email,
"message": message,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_INPRODUCT_FEEDBACK, data)
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_INPRODUCT_FEEDBACK, data, userEmail)
}
}
func (aH *APIHandler) getTopOperations(w http.ResponseWriter, r *http.Request) {
@ -1476,8 +1478,11 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
data := map[string]interface{}{
"number": len(*result),
}
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_NUMBER_OF_SERVICES, data, userEmail)
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_NUMBER_OF_SERVICES, data)
if (data["number"] != 0) && (data["number"] != telemetry.DEFAULT_NUMBER_OF_SERVICES) {
telemetry.GetInstance().AddActiveTracesUser()
}
@ -1796,8 +1801,7 @@ func (aH *APIHandler) inviteUser(w http.ResponseWriter, r *http.Request) {
return
}
ctx := auth.AttachJwtToContext(context.Background(), r)
resp, err := auth.Invite(ctx, req)
resp, err := auth.Invite(r.Context(), req)
if err != nil {
RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil)
return
@ -1822,8 +1826,7 @@ func (aH *APIHandler) getInvite(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) revokeInvite(w http.ResponseWriter, r *http.Request) {
email := mux.Vars(r)["email"]
ctx := auth.AttachJwtToContext(context.Background(), r)
if err := auth.RevokeInvite(ctx, email); err != nil {
if err := auth.RevokeInvite(r.Context(), email); err != nil {
RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil)
return
}
@ -2201,8 +2204,8 @@ func (aH *APIHandler) editOrg(w http.ResponseWriter, r *http.Request) {
"isAnonymous": req.IsAnonymous,
"organizationName": req.Name,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_ORG_SETTINGS, data)
userEmail, err := auth.GetEmailFromJwt(r.Context())
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_ORG_SETTINGS, data, userEmail)
aH.WriteJSON(w, r, map[string]string{"data": "org updated successfully"})
}
@ -2365,7 +2368,6 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
RespondError(w, apiErr, "Incorrect params")
return
}
res, apiErr := aH.reader.GetLogs(r.Context(), params)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch logs from the DB")
@ -2426,7 +2428,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
RespondError(w, apiErr, "Incorrect params")
return
}
res, apiErr := aH.reader.AggregateLogs(r.Context(), params)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch logs aggregate from the DB")
@ -2559,8 +2560,6 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
return
}
ctx := auth.AttachJwtToContext(context.Background(), r)
createPipeline := func(
ctx context.Context,
postable []logparsingpipeline.PostablePipeline,
@ -2578,7 +2577,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
}
res, err := createPipeline(ctx, req.Pipelines)
res, err := createPipeline(r.Context(), req.Pipelines)
if err != nil {
RespondError(w, err, nil)
return
@ -2613,8 +2612,7 @@ func (aH *APIHandler) createSavedViews(w http.ResponseWriter, r *http.Request) {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
ctx := auth.AttachJwtToContext(context.Background(), r)
uuid, err := explorer.CreateView(ctx, view)
uuid, err := explorer.CreateView(r.Context(), view)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
@ -2648,8 +2646,7 @@ func (aH *APIHandler) updateSavedView(w http.ResponseWriter, r *http.Request) {
return
}
ctx := auth.AttachJwtToContext(context.Background(), r)
err = explorer.UpdateView(ctx, viewID, view)
err = explorer.UpdateView(r.Context(), viewID, view)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return

View File

@ -55,8 +55,8 @@ func (r *Repo) insertPipeline(
))
}
jwt, err := auth.ExtractJwtFromContext(ctx)
if err != nil {
jwt, ok := auth.ExtractJwtFromContext(ctx)
if !ok {
return nil, model.UnauthorizedError(err)
}

View File

@ -374,7 +374,10 @@ func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface
telemetry.GetInstance().AddActiveLogsUser()
}
data["dataSources"] = dataSources
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, true)
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, userEmail, true)
}
}
return data, true
}
@ -395,6 +398,8 @@ func getActiveLogs(path string, r *http.Request) {
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := auth.AttachJwtToContext(r.Context(), r)
r = r.WithContext(ctx)
route := mux.CurrentRoute(r)
path, _ := route.GetPathTemplate()
@ -413,7 +418,10 @@ func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
// if telemetry.GetInstance().IsSampled() {
if _, ok := telemetry.IgnoredPaths()[path]; !ok {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data)
userEmail, err := auth.GetEmailFromJwt(r.Context())
if err == nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail)
}
}
// }

View File

@ -49,8 +49,8 @@ func Invite(ctx context.Context, req *model.InviteRequest) (*model.InviteRespons
return nil, errors.Wrap(err, "invalid invite request")
}
jwtAdmin, err := ExtractJwtFromContext(ctx)
if err != nil {
jwtAdmin, ok := ExtractJwtFromContext(ctx)
if !ok {
return nil, errors.Wrap(err, "failed to extract admin jwt token")
}

View File

@ -11,7 +11,6 @@ import (
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)
var (
@ -65,29 +64,12 @@ func AttachJwtToContext(ctx context.Context, r *http.Request) context.Context {
return ctx
}
if len(token) > 0 {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
return context.WithValue(ctx, "accessJwt", token)
}
md.Append("accessJwt", token)
ctx = metadata.NewIncomingContext(ctx, md)
}
return ctx
}
func ExtractJwtFromContext(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", errors.New("No JWT metadata token found")
}
accessJwt := md.Get("accessJwt")
if len(accessJwt) == 0 {
return "", errors.New("No JWT token found")
}
return accessJwt[0], nil
func ExtractJwtFromContext(ctx context.Context) (string, bool) {
jwtToken, ok := ctx.Value("accessJwt").(string)
return jwtToken, ok
}
func ExtractJwtFromRequest(r *http.Request) (string, error) {
@ -96,9 +78,9 @@ func ExtractJwtFromRequest(r *http.Request) (string, error) {
func ExtractUserIdFromContext(ctx context.Context) (string, error) {
userId := ""
jwt, err := ExtractJwtFromContext(ctx)
if err != nil {
return "", model.InternalError(fmt.Errorf("failed to extract jwt from context %v", err))
jwt, ok := ExtractJwtFromContext(ctx)
if !ok {
return "", model.InternalError(fmt.Errorf("failed to extract jwt from context"))
}
claims, err := ParseJWT(jwt)
@ -111,3 +93,17 @@ func ExtractUserIdFromContext(ctx context.Context) (string, error) {
}
return userId, nil
}
func GetEmailFromJwt(ctx context.Context) (string, error) {
jwt, ok := ExtractJwtFromContext(ctx)
if !ok {
return "", model.InternalError(fmt.Errorf("failed to extract jwt from context"))
}
claims, err := ParseJWT(jwt)
if err != nil {
return "", model.InternalError(fmt.Errorf("failed get claims from jwt %v", err))
}
return claims["email"].(string), nil
}

View File

@ -203,7 +203,7 @@ func (mds *ModelDaoSqlite) CreateUser(ctx context.Context,
}
telemetry.GetInstance().IdentifyUser(user)
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_USER, data)
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_USER, data, user.Email)
return user, nil
}

View File

@ -40,6 +40,7 @@ const (
TELEMETRY_EVENT_QUERY_RANGE_V3 = "Query Range V3 Metadata"
TELEMETRY_EVENT_ACTIVE_USER = "Active User"
TELEMETRY_EVENT_ACTIVE_USER_PH = "Active User V2"
DEFAULT_CLOUD_EMAIL = "admin@signoz.cloud"
)
const api_key = "4Gmoa4ixJAUHx2BpJxsjwA1bEfnwEeRz"
@ -165,7 +166,7 @@ func createTelemetry() {
data := map[string]interface{}{}
telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled())
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data)
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, "")
ticker := time.NewTicker(HEART_BEAT_DURATION)
activeUserTicker := time.NewTicker(ACTIVE_USER_DURATION)
@ -187,7 +188,7 @@ func createTelemetry() {
if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) {
telemetry.activeUser["any"] = 1
}
telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{"traces": telemetry.activeUser["traces"], "metrics": telemetry.activeUser["metrics"], "logs": telemetry.activeUser["logs"], "any": telemetry.activeUser["any"]})
telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{"traces": telemetry.activeUser["traces"], "metrics": telemetry.activeUser["metrics"], "logs": telemetry.activeUser["logs"], "any": telemetry.activeUser["any"]}, "")
telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0}
case <-ticker.C:
@ -195,11 +196,11 @@ func createTelemetry() {
tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background())
if len(tagsInfo.Env) != 0 {
telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env})
telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "")
}
for language, _ := range tagsInfo.Languages {
telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": language})
telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": language}, "")
}
totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background())
@ -226,10 +227,10 @@ func createTelemetry() {
for key, value := range tsInfo {
data[key] = value
}
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data)
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, "")
getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(context.Background())
telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval)
telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "")
}
}
@ -259,7 +260,7 @@ func getOutboundIP() string {
}
func (a *Telemetry) IdentifyUser(user *model.User) {
if user.Email == "admin@admin.com" || user.Email == "admin@signoz.cloud" {
if user.Email == DEFAULT_CLOUD_EMAIL {
return
}
a.SetCompanyDomain(user.Email)
@ -334,8 +335,16 @@ func (a *Telemetry) checkEvents(event string) bool {
return sendEvent
}
func (a *Telemetry) SendEvent(event string, data map[string]interface{}, opts ...bool) {
func (a *Telemetry) SendEvent(event string, data map[string]interface{}, userEmail string, opts ...bool) {
// ignore telemetry for default user
if userEmail == DEFAULT_CLOUD_EMAIL {
return
}
if userEmail != "" {
a.SetUserEmail(userEmail)
}
rateLimitFlag := true
if len(opts) > 0 {
rateLimitFlag = opts[0]
@ -388,6 +397,11 @@ func (a *Telemetry) SendEvent(event string, data map[string]interface{}, opts ..
Event: event,
UserId: a.GetUserEmail(),
Properties: properties,
Context: &analytics.Context{
Extra: map[string]interface{}{
"groupId": a.getCompanyDomain(),
},
},
})
}

View File

@ -436,6 +436,8 @@ func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode(
}
respWriter := httptest.NewRecorder()
ctx := auth.AttachJwtToContext(req.Context(), req)
req = req.WithContext(ctx)
tb.apiHandler.CreateLogsPipeline(respWriter, req)
response := respWriter.Result()