diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index b13b24a5d0..870b2bee09 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -385,6 +385,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h apiHandler.RegisterMessagingQueuesRoutes(r, am) apiHandler.RegisterThirdPartyApiRoutes(r, am) apiHandler.MetricExplorerRoutes(r, am) + apiHandler.RegisterTraceFunnelsRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/alertmanager/alertmanagerserver/server.go b/pkg/alertmanager/alertmanagerserver/server.go index f53be783db..7e5fd1d32f 100644 --- a/pkg/alertmanager/alertmanagerserver/server.go +++ b/pkg/alertmanager/alertmanagerserver/server.go @@ -66,7 +66,7 @@ type Server struct { func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) { server := &Server{ - logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"), + logger: logger.With("pkg", "github.com/SigNoz/pkg/alertmanager/alertmanagerserver"), registry: registry, srvConfig: srvConfig, orgID: orgID, diff --git a/pkg/factory/registry.go b/pkg/factory/registry.go index be7d95c6d4..5a7a7bac26 100644 --- a/pkg/factory/registry.go +++ b/pkg/factory/registry.go @@ -33,7 +33,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro } return &Registry{ - logger: logger.With("pkg", "go.signoz.io/pkg/factory"), + logger: logger.With("pkg", "github.com/SigNoz/pkg/factory"), services: m, startCh: make(chan error, 1), stopCh: make(chan error, len(services)), diff --git a/pkg/http/middleware/middleware.go b/pkg/http/middleware/middleware.go index 6313089aa4..99ae9dbb77 100644 --- a/pkg/http/middleware/middleware.go +++ b/pkg/http/middleware/middleware.go @@ -3,7 +3,7 @@ package middleware import "net/http" const ( - pkgname string = "go.signoz.io/pkg/http/middleware" + pkgname string = "github.com/SigNoz/pkg/http/middleware" ) // Wrapper is an interface implemented by all middlewares diff --git a/pkg/http/server/server.go b/pkg/http/server/server.go index 449eff28f8..32db64174c 100644 --- a/pkg/http/server/server.go +++ b/pkg/http/server/server.go @@ -38,7 +38,7 @@ func New(logger *zap.Logger, cfg Config, handler http.Handler) (*Server, error) return &Server{ srv: srv, - logger: logger.Named("go.signoz.io/pkg/http/server"), + logger: logger.Named("github.com/SigNoz/pkg/http/server"), handler: handler, cfg: cfg, }, nil diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ab746c3ccb..9873623a99 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -18,6 +18,9 @@ import ( "text/template" "time" + "github.com/SigNoz/signoz/pkg/query-service/app/integrations/traceFunnels" + "github.com/google/uuid" + "github.com/SigNoz/signoz/pkg/alertmanager" errorsV2 "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/http/render" @@ -5562,3 +5565,546 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) { } aH.Respond(w, resp) } + +// RegisterTraceFunnelsRoutes adds trace funnels routes +func (aH *APIHandler) RegisterTraceFunnelsRoutes(router *mux.Router, am *AuthMiddleware) { + + // Main messaging queues router + traceFunnelsRouter := router.PathPrefix("/api/v1/trace-funnels").Subrouter() + + // API endpoints + traceFunnelsRouter.HandleFunc("/new-funnel", aH.handleNewFunnel).Methods("POST") + traceFunnelsRouter.HandleFunc("/steps/update", aH.handleUpdateFunnelStep).Methods("PUT") + traceFunnelsRouter.HandleFunc("/list", aH.handleListFunnels).Methods("GET") + traceFunnelsRouter.HandleFunc("/get/{funnel_id}", aH.handleGetFunnel).Methods("GET") + traceFunnelsRouter.HandleFunc("/delete/{funnel_id}", aH.handleDeleteFunnel).Methods("DELETE") + traceFunnelsRouter.HandleFunc("/save", aH.handleSaveFunnel).Methods("POST") + + //// Analytics endpoints + traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/validate", aH.handleValidateTraces).Methods("POST") + traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/overview", aH.handleFunnelAnalytics).Methods("POST") + traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps", aH.handleStepAnalytics).Methods("POST") + traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/slow-traces", func(w http.ResponseWriter, r *http.Request) { + aH.handleSlowTraces(w, r, false) + }).Methods("POST") + traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/error-traces", func(w http.ResponseWriter, r *http.Request) { + aH.handleSlowTraces(w, r, true) + }).Methods("POST") + +} + +// handleNewFunnel creates a new funnel without steps +// Steps should be added separately using the update endpoint +func (aH *APIHandler) handleNewFunnel(w http.ResponseWriter, r *http.Request) { + var req traceFunnels.NewFunnelRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + http.Error(w, "unauthenticated", http.StatusUnauthorized) + return + } + userID := claims.UserID + orgID := claims.OrgID + + // Validate timestamp is provided and in milliseconds format + if err := traceFunnels.ValidateTimestamp(req.Timestamp, "creation_timestamp"); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Check for name collision in the SQLite database + var count int + + err := aH.Signoz.SQLStore.SQLDB().QueryRow( + "SELECT COUNT(*) FROM saved_views WHERE name = ? AND created_by = ? AND category = 'funnel'", + req.Name, userID, + ).Scan(&count) + + if err != nil { + zap.L().Error("Error checking for funnel name collision in SQLite: %v", zap.Error(err)) + } else if count > 0 { + http.Error(w, fmt.Sprintf("funnel with name '%s' already exists for user '%s' in the database", req.Name, userID), http.StatusBadRequest) + return + } + + funnel := &traceFunnels.Funnel{ + ID: uuid.New().String(), + Name: req.Name, + CreatedAt: req.Timestamp * 1000000, // Convert milliseconds to nanoseconds for internal storage + CreatedBy: userID, + OrgID: orgID, + Steps: make([]traceFunnels.FunnelStep, 0), + } + + funnelData, err := json.Marshal(funnel) + if err != nil { + http.Error(w, fmt.Sprintf("failed to marshal funnel data: %v", err), http.StatusInternalServerError) + return + } + + createdAt := time.Unix(0, funnel.CreatedAt).UTC().Format(time.RFC3339) + + // Insert new funnel + _, err = aH.Signoz.SQLStore.SQLDB().Exec( + "INSERT INTO saved_views (uuid, name, category, created_by, updated_by, source_page, data, created_at, updated_at, org_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + funnel.ID, funnel.Name, "funnel", userID, userID, "trace-funnels", string(funnelData), createdAt, createdAt, orgID, + ) + + if err != nil { + http.Error(w, fmt.Sprintf("failed to save funnel to database: %v", err), http.StatusInternalServerError) + return + } + + response := traceFunnels.NewFunnelResponse{ + ID: funnel.ID, + Name: funnel.Name, + CreatedAt: funnel.CreatedAt / 1000000, + CreatedBy: funnel.CreatedBy, + OrgID: orgID, + } + json.NewEncoder(w).Encode(response) +} + +// handleUpdateFunnelStep adds or updates steps for an existing funnel +// Steps are identified by their step_order, which must be unique within a funnel +func (aH *APIHandler) handleUpdateFunnelStep(w http.ResponseWriter, r *http.Request) { + var req traceFunnels.FunnelStepRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + claims, ok := authtypes.ClaimsFromContext(r.Context()) + if !ok { + http.Error(w, "unauthenticated", http.StatusUnauthorized) + return + } + userID := claims.UserID + + if err := traceFunnels.ValidateTimestamp(req.Timestamp, "updated_timestamp"); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + aH.Signoz.SQLStore.SQLxDB() + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + + funnel, err := dbClient.GetFunnelFromDB(req.FunnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + // Process each step in the request + for i := range req.Steps { + if req.Steps[i].StepOrder < 1 { + req.Steps[i].StepOrder = int64(i + 1) // Default to sequential ordering if not specified + } + } + + if err := traceFunnels.ValidateFunnelSteps(req.Steps); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Normalize step orders + req.Steps = traceFunnels.NormalizeFunnelSteps(req.Steps) + + // Update the funnel with new steps + funnel.Steps = req.Steps + funnel.UpdatedAt = req.Timestamp * 1000000 + funnel.UpdatedBy = userID + + funnelData, err := json.Marshal(funnel) + if err != nil { + http.Error(w, fmt.Sprintf("failed to marshal funnel data: %v", err), http.StatusInternalServerError) + return + } + + updatedAt := time.Unix(0, funnel.UpdatedAt).UTC().Format(time.RFC3339) + + _, err = aH.Signoz.SQLStore.SQLDB().Exec( + "UPDATE saved_views SET data = ?, updated_by = ?, updated_at = ? WHERE uuid = ? AND category = 'funnel'", + string(funnelData), userID, updatedAt, req.FunnelID, + ) + if err != nil { + http.Error(w, fmt.Sprintf("failed to update funnel in database: %v", err), http.StatusInternalServerError) + return + } + + response := map[string]interface{}{ + "id": funnel.ID, + "funnel_name": funnel.Name, + "creation_timestamp": funnel.CreatedAt / 1000000, + "user_id": funnel.CreatedBy, + "org_id": funnel.OrgID, + "updated_timestamp": req.Timestamp, + "updated_by": userID, + "steps": funnel.Steps, + } + + json.NewEncoder(w).Encode(response) +} + +func (aH *APIHandler) handleListFunnels(w http.ResponseWriter, r *http.Request) { + orgID := r.URL.Query().Get("org_id") + + var dbFunnels []*traceFunnels.Funnel + var err error + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + + if orgID != "" { + dbFunnels, err = dbClient.ListFunnelsFromDB(orgID) + } else { + dbFunnels, err = dbClient.ListAllFunnelsFromDB() + } + + if err != nil { + http.Error(w, fmt.Sprintf("error fetching funnels from database: %v", err), http.StatusInternalServerError) + return + } + + // Convert to response format with additional metadata + response := make([]map[string]interface{}, 0, len(dbFunnels)) + for _, f := range dbFunnels { + funnelInfo := map[string]interface{}{ + "id": f.ID, + "funnel_name": f.Name, + "creation_timestamp": f.CreatedAt / 1000000, + "user_id": f.CreatedBy, + "org_id": f.OrgID, + } + + if f.UpdatedAt > 0 { + funnelInfo["updated_timestamp"] = f.UpdatedAt / 1000000 + } + if f.UpdatedBy != "" { + funnelInfo["updated_by"] = f.UpdatedBy + } + + var extraData, tags string + err := aH.Signoz.SQLStore.SQLDB().QueryRow( + "SELECT IFNULL(extra_data, ''), IFNULL(tags, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'", + f.ID, + ).Scan(&extraData, &tags) + + if err == nil && tags != "" { + funnelInfo["tags"] = tags + } + + if err == nil && extraData != "" { + var extraDataMap map[string]interface{} + if err := json.Unmarshal([]byte(extraData), &extraDataMap); err == nil { + if description, ok := extraDataMap["description"].(string); ok { + funnelInfo["description"] = description + } + } + } + + response = append(response, funnelInfo) + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} + +func (aH *APIHandler) handleGetFunnel(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + funnelID := vars["funnel_id"] + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + funnel, err := dbClient.GetFunnelFromDB(funnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + var extraData, tags string + + err = aH.Signoz.SQLStore.SQLDB().QueryRow( + "SELECT IFNULL(extra_data, ''), IFNULL(tags, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'", + funnel.ID, + ).Scan(&extraData, &tags) + + response := map[string]interface{}{ + "id": funnel.ID, + "funnel_name": funnel.Name, + "creation_timestamp": funnel.CreatedAt / 1000000, + "user_id": funnel.CreatedBy, + "org_id": funnel.OrgID, + "steps": funnel.Steps, + } + + if funnel.UpdatedAt > 0 { + response["updated_timestamp"] = funnel.UpdatedAt / 1000000 + } + if funnel.UpdatedBy != "" { + response["updated_by"] = funnel.UpdatedBy + } + + if err == nil && tags != "" { + response["tags"] = tags + } + + if err == nil && extraData != "" { + var extraDataMap map[string]interface{} + if err := json.Unmarshal([]byte(extraData), &extraDataMap); err == nil { + if description, ok := extraDataMap["description"].(string); ok { + response["description"] = description + } + } + } + + json.NewEncoder(w).Encode(response) +} + +func (aH *APIHandler) handleDeleteFunnel(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + funnelID := vars["funnel_id"] + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + err := dbClient.DeleteFunnelFromDB(funnelID) + if err != nil { + http.Error(w, fmt.Sprintf("failed to delete funnel: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "success"}) +} + +// handleSaveFunnel saves a funnel to the SQLite database +// Only requires funnel_id and optional description +func (aH *APIHandler) handleSaveFunnel(w http.ResponseWriter, r *http.Request) { + var req traceFunnels.SaveFunnelRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + funnel, err := dbClient.GetFunnelFromDB(req.FunnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + updateTimestamp := req.Timestamp + if updateTimestamp == 0 { + updateTimestamp = time.Now().UnixMilli() + } else { + if !traceFunnels.ValidateTimestampIsMilliseconds(updateTimestamp) { + http.Error(w, "timestamp must be in milliseconds format (13 digits)", http.StatusBadRequest) + return + } + } + + funnel.UpdatedAt = updateTimestamp * 1000000 // Convert ms to ns + + if req.UserID != "" { + funnel.UpdatedBy = req.UserID + } + extraData := "" + if req.Description != "" { + descriptionJSON, err := json.Marshal(map[string]string{"description": req.Description}) + if err != nil { + http.Error(w, "failed to marshal description: "+err.Error(), http.StatusInternalServerError) + return + } + extraData = string(descriptionJSON) + } + + orgID := req.OrgID + if orgID == "" { + orgID = funnel.OrgID + } + + if err := dbClient.SaveFunnel(funnel, funnel.UpdatedBy, orgID, req.Tags, extraData); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var createdAt, updatedAt, tags, extraDataFromDB string + err = aH.Signoz.SQLStore.SQLDB().QueryRow( + "SELECT created_at, updated_at, IFNULL(tags, ''), IFNULL(extra_data, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'", + funnel.ID, + ).Scan(&createdAt, &updatedAt, &tags, &extraDataFromDB) + + if err != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "status": "success", + "id": funnel.ID, + "name": funnel.Name, + }) + return + } + + response := map[string]string{ + "status": "success", + "id": funnel.ID, + "name": funnel.Name, + "created_at": createdAt, + "updated_at": updatedAt, + "created_by": funnel.CreatedBy, + "updated_by": funnel.UpdatedBy, + "org_id": funnel.OrgID, + } + + if tags != "" { + response["tags"] = tags + } + + if extraDataFromDB != "" { + var extraDataMap map[string]interface{} + if err := json.Unmarshal([]byte(extraDataFromDB), &extraDataMap); err == nil { + if description, ok := extraDataMap["description"].(string); ok { + response["description"] = description + } + } + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} + +func (aH *APIHandler) handleValidateTraces(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + funnelID := vars["funnel_id"] + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + funnel, err := dbClient.GetFunnelFromDB(funnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + var timeRange traceFunnels.TimeRange + if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if len(funnel.Steps) < 2 { + http.Error(w, "funnel must have at least 2 steps", http.StatusBadRequest) + return + } + + chq, err := traceFunnels.ValidateTraces(funnel, timeRange) + + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil) + return + } + + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + aH.Respond(w, results) +} + +// Analytics handlers +func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + funnelID := vars["funnel_id"] + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + + funnel, err := dbClient.GetFunnelFromDB(funnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + var timeRange traceFunnels.TimeRange + if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + chq, err := traceFunnels.ValidateTracesWithLatency(funnel, timeRange) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil) + return + } + + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + aH.Respond(w, results) +} + +func (aH *APIHandler) handleStepAnalytics(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + funnelID := vars["funnel_id"] + + // Get funnel directly from SQLite database + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + funnel, err := dbClient.GetFunnelFromDB(funnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + var timeRange traceFunnels.TimeRange + if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + chq, err := traceFunnels.GetStepAnalytics(funnel, timeRange) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil) + return + } + + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + aH.Respond(w, results) +} + +func (aH *APIHandler) handleSlowTraces(w http.ResponseWriter, r *http.Request, withErrors bool) { + vars := mux.Vars(r) + funnelID := vars["funnel_id"] + + dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore) + funnel, err := dbClient.GetFunnelFromDB(funnelID) + if err != nil { + http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound) + return + } + + var req traceFunnels.StepTransitionRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + stepAExists, stepBExists := false, false + for _, step := range funnel.Steps { + if step.StepOrder == req.StepAOrder { + stepAExists = true + } + if step.StepOrder == req.StepBOrder { + stepBExists = true + } + } + + if !stepAExists || !stepBExists { + http.Error(w, fmt.Sprintf("One or both steps not found. Step A Order: %d, Step B Order: %d", req.StepAOrder, req.StepBOrder), http.StatusBadRequest) + return + } + + chq, err := traceFunnels.GetSlowestTraces(funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, withErrors) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil) + return + } + + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + aH.Respond(w, results) +} diff --git a/pkg/query-service/app/integrations/traceFunnels/funnel.go b/pkg/query-service/app/integrations/traceFunnels/funnel.go new file mode 100644 index 0000000000..31bc58fa12 --- /dev/null +++ b/pkg/query-service/app/integrations/traceFunnels/funnel.go @@ -0,0 +1,145 @@ +package traceFunnels + +import ( + "fmt" + "sort" + "sync" + + "github.com/google/uuid" +) + +type FunnelStore struct { + sync.RWMutex + funnels map[string]*Funnel +} + +func (s *FunnelStore) CreateFunnel(name, userID, orgID string, timestamp int64) (*Funnel, error) { + s.Lock() + defer s.Unlock() + + for _, existingFunnel := range s.funnels { + if existingFunnel.Name == name && existingFunnel.CreatedBy == userID { + return nil, fmt.Errorf("funnel with name '%s' already exists for user '%s'", name, userID) + } + } + + if timestamp == 0 { + return nil, fmt.Errorf("timestamp is required") + } + + funnel := &Funnel{ + ID: uuid.New().String(), + Name: name, + CreatedAt: timestamp * 1000000, // Convert milliseconds to nanoseconds for internal storage + CreatedBy: userID, + OrgID: orgID, + Steps: make([]FunnelStep, 0), + } + + s.funnels[funnel.ID] = funnel + return funnel, nil +} + +func (s *FunnelStore) GetFunnel(id string) (*Funnel, error) { + s.RLock() + defer s.RUnlock() + + funnel, ok := s.funnels[id] + if !ok { + return nil, fmt.Errorf("funnel not found") + } + + return funnel, nil +} + +func (s *FunnelStore) ListFunnels() []*Funnel { + s.RLock() + defer s.RUnlock() + + funnels := make([]*Funnel, 0, len(s.funnels)) + for _, funnel := range s.funnels { + funnels = append(funnels, funnel) + } + + return funnels +} + +func (s *FunnelStore) UpdateFunnelSteps(id string, steps []FunnelStep, updatedBy string, updatedAt int64) error { + s.Lock() + defer s.Unlock() + + funnel, ok := s.funnels[id] + if !ok { + return fmt.Errorf("funnel with ID %s not found", id) + } + + funnel.Steps = steps + funnel.UpdatedAt = updatedAt * 1000000 + funnel.UpdatedBy = updatedBy + + return nil +} + +// DeleteFunnel removes a funnel from the in-memory store +func (s *FunnelStore) DeleteFunnel(id string) error { + s.Lock() + defer s.Unlock() + + if _, ok := s.funnels[id]; !ok { + return fmt.Errorf("funnel with ID %s not found", id) + } + + delete(s.funnels, id) + return nil +} + +// ValidateFunnelSteps validates funnel steps and ensures they have unique and correct order +// Rules: At least 2 steps, max 3 steps, orders must be unique and include 1 and 2 +func ValidateFunnelSteps(steps []FunnelStep) error { + if len(steps) < 2 { + return fmt.Errorf("at least 2 funnel steps are required") + } + + if len(steps) > 3 { + return fmt.Errorf("maximum 3 funnel steps are allowed") + } + + orderMap := make(map[int64]bool) + + for _, step := range steps { + if orderMap[step.StepOrder] { + return fmt.Errorf("duplicate step order: %d", step.StepOrder) + } + orderMap[step.StepOrder] = true + + if step.StepOrder < 1 || step.StepOrder > 3 { + return fmt.Errorf("step order must be between 1 and 3, got: %d", step.StepOrder) + } + } + + if !orderMap[1] || !orderMap[2] { + return fmt.Errorf("funnel steps with orders 1 and 2 are mandatory") + } + + return nil +} + +// NormalizeFunnelSteps ensures steps have sequential orders starting from 1 +// This sorts steps by order and then reassigns orders to be sequential +func NormalizeFunnelSteps(steps []FunnelStep) []FunnelStep { + // Create a copy of the input slice + sortedSteps := make([]FunnelStep, len(steps)) + copy(sortedSteps, steps) + + // Sort using Go's built-in sort.Slice function + sort.Slice(sortedSteps, func(i, j int) bool { + return sortedSteps[i].StepOrder < sortedSteps[j].StepOrder + }) + + // Normalize orders to be sequential starting from 1 + for i := 0; i < len(sortedSteps); i++ { + sortedSteps[i].StepOrder = int64(i + 1) + } + + return sortedSteps +} diff --git a/pkg/query-service/app/integrations/traceFunnels/model.go b/pkg/query-service/app/integrations/traceFunnels/model.go new file mode 100644 index 0000000000..928e76cc8f --- /dev/null +++ b/pkg/query-service/app/integrations/traceFunnels/model.go @@ -0,0 +1,81 @@ +package traceFunnels + +import v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" + +type Funnel struct { + ID string `json:"id"` + Name string `json:"funnel_name"` + CreatedAt int64 `json:"creation_timestamp"` + CreatedBy string `json:"user_id"` + OrgID string `json:"org_id"` + UpdatedAt int64 `json:"updated_timestamp,omitempty"` + UpdatedBy string `json:"updated_by,omitempty"` + Steps []FunnelStep `json:"steps"` +} + +// FunnelStep Models +type FunnelStep struct { + StepOrder int64 `json:"step_order"` // Order of the step in the funnel (1-based) + ServiceName string `json:"service_name"` // Service name for the span + SpanName string `json:"span_name"` // Span name to match + Filters *v3.FilterSet `json:"filters"` // Additional SQL filters + LatencyPointer string `json:"latency_pointer"` // "start" or "end" + LatencyType string `json:"latency_type"` // "p99", "p95", "p90" + HasErrors bool `json:"has_errors"` // Whether to include error spans +} + +// NewFunnelRequest Request/Response structures +// NewFunnelRequest is used to create a new funnel without steps +// Steps should be added separately using the update endpoint +type NewFunnelRequest struct { + Name string `json:"funnel_name"` + Timestamp int64 `json:"creation_timestamp"` // Unix milliseconds timestamp +} + +type NewFunnelResponse struct { + ID string `json:"funnel_id"` + Name string `json:"funnel_name"` + CreatedAt int64 `json:"creation_timestamp"` + CreatedBy string `json:"user_id"` + OrgID string `json:"org_id"` +} + +type FunnelListResponse struct { + ID string `json:"id"` + Name string `json:"funnel_name"` + CreatedAt int64 `json:"creation_timestamp"` // Unix nano timestamp + CreatedBy string `json:"user_id"` + OrgID string `json:"org_id,omitempty"` +} + +// FunnelStepRequest is used to add or update steps for an existing funnel +type FunnelStepRequest struct { + FunnelID string `json:"funnel_id"` + Steps []FunnelStep `json:"steps"` + Timestamp int64 `json:"updated_timestamp"` // Unix milliseconds timestamp for update time +} + +// TimeRange Analytics request/response types +type TimeRange struct { + StartTime int64 `json:"start_time"` // Unix nano + EndTime int64 `json:"end_time"` // Unix nano +} + +type StepTransitionRequest struct { + TimeRange + StepAOrder int64 `json:"step_a_order"` // First step in transition + StepBOrder int64 `json:"step_b_order"` // Second step in transition +} + +type ValidTracesResponse struct { + TraceIDs []string `json:"trace_ids"` +} + +type FunnelAnalytics struct { + TotalStart int64 `json:"total_start"` + TotalComplete int64 `json:"total_complete"` + ErrorCount int64 `json:"error_count"` + AvgDurationMs float64 `json:"avg_duration_ms"` + P99LatencyMs float64 `json:"p99_latency_ms"` + ConversionRate float64 `json:"conversion_rate"` +} diff --git a/pkg/query-service/app/integrations/traceFunnels/query.go b/pkg/query-service/app/integrations/traceFunnels/query.go new file mode 100644 index 0000000000..7f63e9d228 --- /dev/null +++ b/pkg/query-service/app/integrations/traceFunnels/query.go @@ -0,0 +1,474 @@ +package traceFunnels + +import ( + "fmt" + "strings" + + v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" +) + +// TracesTable is the ClickHouse table name for traces +const TracesTable = "signoz_traces.signoz_index_v3" + +// StepAnalytics represents analytics data for a single step in a funnel +type StepAnalytics struct { + StepOrder int64 `json:"stepOrder"` + TotalSpans int64 `json:"totalSpans"` + ErroredSpans int64 `json:"erroredSpans"` + AvgDurationMs string `json:"avgDurationMs"` +} + +// FunnelStepFilter represents filters for a single step in the funnel +type FunnelStepFilter struct { + StepNumber int + ServiceName string + SpanName string + LatencyPointer string // "start" or "end" + CustomFilters *v3.FilterSet +} + +// SlowTrace represents a trace with its duration and span count +type SlowTrace struct { + TraceID string `json:"traceId"` + DurationMs string `json:"durationMs"` + SpanCount int64 `json:"spanCount"` +} + +// ValidateTraces parses the Funnel and builds a query to validate traces +func ValidateTraces(funnel *Funnel, timeRange TimeRange) (*v3.ClickHouseQuery, error) { + filters, err := buildFunnelFilters(funnel) + if err != nil { + return nil, fmt.Errorf("error building funnel filters: %w", err) + } + + query := generateFunnelSQL(timeRange.StartTime, timeRange.EndTime, filters) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} + +// ValidateTracesWithLatency builds a query that considers the latency pointer for trace calculations +func ValidateTracesWithLatency(funnel *Funnel, timeRange TimeRange) (*v3.ClickHouseQuery, error) { + filters, err := buildFunnelFiltersWithLatency(funnel) + if err != nil { + return nil, fmt.Errorf("error building funnel filters with latency: %w", err) + } + + query := generateFunnelSQLWithLatency(timeRange.StartTime, timeRange.EndTime, filters) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} + +// buildFunnelFilters extracts filters from funnel steps (without latency pointer) +func buildFunnelFilters(funnel *Funnel) ([]FunnelStepFilter, error) { + if funnel == nil { + return nil, fmt.Errorf("funnel cannot be nil") + } + + if len(funnel.Steps) == 0 { + return nil, fmt.Errorf("funnel must have at least one step") + } + + filters := make([]FunnelStepFilter, len(funnel.Steps)) + + for i, step := range funnel.Steps { + filters[i] = FunnelStepFilter{ + StepNumber: i + 1, + ServiceName: step.ServiceName, + SpanName: step.SpanName, + CustomFilters: step.Filters, + } + } + + return filters, nil +} + +// buildFunnelFiltersWithLatency extracts filters including the latency pointer +func buildFunnelFiltersWithLatency(funnel *Funnel) ([]FunnelStepFilter, error) { + if funnel == nil { + return nil, fmt.Errorf("funnel cannot be nil") + } + + if len(funnel.Steps) == 0 { + return nil, fmt.Errorf("funnel must have at least one step") + } + + filters := make([]FunnelStepFilter, len(funnel.Steps)) + + for i, step := range funnel.Steps { + latencyPointer := "start" // Default value + if step.LatencyPointer != "" { + latencyPointer = step.LatencyPointer + } + + filters[i] = FunnelStepFilter{ + StepNumber: i + 1, + ServiceName: step.ServiceName, + SpanName: step.SpanName, + LatencyPointer: latencyPointer, + CustomFilters: step.Filters, + } + } + + return filters, nil +} + +// escapeString escapes a string for safe use in SQL queries +func escapeString(s string) string { + // Replace single quotes with double single quotes to escape them in SQL + return strings.ReplaceAll(s, "'", "''") +} + +// generateFunnelSQL builds the ClickHouse SQL query for funnel validation +func generateFunnelSQL(start, end int64, filters []FunnelStepFilter) string { + var expressions []string + + // Basic time expressions. + expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS start_time", start)) + expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS end_time", end)) + expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart") + expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd") + + // Add service and span alias definitions from each filter. + for _, f := range filters { + expressions = append(expressions, fmt.Sprintf("'%s' AS service_%d", escapeString(f.ServiceName), f.StepNumber)) + expressions = append(expressions, fmt.Sprintf("'%s' AS span_%d", escapeString(f.SpanName), f.StepNumber)) + } + + // Add the CTE for each step. + for _, f := range filters { + cte := fmt.Sprintf(`step%d_traces AS ( + SELECT DISTINCT trace_id + FROM %s + WHERE serviceName = service_%d + AND name = span_%d + AND timestamp BETWEEN toString(start_time) AND toString(end_time) + AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd +)`, f.StepNumber, TracesTable, f.StepNumber, f.StepNumber) + expressions = append(expressions, cte) + } + + // Join all expressions with commas and newlines + withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n" + + // Build the intersect clause for each step. + var intersectQueries []string + for _, f := range filters { + intersectQueries = append(intersectQueries, fmt.Sprintf("SELECT trace_id FROM step%d_traces", f.StepNumber)) + } + intersectClause := strings.Join(intersectQueries, "\nINTERSECT\n") + + query := withClause + ` +SELECT trace_id +FROM ` + TracesTable + ` +WHERE trace_id IN ( + ` + intersectClause + ` +) + AND timestamp BETWEEN toString(start_time) AND toString(end_time) + AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd +GROUP BY trace_id +LIMIT 5 +` + return query +} + +// generateFunnelSQLWithLatency correctly applies latency pointer logic for trace duration +func generateFunnelSQLWithLatency(start, end int64, filters []FunnelStepFilter) string { + var expressions []string + + // Define the base time variables + expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS start_time", start)) + expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS end_time", end)) + expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart") + expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd") + expressions = append(expressions, "(end_time - start_time) / 1e9 AS total_time_seconds") + + // Define service, span, and latency pointer mappings + for _, f := range filters { + expressions = append(expressions, fmt.Sprintf("('%s', '%s', '%s') AS s%d_config", + escapeString(f.ServiceName), + escapeString(f.SpanName), + escapeString(f.LatencyPointer), + f.StepNumber)) + } + + // Construct the WITH clause + withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n" + + // Latency calculation logic + var latencyCases []string + for _, f := range filters { + if f.LatencyPointer == "end" { + latencyCases = append(latencyCases, fmt.Sprintf(` + WHEN (resource_string_service$$name, name) = (s%d_config.1, s%d_config.2) + THEN toUnixTimestamp64Nano(timestamp) + duration_nano`, f.StepNumber, f.StepNumber)) + } else { + latencyCases = append(latencyCases, fmt.Sprintf(` + WHEN (resource_string_service$$name, name) = (s%d_config.1, s%d_config.2) + THEN toUnixTimestamp64Nano(timestamp)`, f.StepNumber, f.StepNumber)) + } + } + + latencyComputation := fmt.Sprintf(` + MAX( + CASE %s + ELSE toUnixTimestamp64Nano(timestamp) + END + ) - + MIN( + CASE %s + ELSE toUnixTimestamp64Nano(timestamp) + END + ) AS trace_duration`, strings.Join(latencyCases, ""), strings.Join(latencyCases, "")) + + query := withClause + ` +SELECT + COUNT(DISTINCT CASE WHEN in_funnel_s1 = 1 THEN trace_id END) AS total_s1, + COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 THEN trace_id END) AS total_s3, + COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 THEN trace_id END) / total_time_seconds AS avg_rate, + COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 AND has_error = true THEN trace_id END) AS errors, + avg(trace_duration) AS avg_duration, + quantile(0.99)(trace_duration) AS p99_latency, + 100 - ( + (COUNT(DISTINCT CASE WHEN in_funnel_s1 = 1 THEN trace_id END) - + COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 THEN trace_id END)) + / NULLIF(COUNT(DISTINCT CASE WHEN in_funnel_s1 = 1 THEN trace_id END), 0) * 100 + ) AS conversion_rate +FROM ( + SELECT + trace_id, + ` + latencyComputation + `, + MAX(has_error) AS has_error, + MAX(CASE WHEN (resource_string_service$$name, name) = (s1_config.1, s1_config.2) THEN 1 ELSE 0 END) AS in_funnel_s1, + MAX(CASE WHEN (resource_string_service$$name, name) = (s3_config.1, s3_config.2) THEN 1 ELSE 0 END) AS in_funnel_s3 + FROM ` + TracesTable + ` + WHERE timestamp BETWEEN toString(start_time) AND toString(end_time) + AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd + AND (resource_string_service$$name, name) IN (` + generateFilterConditions(filters) + `) + GROUP BY trace_id +) AS trace_metrics; +` + return query +} + +// generateFilterConditions creates the filtering conditions dynamically +func generateFilterConditions(filters []FunnelStepFilter) string { + var conditions []string + for _, f := range filters { + conditions = append(conditions, fmt.Sprintf("(s%d_config.1, s%d_config.2)", f.StepNumber, f.StepNumber)) + } + return strings.Join(conditions, ", ") +} + +// GetStepAnalytics builds a query to get analytics for each step in a funnel +func GetStepAnalytics(funnel *Funnel, timeRange TimeRange) (*v3.ClickHouseQuery, error) { + if len(funnel.Steps) == 0 { + return nil, fmt.Errorf("funnel has no steps") + } + + // Build funnel steps array + var steps []string + for _, step := range funnel.Steps { + steps = append(steps, fmt.Sprintf("('%s', '%s')", + escapeString(step.ServiceName), escapeString(step.SpanName))) + } + stepsArray := fmt.Sprintf("array(%s)", strings.Join(steps, ",")) + + // Build step CTEs + var stepCTEs []string + for i, step := range funnel.Steps { + filterStr := "" + if step.Filters != nil && len(step.Filters.Items) > 0 { + // This is a placeholder - in a real implementation, you would convert + // the filter set to a SQL WHERE clause string + filterStr = "/* Custom filters would be applied here */" + } + + cte := fmt.Sprintf(` + step%d_traces AS ( + SELECT DISTINCT trace_id + FROM %s + WHERE resource_string_service$$name = '%s' + AND name = '%s' + AND timestamp BETWEEN toString(start_time) AND toString(end_time) + AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd + %s + )`, + i+1, + TracesTable, + escapeString(step.ServiceName), + escapeString(step.SpanName), + filterStr, + ) + stepCTEs = append(stepCTEs, cte) + } + + // Build intersecting traces CTE + var intersections []string + for i := 1; i <= len(funnel.Steps); i++ { + intersections = append(intersections, fmt.Sprintf("SELECT trace_id FROM step%d_traces", i)) + } + intersectingTracesCTE := fmt.Sprintf(` + intersecting_traces AS ( + %s + )`, + strings.Join(intersections, "\nINTERSECT\n"), + ) + + // Build CASE expressions for each step + var caseExpressions []string + for i, step := range funnel.Steps { + totalSpansExpr := fmt.Sprintf(` + COUNT(CASE WHEN resource_string_service$$name = '%s' + AND name = '%s' + THEN trace_id END) AS total_s%d_spans`, + escapeString(step.ServiceName), escapeString(step.SpanName), i+1) + + erroredSpansExpr := fmt.Sprintf(` + COUNT(CASE WHEN resource_string_service$$name = '%s' + AND name = '%s' + AND has_error = true + THEN trace_id END) AS total_s%d_errored_spans`, + escapeString(step.ServiceName), escapeString(step.SpanName), i+1) + + caseExpressions = append(caseExpressions, totalSpansExpr, erroredSpansExpr) + } + + query := fmt.Sprintf(` + WITH + toUInt64(%d) AS start_time, + toUInt64(%d) AS end_time, + toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart, + toString(intDiv(end_time, 1000000000)) AS tsBucketEnd, + %s AS funnel_steps, + %s, + %s + SELECT + %s + FROM %s + WHERE trace_id IN (SELECT trace_id FROM intersecting_traces) + AND timestamp BETWEEN toString(start_time) AND toString(end_time) + AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd`, + timeRange.StartTime, + timeRange.EndTime, + stepsArray, + strings.Join(stepCTEs, ",\n"), + intersectingTracesCTE, + strings.Join(caseExpressions, ",\n "), + TracesTable, + ) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} + +// buildFilters converts a step's filters to a SQL WHERE clause string +func buildFilters(step FunnelStep) string { + if step.Filters == nil || len(step.Filters.Items) == 0 { + return "" + } + + // This is a placeholder - in a real implementation, you would convert + // the filter set to a SQL WHERE clause string + return "/* Custom filters would be applied here */" +} + +// GetSlowestTraces builds a query to get the slowest traces for a transition between two steps +func GetSlowestTraces(funnel *Funnel, stepAOrder int64, stepBOrder int64, timeRange TimeRange, withErrors bool) (*v3.ClickHouseQuery, error) { + // Find steps by order + var stepA, stepB *FunnelStep + for i := range funnel.Steps { + if funnel.Steps[i].StepOrder == stepAOrder { + stepA = &funnel.Steps[i] + } + if funnel.Steps[i].StepOrder == stepBOrder { + stepB = &funnel.Steps[i] + } + } + + if stepA == nil || stepB == nil { + return nil, fmt.Errorf("step not found") + } + + // Build having clause based on withErrors flag + havingClause := "" + if withErrors { + havingClause = "HAVING has_error = 1" + } + + // Build filter strings for each step + stepAFilters := "" + if stepA.Filters != nil && len(stepA.Filters.Items) > 0 { + // This is a placeholder - in a real implementation, you would convert + // the filter set to a SQL WHERE clause string + stepAFilters = "/* Custom filters for step A would be applied here */" + } + + stepBFilters := "" + if stepB.Filters != nil && len(stepB.Filters.Items) > 0 { + // This is a placeholder - in a real implementation, you would convert + // the filter set to a SQL WHERE clause string + stepBFilters = "/* Custom filters for step B would be applied here */" + } + + query := fmt.Sprintf(` + WITH + toUInt64(%d) AS start_time, + toUInt64(%d) AS end_time, + toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart, + toString(intDiv(end_time, 1000000000)) AS tsBucketEnd + SELECT + trace_id, + concat(toString((max_end_time_ns - min_start_time_ns) / 1e6), ' ms') AS duration_ms, + COUNT(*) AS span_count + FROM ( + SELECT + s1.trace_id, + MIN(toUnixTimestamp64Nano(s1.timestamp)) AS min_start_time_ns, + MAX(toUnixTimestamp64Nano(s2.timestamp) + s2.duration_nano) AS max_end_time_ns, + MAX(s1.has_error OR s2.has_error) AS has_error + FROM %s AS s1 + JOIN %s AS s2 + ON s1.trace_id = s2.trace_id + WHERE s1.resource_string_service$$name = '%s' + AND s1.name = '%s' + AND s2.resource_string_service$$name = '%s' + AND s2.name = '%s' + AND s1.timestamp BETWEEN toString(start_time) AND toString(end_time) + AND s1.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd + AND s2.timestamp BETWEEN toString(start_time) AND toString(end_time) + AND s2.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd + %s + %s + GROUP BY s1.trace_id + %s + ) AS trace_durations + JOIN %s AS spans + ON spans.trace_id = trace_durations.trace_id + WHERE spans.timestamp BETWEEN toString(start_time) AND toString(end_time) + AND spans.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd + GROUP BY trace_id, duration_ms + ORDER BY CAST(replaceRegexpAll(duration_ms, ' ms$', '') AS Float64) DESC + LIMIT 5`, + timeRange.StartTime, + timeRange.EndTime, + TracesTable, + TracesTable, + escapeString(stepA.ServiceName), + escapeString(stepA.SpanName), + escapeString(stepB.ServiceName), + escapeString(stepB.SpanName), + stepAFilters, + stepBFilters, + havingClause, + TracesTable, + ) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} diff --git a/pkg/query-service/app/integrations/traceFunnels/store.go b/pkg/query-service/app/integrations/traceFunnels/store.go new file mode 100644 index 0000000000..1a48fcb025 --- /dev/null +++ b/pkg/query-service/app/integrations/traceFunnels/store.go @@ -0,0 +1,206 @@ +package traceFunnels + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" +) + +// SQLClient handles persistence of funnels to the database +type SQLClient struct { + store sqlstore.SQLStore +} + +// NewSQLClient creates a new SQL client +func NewSQLClient(store sqlstore.SQLStore) (*SQLClient, error) { + return &SQLClient{store: store}, nil +} + +// SaveFunnelRequest is used to save a funnel to the database +type SaveFunnelRequest struct { + FunnelID string `json:"funnel_id"` // Required: ID of the funnel to save + UserID string `json:"user_id,omitempty"` // Optional: will use existing user ID if not provided + OrgID string `json:"org_id,omitempty"` // Optional: will use existing org ID if not provided + Tags string `json:"tags,omitempty"` // Optional: comma-separated tags + Description string `json:"description,omitempty"` // Optional: human-readable description + Timestamp int64 `json:"timestamp,omitempty"` // Optional: timestamp for update in milliseconds (uses current time if not provided) +} + +// SaveFunnel saves a funnel to the database in the saved_views table +// Handles both creating new funnels and updating existing ones +func (c *SQLClient) SaveFunnel(funnel *Funnel, userID, orgID string, tags, extraData string) error { + ctx := context.Background() + db := c.store.BunDB() + + // Convert funnel to JSON for storage + funnelData, err := json.Marshal(funnel) + if err != nil { + return fmt.Errorf("failed to marshal funnel data: %v", err) + } + + // Format timestamps as RFC3339 + // Convert nanoseconds to milliseconds for display, then to time.Time for formatting + createdAt := time.Unix(0, funnel.CreatedAt).UTC().Format(time.RFC3339) + updatedAt := createdAt + updatedBy := userID + + // If funnel has update metadata, use it + if funnel.UpdatedAt > 0 { + updatedAt = time.Unix(0, funnel.UpdatedAt).UTC().Format(time.RFC3339) + } + + if funnel.UpdatedBy != "" { + updatedBy = funnel.UpdatedBy + } + + // Check if the funnel already exists + var count int + var existingCreatedBy string + var existingCreatedAt string + err = db.NewRaw("SELECT COUNT(*), IFNULL(created_by, ''), IFNULL(created_at, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'", funnel.ID). + Scan(ctx, &count, &existingCreatedBy, &existingCreatedAt) + if err != nil { + return fmt.Errorf("failed to check if funnel exists: %v", err) + } + + if count > 0 { + // Update existing funnel - preserve created_by and created_at + _, err = db.NewRaw( + "UPDATE saved_views SET name = ?, data = ?, updated_by = ?, updated_at = ?, tags = ?, extra_data = ? WHERE uuid = ? AND category = 'funnel'", + funnel.Name, string(funnelData), updatedBy, updatedAt, tags, extraData, funnel.ID, + ).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to update funnel: %v", err) + } + } else { + // Insert new funnel - set both created and updated fields + savedView := &types.SavedView{ + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + UserAuditable: types.UserAuditable{ + CreatedBy: userID, + UpdatedBy: updatedBy, + }, + UUID: funnel.ID, + Name: funnel.Name, + Category: "funnel", + SourcePage: "trace-funnels", + OrgID: orgID, + Tags: tags, + Data: string(funnelData), + ExtraData: extraData, + } + + _, err = db.NewInsert().Model(savedView).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to insert funnel: %v", err) + } + } + + return nil +} + +// GetFunnelFromDB retrieves a funnel from the database +func (c *SQLClient) GetFunnelFromDB(funnelID string) (*Funnel, error) { + ctx := context.Background() + db := c.store.BunDB() + + var savedView types.SavedView + err := db.NewSelect(). + Model(&savedView). + Where("uuid = ? AND category = 'funnel'", funnelID). + Scan(ctx) + + if err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("funnel not found") + } + return nil, fmt.Errorf("failed to get funnel: %v", err) + } + + var funnel Funnel + if err := json.Unmarshal([]byte(savedView.Data), &funnel); err != nil { + return nil, fmt.Errorf("failed to unmarshal funnel data: %v", err) + } + + return &funnel, nil +} + +// ListFunnelsFromDB lists all funnels from the database +func (c *SQLClient) ListFunnelsFromDB(orgID string) ([]*Funnel, error) { + ctx := context.Background() + db := c.store.BunDB() + + var savedViews []types.SavedView + err := db.NewSelect(). + Model(&savedViews). + Where("category = 'funnel' AND org_id = ?", orgID). + Scan(ctx) + + if err != nil { + return nil, fmt.Errorf("failed to list funnels: %v", err) + } + + var funnels []*Funnel + for _, view := range savedViews { + var funnel Funnel + if err := json.Unmarshal([]byte(view.Data), &funnel); err != nil { + return nil, fmt.Errorf("failed to unmarshal funnel data: %v", err) + } + + funnels = append(funnels, &funnel) + } + + return funnels, nil +} + +// ListAllFunnelsFromDB lists all funnels from the database without org_id filter +func (c *SQLClient) ListAllFunnelsFromDB() ([]*Funnel, error) { + ctx := context.Background() + db := c.store.BunDB() + + var savedViews []types.SavedView + err := db.NewSelect(). + Model(&savedViews). + Where("category = 'funnel'"). + Scan(ctx) + + if err != nil { + return nil, fmt.Errorf("failed to list all funnels: %v", err) + } + + var funnels []*Funnel + for _, view := range savedViews { + var funnel Funnel + if err := json.Unmarshal([]byte(view.Data), &funnel); err != nil { + return nil, fmt.Errorf("failed to unmarshal funnel data: %v", err) + } + + funnels = append(funnels, &funnel) + } + + return funnels, nil +} + +// DeleteFunnelFromDB deletes a funnel from the database +func (c *SQLClient) DeleteFunnelFromDB(funnelID string) error { + ctx := context.Background() + db := c.store.BunDB() + + _, err := db.NewDelete(). + Model(&types.SavedView{}). + Where("uuid = ? AND category = 'funnel'", funnelID). + Exec(ctx) + + if err != nil { + return fmt.Errorf("failed to delete funnel: %v", err) + } + return nil +} diff --git a/pkg/query-service/app/integrations/traceFunnels/utils.go b/pkg/query-service/app/integrations/traceFunnels/utils.go new file mode 100644 index 0000000000..96eda4a786 --- /dev/null +++ b/pkg/query-service/app/integrations/traceFunnels/utils.go @@ -0,0 +1,32 @@ +package traceFunnels + +import ( + "fmt" + "strconv" +) + +// ValidateTimestampIsMilliseconds checks if a timestamp is likely in milliseconds format +func ValidateTimestampIsMilliseconds(timestamp int64) bool { + // If timestamp is 0, it's not valid + if timestamp == 0 { + return false + } + + timestampStr := strconv.FormatInt(timestamp, 10) + + return len(timestampStr) >= 12 && len(timestampStr) <= 14 +} + +// ValidateTimestamp checks if a timestamp is provided and in milliseconds format +// Returns an error if validation fails +func ValidateTimestamp(timestamp int64, fieldName string) error { + if timestamp == 0 { + return fmt.Errorf("%s is required", fieldName) + } + + if !ValidateTimestampIsMilliseconds(timestamp) { + return fmt.Errorf("%s must be in milliseconds format (13 digits)", fieldName) + } + + return nil +} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index f6d5bd2d7a..209ffdfaec 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -332,6 +332,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, api.RegisterMessagingQueuesRoutes(r, am) api.RegisterThirdPartyApiRoutes(r, am) api.MetricExplorerRoutes(r, am) + api.RegisterTraceFunnelsRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"},