feat: db operations, module and handler implementation

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
This commit is contained in:
Shivanshu Raj Shrivastava 2025-04-29 16:44:23 +05:30
parent e258d70df5
commit 43337b6697
No known key found for this signature in database
GPG Key ID: D34D26C62AC3E9AE
3 changed files with 695 additions and 0 deletions

View File

@ -0,0 +1,453 @@
package impltracefunnel
import (
"encoding/json"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/types/authtypes"
tf "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module tracefunnel.Module
}
func NewHandler(module tracefunnel.Module) tracefunnel.Handler {
return &handler{module: module}
}
func (handler *handler) New(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
userID := claims.UserID
orgID := claims.OrgID
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw, err)
return
}
for _, f := range funnels {
if f.Name == req.Name {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"a funnel with name '%s' already exists in this organization",
req.Name))
return
}
}
funnel, err := handler.module.Create(r.Context(), req.Timestamp, req.Name, userID, orgID)
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to create funnel"))
return
}
response := tf.FunnelResponse{
FunnelID: funnel.ID.String(),
FunnelName: funnel.Name,
CreatedAt: req.Timestamp,
UserEmail: claims.Email,
OrgID: orgID,
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) UpdateSteps(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
userID := claims.UserID
orgID := claims.OrgID
if err := tracefunnel.ValidateTimestamp(req.Timestamp, "timestamp"); err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"timestamp is invalid: %v", err))
return
}
funnel, err := handler.module.Get(r.Context(), req.FunnelID.String())
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"funnel not found: %v", err))
return
}
// Check if name is being updated and if it already exists
if req.Name != "" && req.Name != funnel.Name {
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to list funnels: %v", err))
return
}
for _, f := range funnels {
if f.Name == req.Name {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"a funnel with name '%s' already exists in this organization", req.Name))
return
}
}
}
// Process each step in the request
for i := range req.Steps {
if req.Steps[i].Order < 1 {
req.Steps[i].Order = int64(i + 1) // Default to sequential ordering if not specified
}
// Generate a new UUID for the step if it doesn't have one
if req.Steps[i].Id.IsZero() {
newUUID := valuer.GenerateUUID()
req.Steps[i].Id = newUUID
}
}
if err := tracefunnel.ValidateFunnelSteps(req.Steps); err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid funnel steps: %v", err))
return
}
// Normalize step orders
req.Steps = tracefunnel.NormalizeFunnelSteps(req.Steps)
// Update the funnel with new steps
funnel.Steps = req.Steps
funnel.UpdatedAt = time.Unix(0, req.Timestamp*1000000) // Convert to nanoseconds
funnel.UpdatedBy = userID
if req.Name != "" {
funnel.Name = req.Name
}
if req.Description != "" {
funnel.Description = req.Description
}
// Update funnel in database
err = handler.module.Update(r.Context(), funnel, userID)
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to update funnel in database: %v", err))
return
}
// Get the updated funnel to return in response
updatedFunnel, err := handler.module.Get(r.Context(), funnel.ID.String())
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to get updated funnel: %v", err))
return
}
response := tf.FunnelResponse{
FunnelName: updatedFunnel.Name,
FunnelID: updatedFunnel.ID.String(),
Steps: updatedFunnel.Steps,
CreatedAt: updatedFunnel.CreatedAt.UnixNano() / 1000000,
CreatedBy: updatedFunnel.CreatedBy,
OrgID: updatedFunnel.OrgID.String(),
UpdatedBy: userID,
UpdatedAt: updatedFunnel.UpdatedAt.UnixNano() / 1000000,
Description: updatedFunnel.Description,
UserEmail: claims.Email,
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) UpdateFunnel(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
userID := claims.UserID
orgID := claims.OrgID
if err := tracefunnel.ValidateTimestamp(req.Timestamp, "timestamp"); err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "timestamp is invalid: %v", err))
return
}
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := handler.module.Get(r.Context(), funnelID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
return
}
// Check if name is being updated and if it already exists
if req.Name != "" && req.Name != funnel.Name {
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to list funnels: %v", err))
return
}
for _, f := range funnels {
if f.Name == req.Name {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "a funnel with name '%s' already exists in this organization", req.Name))
return
}
}
}
funnel.UpdatedAt = time.Unix(0, req.Timestamp*1000000) // Convert to nanoseconds
funnel.UpdatedBy = userID
if req.Name != "" {
funnel.Name = req.Name
}
if req.Description != "" {
funnel.Description = req.Description
}
err = handler.module.Update(r.Context(), funnel, userID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to update funnel in database: %v", err))
return
}
// Get the updated funnel to return in response
updatedFunnel, err := handler.module.Get(r.Context(), funnel.ID.String())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to get updated funnel: %v", err))
return
}
response := tf.FunnelResponse{
FunnelName: updatedFunnel.Name,
FunnelID: updatedFunnel.ID.String(),
Steps: updatedFunnel.Steps,
CreatedAt: updatedFunnel.CreatedAt.UnixNano() / 1000000,
CreatedBy: updatedFunnel.CreatedBy,
OrgID: updatedFunnel.OrgID.String(),
UpdatedBy: userID,
UpdatedAt: updatedFunnel.UpdatedAt.UnixNano() / 1000000,
Description: updatedFunnel.Description,
UserEmail: claims.Email,
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) List(rw http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"unauthenticated"))
return
}
orgID := claims.OrgID
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to list funnels: %v", err))
return
}
var response []tf.FunnelResponse
for _, f := range funnels {
funnelResp := tf.FunnelResponse{
FunnelName: f.Name,
FunnelID: f.ID.String(),
CreatedAt: f.CreatedAt.UnixNano() / 1000000,
CreatedBy: f.CreatedBy,
OrgID: f.OrgID.String(),
UpdatedAt: f.UpdatedAt.UnixNano() / 1000000,
UpdatedBy: f.UpdatedBy,
Description: f.Description,
}
// Get user email if available
if f.CreatedByUser != nil {
funnelResp.UserEmail = f.CreatedByUser.Email
}
response = append(response, funnelResp)
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := handler.module.Get(r.Context(), funnelID)
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"funnel not found: %v", err))
return
}
response := tf.FunnelResponse{
FunnelID: funnel.ID.String(),
FunnelName: funnel.Name,
Description: funnel.Description,
CreatedAt: funnel.CreatedAt.UnixNano() / 1000000,
UpdatedAt: funnel.UpdatedAt.UnixNano() / 1000000,
CreatedBy: funnel.CreatedBy,
UpdatedBy: funnel.UpdatedBy,
OrgID: funnel.OrgID.String(),
Steps: funnel.Steps,
}
if funnel.CreatedByUser != nil {
response.UserEmail = funnel.CreatedByUser.Email
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) Delete(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
err := handler.module.Delete(r.Context(), funnelID)
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to delete funnel: %v", err))
return
}
render.Success(rw, http.StatusOK, nil)
}
func (handler *handler) Save(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"invalid request: %v", err))
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"unauthenticated"))
return
}
orgID := claims.OrgID
usrID := claims.UserID
funnel, err := handler.module.Get(r.Context(), req.FunnelID.String())
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"funnel not found: %v", err))
return
}
updateTimestamp := req.Timestamp
if updateTimestamp == 0 {
updateTimestamp = time.Now().UnixMilli()
} else if !tracefunnel.ValidateTimestampIsMilliseconds(updateTimestamp) {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"timestamp must be in milliseconds format (13 digits)"))
return
}
funnel.UpdatedAt = time.Unix(0, updateTimestamp*1000000) // Convert to nanoseconds
if req.UserID != "" {
funnel.UpdatedBy = usrID
}
funnel.Description = req.Description
if err := handler.module.Save(r.Context(), funnel, funnel.UpdatedBy, orgID); err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to save funnel: %v", err))
return
}
createdAt, updatedAt, extraDataFromDB, err := handler.module.GetFunnelMetadata(r.Context(), funnel.ID.String())
if err != nil {
render.Error(rw,
errors.Newf(errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to get funnel metadata: %v", err))
return
}
resp := tf.FunnelResponse{
FunnelName: funnel.Name,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
CreatedBy: funnel.CreatedBy,
UpdatedBy: funnel.UpdatedBy,
OrgID: funnel.OrgID.String(),
Description: extraDataFromDB,
}
render.Success(rw, http.StatusOK, resp)
}

View File

@ -0,0 +1,123 @@
package impltracefunnel
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/types"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
store traceFunnels.TraceFunnelStore
}
func NewModule(store traceFunnels.TraceFunnelStore) tracefunnel.Module {
return &module{
store: store,
}
}
func (module *module) Create(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error) {
orgUUID, err := valuer.NewUUID(orgID)
if err != nil {
return nil, fmt.Errorf("invalid org ID: %v", err)
}
funnel := &traceFunnels.Funnel{
BaseMetadata: traceFunnels.BaseMetadata{
Name: name,
OrgID: orgUUID,
},
}
funnel.CreatedAt = time.Unix(0, timestamp*1000000) // Convert to nanoseconds
funnel.CreatedBy = userID
// Set up the user relationship
funnel.CreatedByUser = &types.User{
ID: userID,
}
if err := module.store.Create(ctx, funnel); err != nil {
return nil, fmt.Errorf("failed to create funnel: %v", err)
}
return funnel, nil
}
// Get gets a funnel by ID
func (module *module) Get(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error) {
uuid, err := valuer.NewUUID(funnelID)
if err != nil {
return nil, fmt.Errorf("invalid funnel ID: %v", err)
}
return module.store.Get(ctx, uuid)
}
// Update updates a funnel
func (module *module) Update(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error {
funnel.UpdatedBy = userID
return module.store.Update(ctx, funnel)
}
// List lists all funnels for an organization
func (module *module) List(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error) {
orgUUID, err := valuer.NewUUID(orgID)
if err != nil {
return nil, fmt.Errorf("invalid org ID: %v", err)
}
funnels, err := module.store.List(ctx)
if err != nil {
return nil, err
}
// Filter by orgID
var orgFunnels []*traceFunnels.Funnel
for _, f := range funnels {
if f.OrgID == orgUUID {
orgFunnels = append(orgFunnels, f)
}
}
return orgFunnels, nil
}
// Delete deletes a funnel
func (module *module) Delete(ctx context.Context, funnelID string) error {
uuid, err := valuer.NewUUID(funnelID)
if err != nil {
return fmt.Errorf("invalid funnel ID: %v", err)
}
return module.store.Delete(ctx, uuid)
}
// Save saves a funnel
func (module *module) Save(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error {
orgUUID, err := valuer.NewUUID(orgID)
if err != nil {
return fmt.Errorf("invalid org ID: %v", err)
}
funnel.UpdatedBy = userID
funnel.OrgID = orgUUID
return module.store.Update(ctx, funnel)
}
// GetFunnelMetadata gets metadata for a funnel
func (module *module) GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error) {
uuid, err := valuer.NewUUID(funnelID)
if err != nil {
return 0, 0, "", fmt.Errorf("invalid funnel ID: %v", err)
}
funnel, err := module.store.Get(ctx, uuid)
if err != nil {
return 0, 0, "", err
}
return funnel.CreatedAt.UnixNano() / 1000000, funnel.UpdatedAt.UnixNano() / 1000000, funnel.Description, nil
}

View File

@ -0,0 +1,119 @@
package impltracefunnel
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/sqlstore"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) traceFunnels.TraceFunnelStore {
return &store{sqlstore: sqlstore}
}
func (store *store) Create(ctx context.Context, funnel *traceFunnels.Funnel) error {
if funnel.ID.IsZero() {
funnel.ID = valuer.GenerateUUID()
}
if funnel.CreatedAt.IsZero() {
funnel.CreatedAt = time.Now()
}
if funnel.UpdatedAt.IsZero() {
funnel.UpdatedAt = time.Now()
}
_, err := store.
sqlstore.
BunDB().
NewInsert().
Model(funnel).
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to create funnel: %v", err)
}
if funnel.CreatedByUser != nil {
_, err = store.sqlstore.BunDB().NewUpdate().
Model(funnel).
Set("created_by = ?", funnel.CreatedByUser.ID).
Where("id = ?", funnel.ID).
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to update funnel user relationship: %v", err)
}
}
return nil
}
// Get retrieves a funnel by ID
func (store *store) Get(ctx context.Context, uuid valuer.UUID) (*traceFunnels.Funnel, error) {
funnel := &traceFunnels.Funnel{}
err := store.
sqlstore.
BunDB().
NewSelect().
Model(funnel).
Relation("CreatedByUser").
Where("?TableAlias.id = ?", uuid).
Scan(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get funnel: %v", err)
}
return funnel, nil
}
// Update updates an existing funnel
func (store *store) Update(ctx context.Context, funnel *traceFunnels.Funnel) error {
funnel.UpdatedAt = time.Now()
_, err := store.
sqlstore.
BunDB().
NewUpdate().
Model(funnel).
WherePK().
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to update funnel: %v", err)
}
return nil
}
// List retrieves all funnels
func (store *store) List(ctx context.Context) ([]*traceFunnels.Funnel, error) {
var funnels []*traceFunnels.Funnel
err := store.
sqlstore.
BunDB().
NewSelect().
Model(&funnels).
Relation("CreatedByUser").
Scan(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list funnels: %v", err)
}
return funnels, nil
}
// Delete removes a funnel by ID
func (store *store) Delete(ctx context.Context, uuid valuer.UUID) error {
_, err := store.
sqlstore.
BunDB().
NewDelete().
Model((*traceFunnels.Funnel)(nil)).
Where("id = ?", uuid).Exec(ctx)
if err != nil {
return fmt.Errorf("failed to delete funnel: %v", err)
}
return nil
}