mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 04:39:01 +08:00
This reverts commit b36d2ec4c6ae6a21068985a0a9a8f890a6b9a083.
This commit is contained in:
parent
d80908a1fc
commit
3100d602c4
@ -385,7 +385,6 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
|
||||
apiHandler.RegisterMessagingQueuesRoutes(r, am)
|
||||
apiHandler.RegisterThirdPartyApiRoutes(r, am)
|
||||
apiHandler.MetricExplorerRoutes(r, am)
|
||||
apiHandler.RegisterTraceFunnelsRoutes(r, am)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
|
@ -66,7 +66,7 @@ type Server struct {
|
||||
|
||||
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) {
|
||||
server := &Server{
|
||||
logger: logger.With("pkg", "github.com/SigNoz/pkg/alertmanager/alertmanagerserver"),
|
||||
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
|
||||
registry: registry,
|
||||
srvConfig: srvConfig,
|
||||
orgID: orgID,
|
||||
|
@ -33,7 +33,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
|
||||
}
|
||||
|
||||
return &Registry{
|
||||
logger: logger.With("pkg", "github.com/SigNoz/pkg/factory"),
|
||||
logger: logger.With("pkg", "go.signoz.io/pkg/factory"),
|
||||
services: m,
|
||||
startCh: make(chan error, 1),
|
||||
stopCh: make(chan error, len(services)),
|
||||
|
@ -3,7 +3,7 @@ package middleware
|
||||
import "net/http"
|
||||
|
||||
const (
|
||||
pkgname string = "github.com/SigNoz/pkg/http/middleware"
|
||||
pkgname string = "go.signoz.io/pkg/http/middleware"
|
||||
)
|
||||
|
||||
// Wrapper is an interface implemented by all middlewares
|
||||
|
@ -38,7 +38,7 @@ func New(logger *zap.Logger, cfg Config, handler http.Handler) (*Server, error)
|
||||
|
||||
return &Server{
|
||||
srv: srv,
|
||||
logger: logger.Named("github.com/SigNoz/pkg/http/server"),
|
||||
logger: logger.Named("go.signoz.io/pkg/http/server"),
|
||||
handler: handler,
|
||||
cfg: cfg,
|
||||
}, nil
|
||||
|
@ -18,9 +18,6 @@ import (
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/integrations/traceFunnels"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
@ -5576,546 +5573,3 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
aH.Respond(w, resp)
|
||||
}
|
||||
|
||||
// RegisterTraceFunnelsRoutes adds trace funnels routes
|
||||
func (aH *APIHandler) RegisterTraceFunnelsRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
|
||||
// Main messaging queues router
|
||||
traceFunnelsRouter := router.PathPrefix("/api/v1/trace-funnels").Subrouter()
|
||||
|
||||
// API endpoints
|
||||
traceFunnelsRouter.HandleFunc("/new-funnel", aH.handleNewFunnel).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/steps/update", aH.handleUpdateFunnelStep).Methods("PUT")
|
||||
traceFunnelsRouter.HandleFunc("/list", aH.handleListFunnels).Methods("GET")
|
||||
traceFunnelsRouter.HandleFunc("/get/{funnel_id}", aH.handleGetFunnel).Methods("GET")
|
||||
traceFunnelsRouter.HandleFunc("/delete/{funnel_id}", aH.handleDeleteFunnel).Methods("DELETE")
|
||||
traceFunnelsRouter.HandleFunc("/save", aH.handleSaveFunnel).Methods("POST")
|
||||
|
||||
//// Analytics endpoints
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/validate", aH.handleValidateTraces).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/overview", aH.handleFunnelAnalytics).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps", aH.handleStepAnalytics).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/slow-traces", func(w http.ResponseWriter, r *http.Request) {
|
||||
aH.handleSlowTraces(w, r, false)
|
||||
}).Methods("POST")
|
||||
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/error-traces", func(w http.ResponseWriter, r *http.Request) {
|
||||
aH.handleSlowTraces(w, r, true)
|
||||
}).Methods("POST")
|
||||
|
||||
}
|
||||
|
||||
// handleNewFunnel creates a new funnel without steps
|
||||
// Steps should be added separately using the update endpoint
|
||||
func (aH *APIHandler) handleNewFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
var req traceFunnels.NewFunnelRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(r.Context())
|
||||
if !ok {
|
||||
http.Error(w, "unauthenticated", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
userID := claims.UserID
|
||||
orgID := claims.OrgID
|
||||
|
||||
// Validate timestamp is provided and in milliseconds format
|
||||
if err := traceFunnels.ValidateTimestamp(req.Timestamp, "creation_timestamp"); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Check for name collision in the SQLite database
|
||||
var count int
|
||||
|
||||
err := aH.Signoz.SQLStore.SQLDB().QueryRow(
|
||||
"SELECT COUNT(*) FROM saved_views WHERE name = ? AND created_by = ? AND category = 'funnel'",
|
||||
req.Name, userID,
|
||||
).Scan(&count)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error checking for funnel name collision in SQLite: %v", zap.Error(err))
|
||||
} else if count > 0 {
|
||||
http.Error(w, fmt.Sprintf("funnel with name '%s' already exists for user '%s' in the database", req.Name, userID), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
funnel := &traceFunnels.Funnel{
|
||||
ID: uuid.New().String(),
|
||||
Name: req.Name,
|
||||
CreatedAt: req.Timestamp * 1000000, // Convert milliseconds to nanoseconds for internal storage
|
||||
CreatedBy: userID,
|
||||
OrgID: orgID,
|
||||
Steps: make([]traceFunnels.FunnelStep, 0),
|
||||
}
|
||||
|
||||
funnelData, err := json.Marshal(funnel)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to marshal funnel data: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
createdAt := time.Unix(0, funnel.CreatedAt).UTC().Format(time.RFC3339)
|
||||
|
||||
// Insert new funnel
|
||||
_, err = aH.Signoz.SQLStore.SQLDB().Exec(
|
||||
"INSERT INTO saved_views (uuid, name, category, created_by, updated_by, source_page, data, created_at, updated_at, org_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
funnel.ID, funnel.Name, "funnel", userID, userID, "trace-funnels", string(funnelData), createdAt, createdAt, orgID,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to save funnel to database: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := traceFunnels.NewFunnelResponse{
|
||||
ID: funnel.ID,
|
||||
Name: funnel.Name,
|
||||
CreatedAt: funnel.CreatedAt / 1000000,
|
||||
CreatedBy: funnel.CreatedBy,
|
||||
OrgID: orgID,
|
||||
}
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// handleUpdateFunnelStep adds or updates steps for an existing funnel
|
||||
// Steps are identified by their step_order, which must be unique within a funnel
|
||||
func (aH *APIHandler) handleUpdateFunnelStep(w http.ResponseWriter, r *http.Request) {
|
||||
var req traceFunnels.FunnelStepRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
claims, ok := authtypes.ClaimsFromContext(r.Context())
|
||||
if !ok {
|
||||
http.Error(w, "unauthenticated", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
userID := claims.UserID
|
||||
|
||||
if err := traceFunnels.ValidateTimestamp(req.Timestamp, "updated_timestamp"); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
aH.Signoz.SQLStore.SQLxDB()
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
|
||||
funnel, err := dbClient.GetFunnelFromDB(req.FunnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// Process each step in the request
|
||||
for i := range req.Steps {
|
||||
if req.Steps[i].StepOrder < 1 {
|
||||
req.Steps[i].StepOrder = int64(i + 1) // Default to sequential ordering if not specified
|
||||
}
|
||||
}
|
||||
|
||||
if err := traceFunnels.ValidateFunnelSteps(req.Steps); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Normalize step orders
|
||||
req.Steps = traceFunnels.NormalizeFunnelSteps(req.Steps)
|
||||
|
||||
// Update the funnel with new steps
|
||||
funnel.Steps = req.Steps
|
||||
funnel.UpdatedAt = req.Timestamp * 1000000
|
||||
funnel.UpdatedBy = userID
|
||||
|
||||
funnelData, err := json.Marshal(funnel)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to marshal funnel data: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
updatedAt := time.Unix(0, funnel.UpdatedAt).UTC().Format(time.RFC3339)
|
||||
|
||||
_, err = aH.Signoz.SQLStore.SQLDB().Exec(
|
||||
"UPDATE saved_views SET data = ?, updated_by = ?, updated_at = ? WHERE uuid = ? AND category = 'funnel'",
|
||||
string(funnelData), userID, updatedAt, req.FunnelID,
|
||||
)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to update funnel in database: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := map[string]interface{}{
|
||||
"id": funnel.ID,
|
||||
"funnel_name": funnel.Name,
|
||||
"creation_timestamp": funnel.CreatedAt / 1000000,
|
||||
"user_id": funnel.CreatedBy,
|
||||
"org_id": funnel.OrgID,
|
||||
"updated_timestamp": req.Timestamp,
|
||||
"updated_by": userID,
|
||||
"steps": funnel.Steps,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleListFunnels(w http.ResponseWriter, r *http.Request) {
|
||||
orgID := r.URL.Query().Get("org_id")
|
||||
|
||||
var dbFunnels []*traceFunnels.Funnel
|
||||
var err error
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
|
||||
if orgID != "" {
|
||||
dbFunnels, err = dbClient.ListFunnelsFromDB(orgID)
|
||||
} else {
|
||||
dbFunnels, err = dbClient.ListAllFunnelsFromDB()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("error fetching funnels from database: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Convert to response format with additional metadata
|
||||
response := make([]map[string]interface{}, 0, len(dbFunnels))
|
||||
for _, f := range dbFunnels {
|
||||
funnelInfo := map[string]interface{}{
|
||||
"id": f.ID,
|
||||
"funnel_name": f.Name,
|
||||
"creation_timestamp": f.CreatedAt / 1000000,
|
||||
"user_id": f.CreatedBy,
|
||||
"org_id": f.OrgID,
|
||||
}
|
||||
|
||||
if f.UpdatedAt > 0 {
|
||||
funnelInfo["updated_timestamp"] = f.UpdatedAt / 1000000
|
||||
}
|
||||
if f.UpdatedBy != "" {
|
||||
funnelInfo["updated_by"] = f.UpdatedBy
|
||||
}
|
||||
|
||||
var extraData, tags string
|
||||
err := aH.Signoz.SQLStore.SQLDB().QueryRow(
|
||||
"SELECT IFNULL(extra_data, ''), IFNULL(tags, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'",
|
||||
f.ID,
|
||||
).Scan(&extraData, &tags)
|
||||
|
||||
if err == nil && tags != "" {
|
||||
funnelInfo["tags"] = tags
|
||||
}
|
||||
|
||||
if err == nil && extraData != "" {
|
||||
var extraDataMap map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(extraData), &extraDataMap); err == nil {
|
||||
if description, ok := extraDataMap["description"].(string); ok {
|
||||
funnelInfo["description"] = description
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
response = append(response, funnelInfo)
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleGetFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
funnel, err := dbClient.GetFunnelFromDB(funnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var extraData, tags string
|
||||
|
||||
err = aH.Signoz.SQLStore.SQLDB().QueryRow(
|
||||
"SELECT IFNULL(extra_data, ''), IFNULL(tags, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'",
|
||||
funnel.ID,
|
||||
).Scan(&extraData, &tags)
|
||||
|
||||
response := map[string]interface{}{
|
||||
"id": funnel.ID,
|
||||
"funnel_name": funnel.Name,
|
||||
"creation_timestamp": funnel.CreatedAt / 1000000,
|
||||
"user_id": funnel.CreatedBy,
|
||||
"org_id": funnel.OrgID,
|
||||
"steps": funnel.Steps,
|
||||
}
|
||||
|
||||
if funnel.UpdatedAt > 0 {
|
||||
response["updated_timestamp"] = funnel.UpdatedAt / 1000000
|
||||
}
|
||||
if funnel.UpdatedBy != "" {
|
||||
response["updated_by"] = funnel.UpdatedBy
|
||||
}
|
||||
|
||||
if err == nil && tags != "" {
|
||||
response["tags"] = tags
|
||||
}
|
||||
|
||||
if err == nil && extraData != "" {
|
||||
var extraDataMap map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(extraData), &extraDataMap); err == nil {
|
||||
if description, ok := extraDataMap["description"].(string); ok {
|
||||
response["description"] = description
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleDeleteFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
err := dbClient.DeleteFunnelFromDB(funnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to delete funnel: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "success"})
|
||||
}
|
||||
|
||||
// handleSaveFunnel saves a funnel to the SQLite database
|
||||
// Only requires funnel_id and optional description
|
||||
func (aH *APIHandler) handleSaveFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
var req traceFunnels.SaveFunnelRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
funnel, err := dbClient.GetFunnelFromDB(req.FunnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
updateTimestamp := req.Timestamp
|
||||
if updateTimestamp == 0 {
|
||||
updateTimestamp = time.Now().UnixMilli()
|
||||
} else {
|
||||
if !traceFunnels.ValidateTimestampIsMilliseconds(updateTimestamp) {
|
||||
http.Error(w, "timestamp must be in milliseconds format (13 digits)", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
funnel.UpdatedAt = updateTimestamp * 1000000 // Convert ms to ns
|
||||
|
||||
if req.UserID != "" {
|
||||
funnel.UpdatedBy = req.UserID
|
||||
}
|
||||
extraData := ""
|
||||
if req.Description != "" {
|
||||
descriptionJSON, err := json.Marshal(map[string]string{"description": req.Description})
|
||||
if err != nil {
|
||||
http.Error(w, "failed to marshal description: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
extraData = string(descriptionJSON)
|
||||
}
|
||||
|
||||
orgID := req.OrgID
|
||||
if orgID == "" {
|
||||
orgID = funnel.OrgID
|
||||
}
|
||||
|
||||
if err := dbClient.SaveFunnel(funnel, funnel.UpdatedBy, orgID, req.Tags, extraData); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var createdAt, updatedAt, tags, extraDataFromDB string
|
||||
err = aH.Signoz.SQLStore.SQLDB().QueryRow(
|
||||
"SELECT created_at, updated_at, IFNULL(tags, ''), IFNULL(extra_data, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'",
|
||||
funnel.ID,
|
||||
).Scan(&createdAt, &updatedAt, &tags, &extraDataFromDB)
|
||||
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]string{
|
||||
"status": "success",
|
||||
"id": funnel.ID,
|
||||
"name": funnel.Name,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
response := map[string]string{
|
||||
"status": "success",
|
||||
"id": funnel.ID,
|
||||
"name": funnel.Name,
|
||||
"created_at": createdAt,
|
||||
"updated_at": updatedAt,
|
||||
"created_by": funnel.CreatedBy,
|
||||
"updated_by": funnel.UpdatedBy,
|
||||
"org_id": funnel.OrgID,
|
||||
}
|
||||
|
||||
if tags != "" {
|
||||
response["tags"] = tags
|
||||
}
|
||||
|
||||
if extraDataFromDB != "" {
|
||||
var extraDataMap map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(extraDataFromDB), &extraDataMap); err == nil {
|
||||
if description, ok := extraDataMap["description"].(string); ok {
|
||||
response["description"] = description
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleValidateTraces(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
funnel, err := dbClient.GetFunnelFromDB(funnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var timeRange traceFunnels.TimeRange
|
||||
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if len(funnel.Steps) < 2 {
|
||||
http.Error(w, "funnel must have at least 2 steps", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := traceFunnels.ValidateTraces(funnel, timeRange)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
aH.Respond(w, results)
|
||||
}
|
||||
|
||||
// Analytics handlers
|
||||
func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
|
||||
funnel, err := dbClient.GetFunnelFromDB(funnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var timeRange traceFunnels.TimeRange
|
||||
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := traceFunnels.ValidateTracesWithLatency(funnel, timeRange)
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
aH.Respond(w, results)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleStepAnalytics(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
// Get funnel directly from SQLite database
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
funnel, err := dbClient.GetFunnelFromDB(funnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var timeRange traceFunnels.TimeRange
|
||||
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := traceFunnels.GetStepAnalytics(funnel, timeRange)
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
aH.Respond(w, results)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleSlowTraces(w http.ResponseWriter, r *http.Request, withErrors bool) {
|
||||
vars := mux.Vars(r)
|
||||
funnelID := vars["funnel_id"]
|
||||
|
||||
dbClient, _ := traceFunnels.NewSQLClient(aH.Signoz.SQLStore)
|
||||
funnel, err := dbClient.GetFunnelFromDB(funnelID)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("funnel not found: %v", err), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var req traceFunnels.StepTransitionRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
stepAExists, stepBExists := false, false
|
||||
for _, step := range funnel.Steps {
|
||||
if step.StepOrder == req.StepAOrder {
|
||||
stepAExists = true
|
||||
}
|
||||
if step.StepOrder == req.StepBOrder {
|
||||
stepBExists = true
|
||||
}
|
||||
}
|
||||
|
||||
if !stepAExists || !stepBExists {
|
||||
http.Error(w, fmt.Sprintf("One or both steps not found. Step A Order: %d, Step B Order: %d", req.StepAOrder, req.StepBOrder), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := traceFunnels.GetSlowestTraces(funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, withErrors)
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
aH.Respond(w, results)
|
||||
}
|
||||
|
@ -1,145 +0,0 @@
|
||||
package traceFunnels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type FunnelStore struct {
|
||||
sync.RWMutex
|
||||
funnels map[string]*Funnel
|
||||
}
|
||||
|
||||
func (s *FunnelStore) CreateFunnel(name, userID, orgID string, timestamp int64) (*Funnel, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for _, existingFunnel := range s.funnels {
|
||||
if existingFunnel.Name == name && existingFunnel.CreatedBy == userID {
|
||||
return nil, fmt.Errorf("funnel with name '%s' already exists for user '%s'", name, userID)
|
||||
}
|
||||
}
|
||||
|
||||
if timestamp == 0 {
|
||||
return nil, fmt.Errorf("timestamp is required")
|
||||
}
|
||||
|
||||
funnel := &Funnel{
|
||||
ID: uuid.New().String(),
|
||||
Name: name,
|
||||
CreatedAt: timestamp * 1000000, // Convert milliseconds to nanoseconds for internal storage
|
||||
CreatedBy: userID,
|
||||
OrgID: orgID,
|
||||
Steps: make([]FunnelStep, 0),
|
||||
}
|
||||
|
||||
s.funnels[funnel.ID] = funnel
|
||||
return funnel, nil
|
||||
}
|
||||
|
||||
func (s *FunnelStore) GetFunnel(id string) (*Funnel, error) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
funnel, ok := s.funnels[id]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("funnel not found")
|
||||
}
|
||||
|
||||
return funnel, nil
|
||||
}
|
||||
|
||||
func (s *FunnelStore) ListFunnels() []*Funnel {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
funnels := make([]*Funnel, 0, len(s.funnels))
|
||||
for _, funnel := range s.funnels {
|
||||
funnels = append(funnels, funnel)
|
||||
}
|
||||
|
||||
return funnels
|
||||
}
|
||||
|
||||
func (s *FunnelStore) UpdateFunnelSteps(id string, steps []FunnelStep, updatedBy string, updatedAt int64) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
funnel, ok := s.funnels[id]
|
||||
if !ok {
|
||||
return fmt.Errorf("funnel with ID %s not found", id)
|
||||
}
|
||||
|
||||
funnel.Steps = steps
|
||||
funnel.UpdatedAt = updatedAt * 1000000
|
||||
funnel.UpdatedBy = updatedBy
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteFunnel removes a funnel from the in-memory store
|
||||
func (s *FunnelStore) DeleteFunnel(id string) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if _, ok := s.funnels[id]; !ok {
|
||||
return fmt.Errorf("funnel with ID %s not found", id)
|
||||
}
|
||||
|
||||
delete(s.funnels, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateFunnelSteps validates funnel steps and ensures they have unique and correct order
|
||||
// Rules: At least 2 steps, max 3 steps, orders must be unique and include 1 and 2
|
||||
func ValidateFunnelSteps(steps []FunnelStep) error {
|
||||
if len(steps) < 2 {
|
||||
return fmt.Errorf("at least 2 funnel steps are required")
|
||||
}
|
||||
|
||||
if len(steps) > 3 {
|
||||
return fmt.Errorf("maximum 3 funnel steps are allowed")
|
||||
}
|
||||
|
||||
orderMap := make(map[int64]bool)
|
||||
|
||||
for _, step := range steps {
|
||||
if orderMap[step.StepOrder] {
|
||||
return fmt.Errorf("duplicate step order: %d", step.StepOrder)
|
||||
}
|
||||
orderMap[step.StepOrder] = true
|
||||
|
||||
if step.StepOrder < 1 || step.StepOrder > 3 {
|
||||
return fmt.Errorf("step order must be between 1 and 3, got: %d", step.StepOrder)
|
||||
}
|
||||
}
|
||||
|
||||
if !orderMap[1] || !orderMap[2] {
|
||||
return fmt.Errorf("funnel steps with orders 1 and 2 are mandatory")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NormalizeFunnelSteps ensures steps have sequential orders starting from 1
|
||||
// This sorts steps by order and then reassigns orders to be sequential
|
||||
func NormalizeFunnelSteps(steps []FunnelStep) []FunnelStep {
|
||||
// Create a copy of the input slice
|
||||
sortedSteps := make([]FunnelStep, len(steps))
|
||||
copy(sortedSteps, steps)
|
||||
|
||||
// Sort using Go's built-in sort.Slice function
|
||||
sort.Slice(sortedSteps, func(i, j int) bool {
|
||||
return sortedSteps[i].StepOrder < sortedSteps[j].StepOrder
|
||||
})
|
||||
|
||||
// Normalize orders to be sequential starting from 1
|
||||
for i := 0; i < len(sortedSteps); i++ {
|
||||
sortedSteps[i].StepOrder = int64(i + 1)
|
||||
}
|
||||
|
||||
return sortedSteps
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
package traceFunnels
|
||||
|
||||
import v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
|
||||
type Funnel struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"funnel_name"`
|
||||
CreatedAt int64 `json:"creation_timestamp"`
|
||||
CreatedBy string `json:"user_id"`
|
||||
OrgID string `json:"org_id"`
|
||||
UpdatedAt int64 `json:"updated_timestamp,omitempty"`
|
||||
UpdatedBy string `json:"updated_by,omitempty"`
|
||||
Steps []FunnelStep `json:"steps"`
|
||||
}
|
||||
|
||||
// FunnelStep Models
|
||||
type FunnelStep struct {
|
||||
StepOrder int64 `json:"step_order"` // Order of the step in the funnel (1-based)
|
||||
ServiceName string `json:"service_name"` // Service name for the span
|
||||
SpanName string `json:"span_name"` // Span name to match
|
||||
Filters *v3.FilterSet `json:"filters"` // Additional SQL filters
|
||||
LatencyPointer string `json:"latency_pointer"` // "start" or "end"
|
||||
LatencyType string `json:"latency_type"` // "p99", "p95", "p90"
|
||||
HasErrors bool `json:"has_errors"` // Whether to include error spans
|
||||
}
|
||||
|
||||
// NewFunnelRequest Request/Response structures
|
||||
// NewFunnelRequest is used to create a new funnel without steps
|
||||
// Steps should be added separately using the update endpoint
|
||||
type NewFunnelRequest struct {
|
||||
Name string `json:"funnel_name"`
|
||||
Timestamp int64 `json:"creation_timestamp"` // Unix milliseconds timestamp
|
||||
}
|
||||
|
||||
type NewFunnelResponse struct {
|
||||
ID string `json:"funnel_id"`
|
||||
Name string `json:"funnel_name"`
|
||||
CreatedAt int64 `json:"creation_timestamp"`
|
||||
CreatedBy string `json:"user_id"`
|
||||
OrgID string `json:"org_id"`
|
||||
}
|
||||
|
||||
type FunnelListResponse struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"funnel_name"`
|
||||
CreatedAt int64 `json:"creation_timestamp"` // Unix nano timestamp
|
||||
CreatedBy string `json:"user_id"`
|
||||
OrgID string `json:"org_id,omitempty"`
|
||||
}
|
||||
|
||||
// FunnelStepRequest is used to add or update steps for an existing funnel
|
||||
type FunnelStepRequest struct {
|
||||
FunnelID string `json:"funnel_id"`
|
||||
Steps []FunnelStep `json:"steps"`
|
||||
Timestamp int64 `json:"updated_timestamp"` // Unix milliseconds timestamp for update time
|
||||
}
|
||||
|
||||
// TimeRange Analytics request/response types
|
||||
type TimeRange struct {
|
||||
StartTime int64 `json:"start_time"` // Unix nano
|
||||
EndTime int64 `json:"end_time"` // Unix nano
|
||||
}
|
||||
|
||||
type StepTransitionRequest struct {
|
||||
TimeRange
|
||||
StepAOrder int64 `json:"step_a_order"` // First step in transition
|
||||
StepBOrder int64 `json:"step_b_order"` // Second step in transition
|
||||
}
|
||||
|
||||
type ValidTracesResponse struct {
|
||||
TraceIDs []string `json:"trace_ids"`
|
||||
}
|
||||
|
||||
type FunnelAnalytics struct {
|
||||
TotalStart int64 `json:"total_start"`
|
||||
TotalComplete int64 `json:"total_complete"`
|
||||
ErrorCount int64 `json:"error_count"`
|
||||
AvgDurationMs float64 `json:"avg_duration_ms"`
|
||||
P99LatencyMs float64 `json:"p99_latency_ms"`
|
||||
ConversionRate float64 `json:"conversion_rate"`
|
||||
}
|
@ -1,474 +0,0 @@
|
||||
package traceFunnels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
// TracesTable is the ClickHouse table name for traces
|
||||
const TracesTable = "signoz_traces.signoz_index_v3"
|
||||
|
||||
// StepAnalytics represents analytics data for a single step in a funnel
|
||||
type StepAnalytics struct {
|
||||
StepOrder int64 `json:"stepOrder"`
|
||||
TotalSpans int64 `json:"totalSpans"`
|
||||
ErroredSpans int64 `json:"erroredSpans"`
|
||||
AvgDurationMs string `json:"avgDurationMs"`
|
||||
}
|
||||
|
||||
// FunnelStepFilter represents filters for a single step in the funnel
|
||||
type FunnelStepFilter struct {
|
||||
StepNumber int
|
||||
ServiceName string
|
||||
SpanName string
|
||||
LatencyPointer string // "start" or "end"
|
||||
CustomFilters *v3.FilterSet
|
||||
}
|
||||
|
||||
// SlowTrace represents a trace with its duration and span count
|
||||
type SlowTrace struct {
|
||||
TraceID string `json:"traceId"`
|
||||
DurationMs string `json:"durationMs"`
|
||||
SpanCount int64 `json:"spanCount"`
|
||||
}
|
||||
|
||||
// ValidateTraces parses the Funnel and builds a query to validate traces
|
||||
func ValidateTraces(funnel *Funnel, timeRange TimeRange) (*v3.ClickHouseQuery, error) {
|
||||
filters, err := buildFunnelFilters(funnel)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building funnel filters: %w", err)
|
||||
}
|
||||
|
||||
query := generateFunnelSQL(timeRange.StartTime, timeRange.EndTime, filters)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ValidateTracesWithLatency builds a query that considers the latency pointer for trace calculations
|
||||
func ValidateTracesWithLatency(funnel *Funnel, timeRange TimeRange) (*v3.ClickHouseQuery, error) {
|
||||
filters, err := buildFunnelFiltersWithLatency(funnel)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building funnel filters with latency: %w", err)
|
||||
}
|
||||
|
||||
query := generateFunnelSQLWithLatency(timeRange.StartTime, timeRange.EndTime, filters)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// buildFunnelFilters extracts filters from funnel steps (without latency pointer)
|
||||
func buildFunnelFilters(funnel *Funnel) ([]FunnelStepFilter, error) {
|
||||
if funnel == nil {
|
||||
return nil, fmt.Errorf("funnel cannot be nil")
|
||||
}
|
||||
|
||||
if len(funnel.Steps) == 0 {
|
||||
return nil, fmt.Errorf("funnel must have at least one step")
|
||||
}
|
||||
|
||||
filters := make([]FunnelStepFilter, len(funnel.Steps))
|
||||
|
||||
for i, step := range funnel.Steps {
|
||||
filters[i] = FunnelStepFilter{
|
||||
StepNumber: i + 1,
|
||||
ServiceName: step.ServiceName,
|
||||
SpanName: step.SpanName,
|
||||
CustomFilters: step.Filters,
|
||||
}
|
||||
}
|
||||
|
||||
return filters, nil
|
||||
}
|
||||
|
||||
// buildFunnelFiltersWithLatency extracts filters including the latency pointer
|
||||
func buildFunnelFiltersWithLatency(funnel *Funnel) ([]FunnelStepFilter, error) {
|
||||
if funnel == nil {
|
||||
return nil, fmt.Errorf("funnel cannot be nil")
|
||||
}
|
||||
|
||||
if len(funnel.Steps) == 0 {
|
||||
return nil, fmt.Errorf("funnel must have at least one step")
|
||||
}
|
||||
|
||||
filters := make([]FunnelStepFilter, len(funnel.Steps))
|
||||
|
||||
for i, step := range funnel.Steps {
|
||||
latencyPointer := "start" // Default value
|
||||
if step.LatencyPointer != "" {
|
||||
latencyPointer = step.LatencyPointer
|
||||
}
|
||||
|
||||
filters[i] = FunnelStepFilter{
|
||||
StepNumber: i + 1,
|
||||
ServiceName: step.ServiceName,
|
||||
SpanName: step.SpanName,
|
||||
LatencyPointer: latencyPointer,
|
||||
CustomFilters: step.Filters,
|
||||
}
|
||||
}
|
||||
|
||||
return filters, nil
|
||||
}
|
||||
|
||||
// escapeString escapes a string for safe use in SQL queries
|
||||
func escapeString(s string) string {
|
||||
// Replace single quotes with double single quotes to escape them in SQL
|
||||
return strings.ReplaceAll(s, "'", "''")
|
||||
}
|
||||
|
||||
// generateFunnelSQL builds the ClickHouse SQL query for funnel validation
|
||||
func generateFunnelSQL(start, end int64, filters []FunnelStepFilter) string {
|
||||
var expressions []string
|
||||
|
||||
// Basic time expressions.
|
||||
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS start_time", start))
|
||||
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS end_time", end))
|
||||
expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart")
|
||||
expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd")
|
||||
|
||||
// Add service and span alias definitions from each filter.
|
||||
for _, f := range filters {
|
||||
expressions = append(expressions, fmt.Sprintf("'%s' AS service_%d", escapeString(f.ServiceName), f.StepNumber))
|
||||
expressions = append(expressions, fmt.Sprintf("'%s' AS span_%d", escapeString(f.SpanName), f.StepNumber))
|
||||
}
|
||||
|
||||
// Add the CTE for each step.
|
||||
for _, f := range filters {
|
||||
cte := fmt.Sprintf(`step%d_traces AS (
|
||||
SELECT DISTINCT trace_id
|
||||
FROM %s
|
||||
WHERE serviceName = service_%d
|
||||
AND name = span_%d
|
||||
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
)`, f.StepNumber, TracesTable, f.StepNumber, f.StepNumber)
|
||||
expressions = append(expressions, cte)
|
||||
}
|
||||
|
||||
// Join all expressions with commas and newlines
|
||||
withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n"
|
||||
|
||||
// Build the intersect clause for each step.
|
||||
var intersectQueries []string
|
||||
for _, f := range filters {
|
||||
intersectQueries = append(intersectQueries, fmt.Sprintf("SELECT trace_id FROM step%d_traces", f.StepNumber))
|
||||
}
|
||||
intersectClause := strings.Join(intersectQueries, "\nINTERSECT\n")
|
||||
|
||||
query := withClause + `
|
||||
SELECT trace_id
|
||||
FROM ` + TracesTable + `
|
||||
WHERE trace_id IN (
|
||||
` + intersectClause + `
|
||||
)
|
||||
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
GROUP BY trace_id
|
||||
LIMIT 5
|
||||
`
|
||||
return query
|
||||
}
|
||||
|
||||
// generateFunnelSQLWithLatency correctly applies latency pointer logic for trace duration
|
||||
func generateFunnelSQLWithLatency(start, end int64, filters []FunnelStepFilter) string {
|
||||
var expressions []string
|
||||
|
||||
// Define the base time variables
|
||||
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS start_time", start))
|
||||
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS end_time", end))
|
||||
expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart")
|
||||
expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd")
|
||||
expressions = append(expressions, "(end_time - start_time) / 1e9 AS total_time_seconds")
|
||||
|
||||
// Define service, span, and latency pointer mappings
|
||||
for _, f := range filters {
|
||||
expressions = append(expressions, fmt.Sprintf("('%s', '%s', '%s') AS s%d_config",
|
||||
escapeString(f.ServiceName),
|
||||
escapeString(f.SpanName),
|
||||
escapeString(f.LatencyPointer),
|
||||
f.StepNumber))
|
||||
}
|
||||
|
||||
// Construct the WITH clause
|
||||
withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n"
|
||||
|
||||
// Latency calculation logic
|
||||
var latencyCases []string
|
||||
for _, f := range filters {
|
||||
if f.LatencyPointer == "end" {
|
||||
latencyCases = append(latencyCases, fmt.Sprintf(`
|
||||
WHEN (resource_string_service$$name, name) = (s%d_config.1, s%d_config.2)
|
||||
THEN toUnixTimestamp64Nano(timestamp) + duration_nano`, f.StepNumber, f.StepNumber))
|
||||
} else {
|
||||
latencyCases = append(latencyCases, fmt.Sprintf(`
|
||||
WHEN (resource_string_service$$name, name) = (s%d_config.1, s%d_config.2)
|
||||
THEN toUnixTimestamp64Nano(timestamp)`, f.StepNumber, f.StepNumber))
|
||||
}
|
||||
}
|
||||
|
||||
latencyComputation := fmt.Sprintf(`
|
||||
MAX(
|
||||
CASE %s
|
||||
ELSE toUnixTimestamp64Nano(timestamp)
|
||||
END
|
||||
) -
|
||||
MIN(
|
||||
CASE %s
|
||||
ELSE toUnixTimestamp64Nano(timestamp)
|
||||
END
|
||||
) AS trace_duration`, strings.Join(latencyCases, ""), strings.Join(latencyCases, ""))
|
||||
|
||||
query := withClause + `
|
||||
SELECT
|
||||
COUNT(DISTINCT CASE WHEN in_funnel_s1 = 1 THEN trace_id END) AS total_s1,
|
||||
COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 THEN trace_id END) AS total_s3,
|
||||
COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 THEN trace_id END) / total_time_seconds AS avg_rate,
|
||||
COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 AND has_error = true THEN trace_id END) AS errors,
|
||||
avg(trace_duration) AS avg_duration,
|
||||
quantile(0.99)(trace_duration) AS p99_latency,
|
||||
100 - (
|
||||
(COUNT(DISTINCT CASE WHEN in_funnel_s1 = 1 THEN trace_id END) -
|
||||
COUNT(DISTINCT CASE WHEN in_funnel_s3 = 1 THEN trace_id END))
|
||||
/ NULLIF(COUNT(DISTINCT CASE WHEN in_funnel_s1 = 1 THEN trace_id END), 0) * 100
|
||||
) AS conversion_rate
|
||||
FROM (
|
||||
SELECT
|
||||
trace_id,
|
||||
` + latencyComputation + `,
|
||||
MAX(has_error) AS has_error,
|
||||
MAX(CASE WHEN (resource_string_service$$name, name) = (s1_config.1, s1_config.2) THEN 1 ELSE 0 END) AS in_funnel_s1,
|
||||
MAX(CASE WHEN (resource_string_service$$name, name) = (s3_config.1, s3_config.2) THEN 1 ELSE 0 END) AS in_funnel_s3
|
||||
FROM ` + TracesTable + `
|
||||
WHERE timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
AND (resource_string_service$$name, name) IN (` + generateFilterConditions(filters) + `)
|
||||
GROUP BY trace_id
|
||||
) AS trace_metrics;
|
||||
`
|
||||
return query
|
||||
}
|
||||
|
||||
// generateFilterConditions creates the filtering conditions dynamically
|
||||
func generateFilterConditions(filters []FunnelStepFilter) string {
|
||||
var conditions []string
|
||||
for _, f := range filters {
|
||||
conditions = append(conditions, fmt.Sprintf("(s%d_config.1, s%d_config.2)", f.StepNumber, f.StepNumber))
|
||||
}
|
||||
return strings.Join(conditions, ", ")
|
||||
}
|
||||
|
||||
// GetStepAnalytics builds a query to get analytics for each step in a funnel
|
||||
func GetStepAnalytics(funnel *Funnel, timeRange TimeRange) (*v3.ClickHouseQuery, error) {
|
||||
if len(funnel.Steps) == 0 {
|
||||
return nil, fmt.Errorf("funnel has no steps")
|
||||
}
|
||||
|
||||
// Build funnel steps array
|
||||
var steps []string
|
||||
for _, step := range funnel.Steps {
|
||||
steps = append(steps, fmt.Sprintf("('%s', '%s')",
|
||||
escapeString(step.ServiceName), escapeString(step.SpanName)))
|
||||
}
|
||||
stepsArray := fmt.Sprintf("array(%s)", strings.Join(steps, ","))
|
||||
|
||||
// Build step CTEs
|
||||
var stepCTEs []string
|
||||
for i, step := range funnel.Steps {
|
||||
filterStr := ""
|
||||
if step.Filters != nil && len(step.Filters.Items) > 0 {
|
||||
// This is a placeholder - in a real implementation, you would convert
|
||||
// the filter set to a SQL WHERE clause string
|
||||
filterStr = "/* Custom filters would be applied here */"
|
||||
}
|
||||
|
||||
cte := fmt.Sprintf(`
|
||||
step%d_traces AS (
|
||||
SELECT DISTINCT trace_id
|
||||
FROM %s
|
||||
WHERE resource_string_service$$name = '%s'
|
||||
AND name = '%s'
|
||||
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
%s
|
||||
)`,
|
||||
i+1,
|
||||
TracesTable,
|
||||
escapeString(step.ServiceName),
|
||||
escapeString(step.SpanName),
|
||||
filterStr,
|
||||
)
|
||||
stepCTEs = append(stepCTEs, cte)
|
||||
}
|
||||
|
||||
// Build intersecting traces CTE
|
||||
var intersections []string
|
||||
for i := 1; i <= len(funnel.Steps); i++ {
|
||||
intersections = append(intersections, fmt.Sprintf("SELECT trace_id FROM step%d_traces", i))
|
||||
}
|
||||
intersectingTracesCTE := fmt.Sprintf(`
|
||||
intersecting_traces AS (
|
||||
%s
|
||||
)`,
|
||||
strings.Join(intersections, "\nINTERSECT\n"),
|
||||
)
|
||||
|
||||
// Build CASE expressions for each step
|
||||
var caseExpressions []string
|
||||
for i, step := range funnel.Steps {
|
||||
totalSpansExpr := fmt.Sprintf(`
|
||||
COUNT(CASE WHEN resource_string_service$$name = '%s'
|
||||
AND name = '%s'
|
||||
THEN trace_id END) AS total_s%d_spans`,
|
||||
escapeString(step.ServiceName), escapeString(step.SpanName), i+1)
|
||||
|
||||
erroredSpansExpr := fmt.Sprintf(`
|
||||
COUNT(CASE WHEN resource_string_service$$name = '%s'
|
||||
AND name = '%s'
|
||||
AND has_error = true
|
||||
THEN trace_id END) AS total_s%d_errored_spans`,
|
||||
escapeString(step.ServiceName), escapeString(step.SpanName), i+1)
|
||||
|
||||
caseExpressions = append(caseExpressions, totalSpansExpr, erroredSpansExpr)
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
WITH
|
||||
toUInt64(%d) AS start_time,
|
||||
toUInt64(%d) AS end_time,
|
||||
toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart,
|
||||
toString(intDiv(end_time, 1000000000)) AS tsBucketEnd,
|
||||
%s AS funnel_steps,
|
||||
%s,
|
||||
%s
|
||||
SELECT
|
||||
%s
|
||||
FROM %s
|
||||
WHERE trace_id IN (SELECT trace_id FROM intersecting_traces)
|
||||
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd`,
|
||||
timeRange.StartTime,
|
||||
timeRange.EndTime,
|
||||
stepsArray,
|
||||
strings.Join(stepCTEs, ",\n"),
|
||||
intersectingTracesCTE,
|
||||
strings.Join(caseExpressions, ",\n "),
|
||||
TracesTable,
|
||||
)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// buildFilters converts a step's filters to a SQL WHERE clause string
|
||||
func buildFilters(step FunnelStep) string {
|
||||
if step.Filters == nil || len(step.Filters.Items) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
// This is a placeholder - in a real implementation, you would convert
|
||||
// the filter set to a SQL WHERE clause string
|
||||
return "/* Custom filters would be applied here */"
|
||||
}
|
||||
|
||||
// GetSlowestTraces builds a query to get the slowest traces for a transition between two steps
|
||||
func GetSlowestTraces(funnel *Funnel, stepAOrder int64, stepBOrder int64, timeRange TimeRange, withErrors bool) (*v3.ClickHouseQuery, error) {
|
||||
// Find steps by order
|
||||
var stepA, stepB *FunnelStep
|
||||
for i := range funnel.Steps {
|
||||
if funnel.Steps[i].StepOrder == stepAOrder {
|
||||
stepA = &funnel.Steps[i]
|
||||
}
|
||||
if funnel.Steps[i].StepOrder == stepBOrder {
|
||||
stepB = &funnel.Steps[i]
|
||||
}
|
||||
}
|
||||
|
||||
if stepA == nil || stepB == nil {
|
||||
return nil, fmt.Errorf("step not found")
|
||||
}
|
||||
|
||||
// Build having clause based on withErrors flag
|
||||
havingClause := ""
|
||||
if withErrors {
|
||||
havingClause = "HAVING has_error = 1"
|
||||
}
|
||||
|
||||
// Build filter strings for each step
|
||||
stepAFilters := ""
|
||||
if stepA.Filters != nil && len(stepA.Filters.Items) > 0 {
|
||||
// This is a placeholder - in a real implementation, you would convert
|
||||
// the filter set to a SQL WHERE clause string
|
||||
stepAFilters = "/* Custom filters for step A would be applied here */"
|
||||
}
|
||||
|
||||
stepBFilters := ""
|
||||
if stepB.Filters != nil && len(stepB.Filters.Items) > 0 {
|
||||
// This is a placeholder - in a real implementation, you would convert
|
||||
// the filter set to a SQL WHERE clause string
|
||||
stepBFilters = "/* Custom filters for step B would be applied here */"
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
WITH
|
||||
toUInt64(%d) AS start_time,
|
||||
toUInt64(%d) AS end_time,
|
||||
toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart,
|
||||
toString(intDiv(end_time, 1000000000)) AS tsBucketEnd
|
||||
SELECT
|
||||
trace_id,
|
||||
concat(toString((max_end_time_ns - min_start_time_ns) / 1e6), ' ms') AS duration_ms,
|
||||
COUNT(*) AS span_count
|
||||
FROM (
|
||||
SELECT
|
||||
s1.trace_id,
|
||||
MIN(toUnixTimestamp64Nano(s1.timestamp)) AS min_start_time_ns,
|
||||
MAX(toUnixTimestamp64Nano(s2.timestamp) + s2.duration_nano) AS max_end_time_ns,
|
||||
MAX(s1.has_error OR s2.has_error) AS has_error
|
||||
FROM %s AS s1
|
||||
JOIN %s AS s2
|
||||
ON s1.trace_id = s2.trace_id
|
||||
WHERE s1.resource_string_service$$name = '%s'
|
||||
AND s1.name = '%s'
|
||||
AND s2.resource_string_service$$name = '%s'
|
||||
AND s2.name = '%s'
|
||||
AND s1.timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND s1.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
AND s2.timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND s2.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
%s
|
||||
%s
|
||||
GROUP BY s1.trace_id
|
||||
%s
|
||||
) AS trace_durations
|
||||
JOIN %s AS spans
|
||||
ON spans.trace_id = trace_durations.trace_id
|
||||
WHERE spans.timestamp BETWEEN toString(start_time) AND toString(end_time)
|
||||
AND spans.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
|
||||
GROUP BY trace_id, duration_ms
|
||||
ORDER BY CAST(replaceRegexpAll(duration_ms, ' ms$', '') AS Float64) DESC
|
||||
LIMIT 5`,
|
||||
timeRange.StartTime,
|
||||
timeRange.EndTime,
|
||||
TracesTable,
|
||||
TracesTable,
|
||||
escapeString(stepA.ServiceName),
|
||||
escapeString(stepA.SpanName),
|
||||
escapeString(stepB.ServiceName),
|
||||
escapeString(stepB.SpanName),
|
||||
stepAFilters,
|
||||
stepBFilters,
|
||||
havingClause,
|
||||
TracesTable,
|
||||
)
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
Query: query,
|
||||
}, nil
|
||||
}
|
@ -1,206 +0,0 @@
|
||||
package traceFunnels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
)
|
||||
|
||||
// SQLClient handles persistence of funnels to the database
|
||||
type SQLClient struct {
|
||||
store sqlstore.SQLStore
|
||||
}
|
||||
|
||||
// NewSQLClient creates a new SQL client
|
||||
func NewSQLClient(store sqlstore.SQLStore) (*SQLClient, error) {
|
||||
return &SQLClient{store: store}, nil
|
||||
}
|
||||
|
||||
// SaveFunnelRequest is used to save a funnel to the database
|
||||
type SaveFunnelRequest struct {
|
||||
FunnelID string `json:"funnel_id"` // Required: ID of the funnel to save
|
||||
UserID string `json:"user_id,omitempty"` // Optional: will use existing user ID if not provided
|
||||
OrgID string `json:"org_id,omitempty"` // Optional: will use existing org ID if not provided
|
||||
Tags string `json:"tags,omitempty"` // Optional: comma-separated tags
|
||||
Description string `json:"description,omitempty"` // Optional: human-readable description
|
||||
Timestamp int64 `json:"timestamp,omitempty"` // Optional: timestamp for update in milliseconds (uses current time if not provided)
|
||||
}
|
||||
|
||||
// SaveFunnel saves a funnel to the database in the saved_views table
|
||||
// Handles both creating new funnels and updating existing ones
|
||||
func (c *SQLClient) SaveFunnel(funnel *Funnel, userID, orgID string, tags, extraData string) error {
|
||||
ctx := context.Background()
|
||||
db := c.store.BunDB()
|
||||
|
||||
// Convert funnel to JSON for storage
|
||||
funnelData, err := json.Marshal(funnel)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal funnel data: %v", err)
|
||||
}
|
||||
|
||||
// Format timestamps as RFC3339
|
||||
// Convert nanoseconds to milliseconds for display, then to time.Time for formatting
|
||||
createdAt := time.Unix(0, funnel.CreatedAt).UTC().Format(time.RFC3339)
|
||||
updatedAt := createdAt
|
||||
updatedBy := userID
|
||||
|
||||
// If funnel has update metadata, use it
|
||||
if funnel.UpdatedAt > 0 {
|
||||
updatedAt = time.Unix(0, funnel.UpdatedAt).UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
if funnel.UpdatedBy != "" {
|
||||
updatedBy = funnel.UpdatedBy
|
||||
}
|
||||
|
||||
// Check if the funnel already exists
|
||||
var count int
|
||||
var existingCreatedBy string
|
||||
var existingCreatedAt string
|
||||
err = db.NewRaw("SELECT COUNT(*), IFNULL(created_by, ''), IFNULL(created_at, '') FROM saved_views WHERE uuid = ? AND category = 'funnel'", funnel.ID).
|
||||
Scan(ctx, &count, &existingCreatedBy, &existingCreatedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if funnel exists: %v", err)
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
// Update existing funnel - preserve created_by and created_at
|
||||
_, err = db.NewRaw(
|
||||
"UPDATE saved_views SET name = ?, data = ?, updated_by = ?, updated_at = ?, tags = ?, extra_data = ? WHERE uuid = ? AND category = 'funnel'",
|
||||
funnel.Name, string(funnelData), updatedBy, updatedAt, tags, extraData, funnel.ID,
|
||||
).Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update funnel: %v", err)
|
||||
}
|
||||
} else {
|
||||
// Insert new funnel - set both created and updated fields
|
||||
savedView := &types.SavedView{
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: userID,
|
||||
UpdatedBy: updatedBy,
|
||||
},
|
||||
UUID: funnel.ID,
|
||||
Name: funnel.Name,
|
||||
Category: "funnel",
|
||||
SourcePage: "trace-funnels",
|
||||
OrgID: orgID,
|
||||
Tags: tags,
|
||||
Data: string(funnelData),
|
||||
ExtraData: extraData,
|
||||
}
|
||||
|
||||
_, err = db.NewInsert().Model(savedView).Exec(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert funnel: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetFunnelFromDB retrieves a funnel from the database
|
||||
func (c *SQLClient) GetFunnelFromDB(funnelID string) (*Funnel, error) {
|
||||
ctx := context.Background()
|
||||
db := c.store.BunDB()
|
||||
|
||||
var savedView types.SavedView
|
||||
err := db.NewSelect().
|
||||
Model(&savedView).
|
||||
Where("uuid = ? AND category = 'funnel'", funnelID).
|
||||
Scan(ctx)
|
||||
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("funnel not found")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get funnel: %v", err)
|
||||
}
|
||||
|
||||
var funnel Funnel
|
||||
if err := json.Unmarshal([]byte(savedView.Data), &funnel); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal funnel data: %v", err)
|
||||
}
|
||||
|
||||
return &funnel, nil
|
||||
}
|
||||
|
||||
// ListFunnelsFromDB lists all funnels from the database
|
||||
func (c *SQLClient) ListFunnelsFromDB(orgID string) ([]*Funnel, error) {
|
||||
ctx := context.Background()
|
||||
db := c.store.BunDB()
|
||||
|
||||
var savedViews []types.SavedView
|
||||
err := db.NewSelect().
|
||||
Model(&savedViews).
|
||||
Where("category = 'funnel' AND org_id = ?", orgID).
|
||||
Scan(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list funnels: %v", err)
|
||||
}
|
||||
|
||||
var funnels []*Funnel
|
||||
for _, view := range savedViews {
|
||||
var funnel Funnel
|
||||
if err := json.Unmarshal([]byte(view.Data), &funnel); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal funnel data: %v", err)
|
||||
}
|
||||
|
||||
funnels = append(funnels, &funnel)
|
||||
}
|
||||
|
||||
return funnels, nil
|
||||
}
|
||||
|
||||
// ListAllFunnelsFromDB lists all funnels from the database without org_id filter
|
||||
func (c *SQLClient) ListAllFunnelsFromDB() ([]*Funnel, error) {
|
||||
ctx := context.Background()
|
||||
db := c.store.BunDB()
|
||||
|
||||
var savedViews []types.SavedView
|
||||
err := db.NewSelect().
|
||||
Model(&savedViews).
|
||||
Where("category = 'funnel'").
|
||||
Scan(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list all funnels: %v", err)
|
||||
}
|
||||
|
||||
var funnels []*Funnel
|
||||
for _, view := range savedViews {
|
||||
var funnel Funnel
|
||||
if err := json.Unmarshal([]byte(view.Data), &funnel); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal funnel data: %v", err)
|
||||
}
|
||||
|
||||
funnels = append(funnels, &funnel)
|
||||
}
|
||||
|
||||
return funnels, nil
|
||||
}
|
||||
|
||||
// DeleteFunnelFromDB deletes a funnel from the database
|
||||
func (c *SQLClient) DeleteFunnelFromDB(funnelID string) error {
|
||||
ctx := context.Background()
|
||||
db := c.store.BunDB()
|
||||
|
||||
_, err := db.NewDelete().
|
||||
Model(&types.SavedView{}).
|
||||
Where("uuid = ? AND category = 'funnel'", funnelID).
|
||||
Exec(ctx)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete funnel: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
package traceFunnels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// ValidateTimestampIsMilliseconds checks if a timestamp is likely in milliseconds format
|
||||
func ValidateTimestampIsMilliseconds(timestamp int64) bool {
|
||||
// If timestamp is 0, it's not valid
|
||||
if timestamp == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
timestampStr := strconv.FormatInt(timestamp, 10)
|
||||
|
||||
return len(timestampStr) >= 12 && len(timestampStr) <= 14
|
||||
}
|
||||
|
||||
// ValidateTimestamp checks if a timestamp is provided and in milliseconds format
|
||||
// Returns an error if validation fails
|
||||
func ValidateTimestamp(timestamp int64, fieldName string) error {
|
||||
if timestamp == 0 {
|
||||
return fmt.Errorf("%s is required", fieldName)
|
||||
}
|
||||
|
||||
if !ValidateTimestampIsMilliseconds(timestamp) {
|
||||
return fmt.Errorf("%s must be in milliseconds format (13 digits)", fieldName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -332,7 +332,6 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
|
||||
api.RegisterMessagingQueuesRoutes(r, am)
|
||||
api.RegisterThirdPartyApiRoutes(r, am)
|
||||
api.MetricExplorerRoutes(r, am)
|
||||
api.RegisterTraceFunnelsRoutes(r, am)
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedOrigins: []string{"*"},
|
||||
|
Loading…
x
Reference in New Issue
Block a user