diff --git a/ee/query-service/app/api/license.go b/ee/query-service/app/api/license.go index 66d108d468..e382461d54 100644 --- a/ee/query-service/app/api/license.go +++ b/ee/query-service/app/api/license.go @@ -52,7 +52,6 @@ func (ah *APIHandler) listLicenses(w http.ResponseWriter, r *http.Request) { } func (ah *APIHandler) applyLicense(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() var l model.License if err := json.NewDecoder(r.Body).Decode(&l); err != nil { @@ -64,8 +63,7 @@ func (ah *APIHandler) applyLicense(w http.ResponseWriter, r *http.Request) { RespondError(w, model.BadRequest(fmt.Errorf("license key is required")), nil) return } - - license, apiError := ah.LM().Activate(ctx, l.Key) + license, apiError := ah.LM().Activate(r.Context(), l.Key) if apiError != nil { RespondError(w, apiError, nil) return diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 3d50ec5ede..bed3855f17 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -23,6 +23,7 @@ import ( "go.signoz.io/signoz/ee/query-service/constants" "go.signoz.io/signoz/ee/query-service/dao" "go.signoz.io/signoz/ee/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/auth" baseInterface "go.signoz.io/signoz/pkg/query-service/interfaces" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -437,7 +438,10 @@ func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface telemetry.GetInstance().AddActiveLogsUser() } data["dataSources"] = dataSources - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, true) + userEmail, err := auth.GetEmailFromJwt(r.Context()) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, userEmail, true) + } } return data, true } @@ -458,6 +462,8 @@ func getActiveLogs(path string, r *http.Request) { func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := auth.AttachJwtToContext(r.Context(), r) + r = r.WithContext(ctx) route := mux.CurrentRoute(r) path, _ := route.GetPathTemplate() @@ -475,7 +481,10 @@ func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { } if _, ok := telemetry.IgnoredPaths()[path]; !ok { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data) + userEmail, err := auth.GetEmailFromJwt(r.Context()) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail) + } } }) diff --git a/ee/query-service/license/manager.go b/ee/query-service/license/manager.go index 7a1be5118f..dcfa8235b1 100644 --- a/ee/query-service/license/manager.go +++ b/ee/query-service/license/manager.go @@ -10,6 +10,7 @@ import ( "sync" + "go.signoz.io/signoz/pkg/query-service/auth" baseconstants "go.signoz.io/signoz/pkg/query-service/constants" validate "go.signoz.io/signoz/ee/query-service/integrations/signozio" @@ -203,7 +204,7 @@ func (lm *Manager) Validate(ctx context.Context) (reterr error) { zap.S().Errorf("License validation completed with error", reterr) atomic.AddUint64(&lm.failedAttempts, 1) telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_CHECK_FAILED, - map[string]interface{}{"err": reterr.Error()}) + map[string]interface{}{"err": reterr.Error()}, "") } else { zap.S().Info("License validation completed with no errors") } @@ -259,8 +260,11 @@ func (lm *Manager) Validate(ctx context.Context) (reterr error) { func (lm *Manager) Activate(ctx context.Context, key string) (licenseResponse *model.License, errResponse *model.ApiError) { defer func() { if errResponse != nil { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_ACT_FAILED, - map[string]interface{}{"err": errResponse.Err.Error()}) + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_ACT_FAILED, + map[string]interface{}{"err": errResponse.Err.Error()}, userEmail) + } } }() diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 7b32fbff45..82ce3fc551 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -45,6 +45,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/logs" "go.signoz.io/signoz/pkg/query-service/app/services" + "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/constants" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/interfaces" @@ -3606,7 +3607,10 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter "lenFilters": lenFilters, } if lenFilters != 0 { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data) + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, userEmail) + } } query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable) @@ -3646,7 +3650,10 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC "lenFilters": lenFilters, } if lenFilters != 0 { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data) + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, userEmail) + } } if err != nil { @@ -3736,7 +3743,10 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs "lenFilters": lenFilters, } if lenFilters != 0 { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data) + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LOGS_FILTERS, data, userEmail) + } } query := "" diff --git a/pkg/query-service/app/explorer/db.go b/pkg/query-service/app/explorer/db.go index 7e520abfe8..e0fbee4e51 100644 --- a/pkg/query-service/app/explorer/db.go +++ b/pkg/query-service/app/explorer/db.go @@ -144,7 +144,7 @@ func CreateView(ctx context.Context, view v3.SavedView) (string, error) { createdAt := time.Now() updatedAt := time.Now() - email, err := getEmailFromJwt(ctx) + email, err := auth.GetEmailFromJwt(ctx) if err != nil { return "", err } @@ -205,7 +205,7 @@ func UpdateView(ctx context.Context, uuid_ string, view v3.SavedView) error { return fmt.Errorf("error in marshalling explorer query data: %s", err.Error()) } - email, err := getEmailFromJwt(ctx) + email, err := auth.GetEmailFromJwt(ctx) if err != nil { return err } @@ -228,17 +228,3 @@ func DeleteView(uuid_ string) error { } return nil } - -func getEmailFromJwt(ctx context.Context) (string, error) { - jwt, err := auth.ExtractJwtFromContext(ctx) - if err != nil { - return "", err - } - - claims, err := auth.ParseJWT(jwt) - if err != nil { - return "", err - } - - return claims["email"].(string), nil -} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 824400cc51..a6f517bc25 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1397,8 +1397,10 @@ func (aH *APIHandler) submitFeedback(w http.ResponseWriter, r *http.Request) { "email": email, "message": message, } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_INPRODUCT_FEEDBACK, data) - + userEmail, err := auth.GetEmailFromJwt(r.Context()) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_INPRODUCT_FEEDBACK, data, userEmail) + } } func (aH *APIHandler) getTopOperations(w http.ResponseWriter, r *http.Request) { @@ -1476,8 +1478,11 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { data := map[string]interface{}{ "number": len(*result), } + userEmail, err := auth.GetEmailFromJwt(r.Context()) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_NUMBER_OF_SERVICES, data, userEmail) + } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_NUMBER_OF_SERVICES, data) if (data["number"] != 0) && (data["number"] != telemetry.DEFAULT_NUMBER_OF_SERVICES) { telemetry.GetInstance().AddActiveTracesUser() } @@ -1796,8 +1801,7 @@ func (aH *APIHandler) inviteUser(w http.ResponseWriter, r *http.Request) { return } - ctx := auth.AttachJwtToContext(context.Background(), r) - resp, err := auth.Invite(ctx, req) + resp, err := auth.Invite(r.Context(), req) if err != nil { RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil) return @@ -1822,8 +1826,7 @@ func (aH *APIHandler) getInvite(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) revokeInvite(w http.ResponseWriter, r *http.Request) { email := mux.Vars(r)["email"] - ctx := auth.AttachJwtToContext(context.Background(), r) - if err := auth.RevokeInvite(ctx, email); err != nil { + if err := auth.RevokeInvite(r.Context(), email); err != nil { RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil) return } @@ -2201,8 +2204,8 @@ func (aH *APIHandler) editOrg(w http.ResponseWriter, r *http.Request) { "isAnonymous": req.IsAnonymous, "organizationName": req.Name, } - - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_ORG_SETTINGS, data) + userEmail, err := auth.GetEmailFromJwt(r.Context()) + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_ORG_SETTINGS, data, userEmail) aH.WriteJSON(w, r, map[string]string{"data": "org updated successfully"}) } @@ -2365,7 +2368,6 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) { RespondError(w, apiErr, "Incorrect params") return } - res, apiErr := aH.reader.GetLogs(r.Context(), params) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch logs from the DB") @@ -2426,7 +2428,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) { RespondError(w, apiErr, "Incorrect params") return } - res, apiErr := aH.reader.AggregateLogs(r.Context(), params) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch logs aggregate from the DB") @@ -2559,8 +2560,6 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) return } - ctx := auth.AttachJwtToContext(context.Background(), r) - createPipeline := func( ctx context.Context, postable []logparsingpipeline.PostablePipeline, @@ -2578,7 +2577,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable) } - res, err := createPipeline(ctx, req.Pipelines) + res, err := createPipeline(r.Context(), req.Pipelines) if err != nil { RespondError(w, err, nil) return @@ -2613,8 +2612,7 @@ func (aH *APIHandler) createSavedViews(w http.ResponseWriter, r *http.Request) { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) return } - ctx := auth.AttachJwtToContext(context.Background(), r) - uuid, err := explorer.CreateView(ctx, view) + uuid, err := explorer.CreateView(r.Context(), view) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) return @@ -2648,8 +2646,7 @@ func (aH *APIHandler) updateSavedView(w http.ResponseWriter, r *http.Request) { return } - ctx := auth.AttachJwtToContext(context.Background(), r) - err = explorer.UpdateView(ctx, viewID, view) + err = explorer.UpdateView(r.Context(), viewID, view) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) return diff --git a/pkg/query-service/app/logparsingpipeline/db.go b/pkg/query-service/app/logparsingpipeline/db.go index ae4effb590..df187f0de3 100644 --- a/pkg/query-service/app/logparsingpipeline/db.go +++ b/pkg/query-service/app/logparsingpipeline/db.go @@ -55,8 +55,8 @@ func (r *Repo) insertPipeline( )) } - jwt, err := auth.ExtractJwtFromContext(ctx) - if err != nil { + jwt, ok := auth.ExtractJwtFromContext(ctx) + if !ok { return nil, model.UnauthorizedError(err) } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index c3968e81be..7af82b072a 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -374,7 +374,10 @@ func extractQueryRangeV3Data(path string, r *http.Request) (map[string]interface telemetry.GetInstance().AddActiveLogsUser() } data["dataSources"] = dataSources - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, true) + userEmail, err := auth.GetEmailFromJwt(r.Context()) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_QUERY_RANGE_V3, data, userEmail, true) + } } return data, true } @@ -395,6 +398,8 @@ func getActiveLogs(path string, r *http.Request) { func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := auth.AttachJwtToContext(r.Context(), r) + r = r.WithContext(ctx) route := mux.CurrentRoute(r) path, _ := route.GetPathTemplate() @@ -413,7 +418,10 @@ func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { // if telemetry.GetInstance().IsSampled() { if _, ok := telemetry.IgnoredPaths()[path]; !ok { - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data) + userEmail, err := auth.GetEmailFromJwt(r.Context()) + if err == nil { + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_PATH, data, userEmail) + } } // } diff --git a/pkg/query-service/auth/auth.go b/pkg/query-service/auth/auth.go index 6190e87826..6ce7c425c9 100644 --- a/pkg/query-service/auth/auth.go +++ b/pkg/query-service/auth/auth.go @@ -49,8 +49,8 @@ func Invite(ctx context.Context, req *model.InviteRequest) (*model.InviteRespons return nil, errors.Wrap(err, "invalid invite request") } - jwtAdmin, err := ExtractJwtFromContext(ctx) - if err != nil { + jwtAdmin, ok := ExtractJwtFromContext(ctx) + if !ok { return nil, errors.Wrap(err, "failed to extract admin jwt token") } diff --git a/pkg/query-service/auth/jwt.go b/pkg/query-service/auth/jwt.go index df70064c51..90e2f7008d 100644 --- a/pkg/query-service/auth/jwt.go +++ b/pkg/query-service/auth/jwt.go @@ -11,7 +11,6 @@ import ( "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" - "google.golang.org/grpc/metadata" ) var ( @@ -65,29 +64,12 @@ func AttachJwtToContext(ctx context.Context, r *http.Request) context.Context { return ctx } - if len(token) > 0 { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - md = metadata.New(nil) - } - - md.Append("accessJwt", token) - ctx = metadata.NewIncomingContext(ctx, md) - } - return ctx + return context.WithValue(ctx, "accessJwt", token) } -func ExtractJwtFromContext(ctx context.Context) (string, error) { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "", errors.New("No JWT metadata token found") - } - accessJwt := md.Get("accessJwt") - if len(accessJwt) == 0 { - return "", errors.New("No JWT token found") - } - - return accessJwt[0], nil +func ExtractJwtFromContext(ctx context.Context) (string, bool) { + jwtToken, ok := ctx.Value("accessJwt").(string) + return jwtToken, ok } func ExtractJwtFromRequest(r *http.Request) (string, error) { @@ -96,9 +78,9 @@ func ExtractJwtFromRequest(r *http.Request) (string, error) { func ExtractUserIdFromContext(ctx context.Context) (string, error) { userId := "" - jwt, err := ExtractJwtFromContext(ctx) - if err != nil { - return "", model.InternalError(fmt.Errorf("failed to extract jwt from context %v", err)) + jwt, ok := ExtractJwtFromContext(ctx) + if !ok { + return "", model.InternalError(fmt.Errorf("failed to extract jwt from context")) } claims, err := ParseJWT(jwt) @@ -111,3 +93,17 @@ func ExtractUserIdFromContext(ctx context.Context) (string, error) { } return userId, nil } + +func GetEmailFromJwt(ctx context.Context) (string, error) { + jwt, ok := ExtractJwtFromContext(ctx) + if !ok { + return "", model.InternalError(fmt.Errorf("failed to extract jwt from context")) + } + + claims, err := ParseJWT(jwt) + if err != nil { + return "", model.InternalError(fmt.Errorf("failed get claims from jwt %v", err)) + } + + return claims["email"].(string), nil +} diff --git a/pkg/query-service/dao/sqlite/rbac.go b/pkg/query-service/dao/sqlite/rbac.go index 07e870fdff..c28a2b675c 100644 --- a/pkg/query-service/dao/sqlite/rbac.go +++ b/pkg/query-service/dao/sqlite/rbac.go @@ -203,7 +203,7 @@ func (mds *ModelDaoSqlite) CreateUser(ctx context.Context, } telemetry.GetInstance().IdentifyUser(user) - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_USER, data) + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_USER, data, user.Email) return user, nil } diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 9aa88a6376..10ce722af5 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -40,6 +40,7 @@ const ( TELEMETRY_EVENT_QUERY_RANGE_V3 = "Query Range V3 Metadata" TELEMETRY_EVENT_ACTIVE_USER = "Active User" TELEMETRY_EVENT_ACTIVE_USER_PH = "Active User V2" + DEFAULT_CLOUD_EMAIL = "admin@signoz.cloud" ) const api_key = "4Gmoa4ixJAUHx2BpJxsjwA1bEfnwEeRz" @@ -165,7 +166,7 @@ func createTelemetry() { data := map[string]interface{}{} telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled()) - telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) + telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, "") ticker := time.NewTicker(HEART_BEAT_DURATION) activeUserTicker := time.NewTicker(ACTIVE_USER_DURATION) @@ -187,7 +188,7 @@ func createTelemetry() { if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) { telemetry.activeUser["any"] = 1 } - telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{"traces": telemetry.activeUser["traces"], "metrics": telemetry.activeUser["metrics"], "logs": telemetry.activeUser["logs"], "any": telemetry.activeUser["any"]}) + telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{"traces": telemetry.activeUser["traces"], "metrics": telemetry.activeUser["metrics"], "logs": telemetry.activeUser["logs"], "any": telemetry.activeUser["any"]}, "") telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0} case <-ticker.C: @@ -195,11 +196,11 @@ func createTelemetry() { tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background()) if len(tagsInfo.Env) != 0 { - telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}) + telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "") } for language, _ := range tagsInfo.Languages { - telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": language}) + telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": language}, "") } totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background()) @@ -226,10 +227,10 @@ func createTelemetry() { for key, value := range tsInfo { data[key] = value } - telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) + telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, "") getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(context.Background()) - telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval) + telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "") } } @@ -259,7 +260,7 @@ func getOutboundIP() string { } func (a *Telemetry) IdentifyUser(user *model.User) { - if user.Email == "admin@admin.com" || user.Email == "admin@signoz.cloud" { + if user.Email == DEFAULT_CLOUD_EMAIL { return } a.SetCompanyDomain(user.Email) @@ -334,8 +335,16 @@ func (a *Telemetry) checkEvents(event string) bool { return sendEvent } -func (a *Telemetry) SendEvent(event string, data map[string]interface{}, opts ...bool) { +func (a *Telemetry) SendEvent(event string, data map[string]interface{}, userEmail string, opts ...bool) { + // ignore telemetry for default user + if userEmail == DEFAULT_CLOUD_EMAIL { + return + } + + if userEmail != "" { + a.SetUserEmail(userEmail) + } rateLimitFlag := true if len(opts) > 0 { rateLimitFlag = opts[0] @@ -388,6 +397,11 @@ func (a *Telemetry) SendEvent(event string, data map[string]interface{}, opts .. Event: event, UserId: a.GetUserEmail(), Properties: properties, + Context: &analytics.Context{ + Extra: map[string]interface{}{ + "groupId": a.getCompanyDomain(), + }, + }, }) } diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 1b370e7804..0b4d22973c 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -436,6 +436,8 @@ func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode( } respWriter := httptest.NewRecorder() + ctx := auth.AttachJwtToContext(req.Context(), req) + req = req.WithContext(ctx) tb.apiHandler.CreateLogsPipeline(respWriter, req) response := respWriter.Result()