diff --git a/pkg/query-service/app/auth.go b/pkg/query-service/app/auth.go index dccf6dd8dd..f771a7cbfe 100644 --- a/pkg/query-service/app/auth.go +++ b/pkg/query-service/app/auth.go @@ -1,11 +1,13 @@ package app import ( + "context" "errors" "net/http" "github.com/gorilla/mux" "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" ) @@ -39,10 +41,12 @@ func (am *AuthMiddleware) ViewAccess(f func(http.ResponseWriter, *http.Request)) if !(auth.IsViewer(user) || auth.IsEditor(user) || auth.IsAdmin(user)) { RespondError(w, &model.ApiError{ Typ: model.ErrorForbidden, - Err: errors.New("API is accessible to viewers/editors/admins."), + Err: errors.New("API is accessible to viewers/editors/admins"), }, nil) return } + ctx := context.WithValue(r.Context(), constants.ContextUserKey, user) + r = r.WithContext(ctx) f(w, r) } } @@ -64,6 +68,8 @@ func (am *AuthMiddleware) EditAccess(f func(http.ResponseWriter, *http.Request)) }, nil) return } + ctx := context.WithValue(r.Context(), constants.ContextUserKey, user) + r = r.WithContext(ctx) f(w, r) } } @@ -86,6 +92,8 @@ func (am *AuthMiddleware) SelfAccess(f func(http.ResponseWriter, *http.Request)) }, nil) return } + ctx := context.WithValue(r.Context(), constants.ContextUserKey, user) + r = r.WithContext(ctx) f(w, r) } } @@ -107,6 +115,8 @@ func (am *AuthMiddleware) AdminAccess(f func(http.ResponseWriter, *http.Request) }, nil) return } + ctx := context.WithValue(r.Context(), constants.ContextUserKey, user) + r = r.WithContext(ctx) f(w, r) } } diff --git a/pkg/query-service/app/dashboards/model.go b/pkg/query-service/app/dashboards/model.go index e97facff62..fa51864d4c 100644 --- a/pkg/query-service/app/dashboards/model.go +++ b/pkg/query-service/app/dashboards/model.go @@ -1,6 +1,7 @@ package dashboards import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/gosimple/slug" "github.com/jmoiron/sqlx" "github.com/mitchellh/mapstructure" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" @@ -95,6 +97,37 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) { return nil, fmt.Errorf("error in creating ttl_status table: %s", err.Error()) } + // sqlite does not support "IF NOT EXISTS" + createdAt := `ALTER TABLE rules ADD COLUMN created_at datetime;` + _, err = db.Exec(createdAt) + if err != nil && !strings.Contains(err.Error(), "duplicate column name") { + return nil, fmt.Errorf("error in adding column created_at to rules table: %s", err.Error()) + } + + createdBy := `ALTER TABLE rules ADD COLUMN created_by TEXT;` + _, err = db.Exec(createdBy) + if err != nil && !strings.Contains(err.Error(), "duplicate column name") { + return nil, fmt.Errorf("error in adding column created_by to rules table: %s", err.Error()) + } + + updatedBy := `ALTER TABLE rules ADD COLUMN updated_by TEXT;` + _, err = db.Exec(updatedBy) + if err != nil && !strings.Contains(err.Error(), "duplicate column name") { + return nil, fmt.Errorf("error in adding column updated_by to rules table: %s", err.Error()) + } + + createdBy = `ALTER TABLE dashboards ADD COLUMN created_by TEXT;` + _, err = db.Exec(createdBy) + if err != nil && !strings.Contains(err.Error(), "duplicate column name") { + return nil, fmt.Errorf("error in adding column created_by to dashboards table: %s", err.Error()) + } + + updatedBy = `ALTER TABLE dashboards ADD COLUMN updated_by TEXT;` + _, err = db.Exec(updatedBy) + if err != nil && !strings.Contains(err.Error(), "duplicate column name") { + return nil, fmt.Errorf("error in adding column updated_by to dashboards table: %s", err.Error()) + } + return db, nil } @@ -103,7 +136,9 @@ type Dashboard struct { Uuid string `json:"uuid" db:"uuid"` Slug string `json:"-" db:"-"` CreatedAt time.Time `json:"created_at" db:"created_at"` + CreateBy *string `json:"created_by" db:"created_by"` UpdatedAt time.Time `json:"updated_at" db:"updated_at"` + UpdateBy *string `json:"updated_by" db:"updated_by"` Title string `json:"-" db:"-"` Data Data `json:"data" db:"data"` } @@ -132,16 +167,22 @@ func (c *Data) Scan(src interface{}) error { } // CreateDashboard creates a new dashboard -func CreateDashboard(data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) { +func CreateDashboard(ctx context.Context, data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) { dash := &Dashboard{ Data: data, } + var userEmail string + if user := common.GetUserFromContext(ctx); user != nil { + userEmail = user.Email + } dash.CreatedAt = time.Now() + dash.CreateBy = &userEmail dash.UpdatedAt = time.Now() + dash.UpdateBy = &userEmail dash.UpdateSlug() dash.Uuid = uuid.New().String() - map_data, err := json.Marshal(dash.Data) + mapData, err := json.Marshal(dash.Data) if err != nil { zap.S().Errorf("Error in marshalling data field in dashboard: ", dash, err) return nil, &model.ApiError{Typ: model.ErrorExec, Err: err} @@ -155,8 +196,8 @@ func CreateDashboard(data map[string]interface{}, fm interfaces.FeatureLookup) ( } } - // db.Prepare("Insert into dashboards where") - result, err := db.Exec("INSERT INTO dashboards (uuid, created_at, updated_at, data) VALUES ($1, $2, $3, $4)", dash.Uuid, dash.CreatedAt, dash.UpdatedAt, map_data) + result, err := db.Exec("INSERT INTO dashboards (uuid, created_at, created_by, updated_at, updated_by, data) VALUES ($1, $2, $3, $4, $5, $6)", + dash.Uuid, dash.CreatedAt, userEmail, dash.UpdatedAt, userEmail, mapData) if err != nil { zap.S().Errorf("Error in inserting dashboard data: ", dash, err) @@ -177,7 +218,7 @@ func CreateDashboard(data map[string]interface{}, fm interfaces.FeatureLookup) ( return dash, nil } -func GetDashboards() ([]Dashboard, *model.ApiError) { +func GetDashboards(ctx context.Context) ([]Dashboard, *model.ApiError) { dashboards := []Dashboard{} query := `SELECT * FROM dashboards` @@ -190,9 +231,9 @@ func GetDashboards() ([]Dashboard, *model.ApiError) { return dashboards, nil } -func DeleteDashboard(uuid string, fm interfaces.FeatureLookup) *model.ApiError { +func DeleteDashboard(ctx context.Context, uuid string, fm interfaces.FeatureLookup) *model.ApiError { - dashboard, dErr := GetDashboard(uuid) + dashboard, dErr := GetDashboard(ctx, uuid) if dErr != nil { zap.S().Errorf("Error in getting dashboard: ", uuid, dErr) return dErr @@ -222,7 +263,7 @@ func DeleteDashboard(uuid string, fm interfaces.FeatureLookup) *model.ApiError { return nil } -func GetDashboard(uuid string) (*Dashboard, *model.ApiError) { +func GetDashboard(ctx context.Context, uuid string) (*Dashboard, *model.ApiError) { dashboard := Dashboard{} query := `SELECT * FROM dashboards WHERE uuid=?` @@ -235,15 +276,15 @@ func GetDashboard(uuid string) (*Dashboard, *model.ApiError) { return &dashboard, nil } -func UpdateDashboard(uuid string, data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) { +func UpdateDashboard(ctx context.Context, uuid string, data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) { - map_data, err := json.Marshal(data) + mapData, err := json.Marshal(data) if err != nil { zap.S().Errorf("Error in marshalling data field in dashboard: ", data, err) return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} } - dashboard, apiErr := GetDashboard(uuid) + dashboard, apiErr := GetDashboard(ctx, uuid) if apiErr != nil { return nil, apiErr } @@ -265,10 +306,15 @@ func UpdateDashboard(uuid string, data map[string]interface{}, fm interfaces.Fea } dashboard.UpdatedAt = time.Now() + var userEmail string + if user := common.GetUserFromContext(ctx); user != nil { + userEmail = user.Email + } + dashboard.UpdateBy = &userEmail dashboard.Data = data - // db.Prepare("Insert into dashboards where") - _, err = db.Exec("UPDATE dashboards SET updated_at=$1, data=$2 WHERE uuid=$3 ", dashboard.UpdatedAt, map_data, dashboard.Uuid) + _, err = db.Exec("UPDATE dashboards SET updated_at=$1, updated_by=$2, data=$3 WHERE uuid=$4;", + dashboard.UpdatedAt, userEmail, mapData, dashboard.Uuid) if err != nil { zap.S().Errorf("Error in inserting dashboard data: ", data, err) diff --git a/pkg/query-service/app/dashboards/provision.go b/pkg/query-service/app/dashboards/provision.go index d8869f048b..fa2a935b58 100644 --- a/pkg/query-service/app/dashboards/provision.go +++ b/pkg/query-service/app/dashboards/provision.go @@ -1,6 +1,7 @@ package dashboards import ( + "context" "encoding/json" "io/ioutil" "os" @@ -38,13 +39,13 @@ func readCurrentDir(dir string, fm interfaces.FeatureLookup) error { continue } - _, apiErr := GetDashboard(data["uuid"].(string)) + _, apiErr := GetDashboard(context.Background(), data["uuid"].(string)) if apiErr == nil { zap.S().Infof("Creating Dashboards: Error in file: %s\t%s", filename, "Dashboard already present in database") continue } - _, apiErr = CreateDashboard(data, fm) + _, apiErr = CreateDashboard(context.Background(), data, fm) if apiErr != nil { zap.S().Errorf("Creating Dashboards: Error in file: %s\t%s", filename, apiErr.Err) continue diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 87f05ab098..eedaf0b11d 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -448,7 +448,7 @@ func Intersection(a, b []int) (c []int) { func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] - ruleResponse, err := aH.ruleManager.GetRule(id) + ruleResponse, err := aH.ruleManager.GetRule(r.Context(), id) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) return @@ -765,7 +765,7 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) { - rules, err := aH.ruleManager.ListRuleStates() + rules, err := aH.ruleManager.ListRuleStates(r.Context()) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) return @@ -778,7 +778,7 @@ func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) { - allDashboards, err := dashboards.GetDashboards() + allDashboards, err := dashboards.GetDashboards(r.Context()) if err != nil { RespondError(w, err, nil) @@ -829,7 +829,7 @@ func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) deleteDashboard(w http.ResponseWriter, r *http.Request) { uuid := mux.Vars(r)["uuid"] - err := dashboards.DeleteDashboard(uuid, aH.featureFlags) + err := dashboards.DeleteDashboard(r.Context(), uuid, aH.featureFlags) if err != nil { RespondError(w, err, nil) @@ -935,7 +935,7 @@ func (aH *APIHandler) updateDashboard(w http.ResponseWriter, r *http.Request) { return } - dashboard, apiError := dashboards.UpdateDashboard(uuid, postData, aH.featureFlags) + dashboard, apiError := dashboards.UpdateDashboard(r.Context(), uuid, postData, aH.featureFlags) if apiError != nil { RespondError(w, apiError, nil) return @@ -949,7 +949,7 @@ func (aH *APIHandler) getDashboard(w http.ResponseWriter, r *http.Request) { uuid := mux.Vars(r)["uuid"] - dashboard, apiError := dashboards.GetDashboard(uuid) + dashboard, apiError := dashboards.GetDashboard(r.Context(), uuid) if apiError != nil { RespondError(w, apiError, nil) @@ -960,7 +960,7 @@ func (aH *APIHandler) getDashboard(w http.ResponseWriter, r *http.Request) { } -func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, signozDashboard model.DashboardData) { +func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, r *http.Request, signozDashboard model.DashboardData) { toSave := make(map[string]interface{}) toSave["title"] = signozDashboard.Title toSave["description"] = signozDashboard.Description @@ -969,7 +969,7 @@ func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, signozDashboard model toSave["widgets"] = signozDashboard.Widgets toSave["variables"] = signozDashboard.Variables - dashboard, apiError := dashboards.CreateDashboard(toSave, aH.featureFlags) + dashboard, apiError := dashboards.CreateDashboard(r.Context(), toSave, aH.featureFlags) if apiError != nil { RespondError(w, apiError, nil) return @@ -988,7 +988,7 @@ func (aH *APIHandler) createDashboardsTransform(w http.ResponseWriter, r *http.R err = json.Unmarshal(b, &importData) if err == nil { signozDashboard := dashboards.TransformGrafanaJSONToSignoz(importData) - aH.saveAndReturn(w, signozDashboard) + aH.saveAndReturn(w, r, signozDashboard) return } RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, "Error while creating dashboard from grafana json") @@ -1010,7 +1010,7 @@ func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) { return } - dash, apiErr := dashboards.CreateDashboard(postData, aH.featureFlags) + dash, apiErr := dashboards.CreateDashboard(r.Context(), postData, aH.featureFlags) if apiErr != nil { RespondError(w, apiErr, nil) @@ -1051,7 +1051,7 @@ func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] - err := aH.ruleManager.DeleteRule(id) + err := aH.ruleManager.DeleteRule(r.Context(), id) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) @@ -1074,7 +1074,7 @@ func (aH *APIHandler) patchRule(w http.ResponseWriter, r *http.Request) { return } - gettableRule, err := aH.ruleManager.PatchRule(string(body), id) + gettableRule, err := aH.ruleManager.PatchRule(r.Context(), string(body), id) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) @@ -1095,7 +1095,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) { return } - err = aH.ruleManager.EditRule(string(body), id) + err = aH.ruleManager.EditRule(r.Context(), string(body), id) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) @@ -1248,7 +1248,7 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) { return } - err = aH.ruleManager.CreateRule(string(body)) + err = aH.ruleManager.CreateRule(r.Context(), string(body)) if err != nil { RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) return diff --git a/pkg/query-service/common/user.go b/pkg/query-service/common/user.go new file mode 100644 index 0000000000..ecfc519fc0 --- /dev/null +++ b/pkg/query-service/common/user.go @@ -0,0 +1,16 @@ +package common + +import ( + "context" + + "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/model" +) + +func GetUserFromContext(ctx context.Context) *model.UserPayload { + user, ok := ctx.Value(constants.ContextUserKey).(*model.UserPayload) + if !ok { + return nil + } + return user +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 0f181c36fd..3027ecaee2 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -17,6 +17,10 @@ const ( OpAmpWsEndpoint = "0.0.0.0:4320" // address for opamp websocket ) +type ContextKey string + +const ContextUserKey ContextKey = "user" + var ConfigSignozIo = "https://config.signoz.io/api/v1" var DEFAULT_TELEMETRY_ANONYMOUS = false diff --git a/pkg/query-service/rules/apiParams.go b/pkg/query-service/rules/apiParams.go index f33af06e81..300eac330f 100644 --- a/pkg/query-service/rules/apiParams.go +++ b/pkg/query-service/rules/apiParams.go @@ -241,4 +241,8 @@ type GettableRule struct { Id string `json:"id"` State string `json:"state"` PostableRule + CreatedAt *time.Time `json:"createAt"` + CreatedBy *string `json:"createBy"` + UpdatedAt *time.Time `json:"updateAt"` + UpdatedBy *string `json:"updateBy"` } diff --git a/pkg/query-service/rules/db.go b/pkg/query-service/rules/db.go index 7070f23346..cc3a33f953 100644 --- a/pkg/query-service/rules/db.go +++ b/pkg/query-service/rules/db.go @@ -1,35 +1,41 @@ package rules import ( + "context" "fmt" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" "strconv" "time" + + "github.com/jmoiron/sqlx" + "go.signoz.io/signoz/pkg/query-service/common" + "go.uber.org/zap" ) // Data store to capture user alert rule settings type RuleDB interface { // CreateRuleTx stores rule in the db and returns tx and group name (on success) - CreateRuleTx(rule string) (string, Tx, error) + CreateRuleTx(ctx context.Context, rule string) (string, Tx, error) // EditRuleTx updates the given rule in the db and returns tx and group name (on success) - EditRuleTx(rule string, id string) (string, Tx, error) + EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error) // DeleteRuleTx deletes the given rule in the db and returns tx and group name (on success) - DeleteRuleTx(id string) (string, Tx, error) + DeleteRuleTx(ctx context.Context, id string) (string, Tx, error) // GetStoredRules fetches the rule definitions from db - GetStoredRules() ([]StoredRule, error) + GetStoredRules(ctx context.Context) ([]StoredRule, error) // GetStoredRule for a given ID from DB - GetStoredRule(id string) (*StoredRule, error) + GetStoredRule(ctx context.Context, id string) (*StoredRule, error) } type StoredRule struct { - Id int `json:"id" db:"id"` - UpdatedAt time.Time `json:"updated_at" db:"updated_at"` - Data string `json:"data" db:"data"` + Id int `json:"id" db:"id"` + CreatedAt *time.Time `json:"created_at" db:"created_at"` + CreatedBy *string `json:"created_by" db:"created_by"` + UpdatedAt *time.Time `json:"updated_at" db:"updated_at"` + UpdatedBy *string `json:"updated_by" db:"updated_by"` + Data string `json:"data" db:"data"` } type Tx interface { @@ -51,17 +57,23 @@ func newRuleDB(db *sqlx.DB) RuleDB { // CreateRuleTx stores a given rule in db and returns task name, // sql tx and error (if any) -func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) { +func (r *ruleDB) CreateRuleTx(ctx context.Context, rule string) (string, Tx, error) { var groupName string var lastInsertId int64 + var userEmail string + if user := common.GetUserFromContext(ctx); user != nil { + userEmail = user.Email + } + createdAt := time.Now() + updatedAt := time.Now() tx, err := r.Begin() if err != nil { return groupName, nil, err } - stmt, err := tx.Prepare(`INSERT into rules (updated_at, data) VALUES($1,$2);`) + stmt, err := tx.Prepare(`INSERT into rules (created_at, created_by, updated_at, updated_by, data) VALUES($1,$2,$3,$4,$5);`) if err != nil { zap.S().Errorf("Error in preparing statement for INSERT to rules\n", err) tx.Rollback() @@ -70,7 +82,7 @@ func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) { defer stmt.Close() - result, err := stmt.Exec(time.Now(), rule) + result, err := stmt.Exec(createdAt, userEmail, updatedAt, userEmail, rule) if err != nil { zap.S().Errorf("Error in Executing prepared statement for INSERT to rules\n", err) tx.Rollback() // return an error too, we may want to wrap them @@ -87,7 +99,7 @@ func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) { // EditRuleTx stores a given rule string in database and returns // task name, sql tx and error (if any) -func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) { +func (r *ruleDB) EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error) { var groupName string idInt, _ := strconv.Atoi(id) @@ -95,6 +107,11 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) { return groupName, nil, fmt.Errorf("failed to read alert id from parameters") } + var userEmail string + if user := common.GetUserFromContext(ctx); user != nil { + userEmail = user.Email + } + updatedAt := time.Now() groupName = prepareTaskName(int64(idInt)) // todo(amol): resolve this error - database locked when using @@ -103,7 +120,7 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) { //if err != nil { // return groupName, tx, err //} - stmt, err := r.Prepare(`UPDATE rules SET updated_at=$1, data=$2 WHERE id=$3;`) + stmt, err := r.Prepare(`UPDATE rules SET updated_by=$1, updated_at=$2, data=$3 WHERE id=$4;`) if err != nil { zap.S().Errorf("Error in preparing statement for UPDATE to rules\n", err) // tx.Rollback() @@ -111,7 +128,7 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) { } defer stmt.Close() - if _, err := stmt.Exec(time.Now(), rule, idInt); err != nil { + if _, err := stmt.Exec(userEmail, updatedAt, rule, idInt); err != nil { zap.S().Errorf("Error in Executing prepared statement for UPDATE to rules\n", err) // tx.Rollback() // return an error too, we may want to wrap them return groupName, nil, err @@ -121,7 +138,7 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) { // DeleteRuleTx deletes a given rule with id and returns // taskname, sql tx and error (if any) -func (r *ruleDB) DeleteRuleTx(id string) (string, Tx, error) { +func (r *ruleDB) DeleteRuleTx(ctx context.Context, id string) (string, Tx, error) { idInt, _ := strconv.Atoi(id) groupName := prepareTaskName(int64(idInt)) @@ -149,11 +166,11 @@ func (r *ruleDB) DeleteRuleTx(id string) (string, Tx, error) { return groupName, nil, nil } -func (r *ruleDB) GetStoredRules() ([]StoredRule, error) { +func (r *ruleDB) GetStoredRules(ctx context.Context) ([]StoredRule, error) { rules := []StoredRule{} - query := fmt.Sprintf("SELECT id, updated_at, data FROM rules") + query := "SELECT id, created_at, created_by, updated_at, updated_by, data FROM rules" err := r.Select(&rules, query) @@ -165,7 +182,7 @@ func (r *ruleDB) GetStoredRules() ([]StoredRule, error) { return rules, nil } -func (r *ruleDB) GetStoredRule(id string) (*StoredRule, error) { +func (r *ruleDB) GetStoredRule(ctx context.Context, id string) (*StoredRule, error) { intId, err := strconv.Atoi(id) if err != nil { return nil, fmt.Errorf("invalid id parameter") @@ -173,7 +190,7 @@ func (r *ruleDB) GetStoredRule(id string) (*StoredRule, error) { rule := &StoredRule{} - query := fmt.Sprintf("SELECT id, updated_at, data FROM rules WHERE id=%d", intId) + query := fmt.Sprintf("SELECT id, created_at, created_by, updated_at, updated_by, data FROM rules WHERE id=%d", intId) err = r.Get(rule, query) // zap.S().Info(query) diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 70596982d9..30c643b031 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -16,8 +16,9 @@ import ( "go.uber.org/zap" + "errors" + "github.com/jmoiron/sqlx" - "github.com/pkg/errors" // opentracing "github.com/opentracing/opentracing-go" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" @@ -27,8 +28,6 @@ import ( "go.signoz.io/signoz/pkg/query-service/utils/labels" ) -// namespace for prom metrics -const namespace = "signoz" const taskNamesuffix = "webAppEditor" func ruleIdFromTaskName(n string) string { @@ -77,8 +76,6 @@ type Manager struct { // datastore to store alert definitions ruleDB RuleDB - // pause all rule tasks - pause bool logger log.Logger featureFlags interfaces.FeatureLookup @@ -142,7 +139,7 @@ func (m *Manager) Pause(b bool) { } func (m *Manager) initiate() error { - storedRules, err := m.ruleDB.GetStoredRules() + storedRules, err := m.ruleDB.GetStoredRules(context.Background()) if err != nil { return err } @@ -172,7 +169,7 @@ func (m *Manager) initiate() error { zap.S().Info("msg:", "migrating rule from JSON to yaml", "\t rule:", rec.Data, "\t parsed rule:", parsedRule) ruleJSON, err := json.Marshal(parsedRule) if err == nil { - taskName, _, err := m.ruleDB.EditRuleTx(string(ruleJSON), fmt.Sprintf("%d", rec.Id)) + taskName, _, err := m.ruleDB.EditRuleTx(context.Background(), string(ruleJSON), fmt.Sprintf("%d", rec.Id)) if err != nil { zap.S().Errorf("msg: failed to migrate rule ", "/t error:", err) } else { @@ -195,6 +192,10 @@ func (m *Manager) initiate() error { } } + if len(loadErrors) > 0 { + return errors.Join(loadErrors...) + } + return nil } @@ -223,11 +224,11 @@ func (m *Manager) Stop() { // EditRuleDefinition writes the rule definition to the // datastore and also updates the rule executor -func (m *Manager) EditRule(ruleStr string, id string) error { +func (m *Manager) EditRule(ctx context.Context, ruleStr string, id string) error { parsedRule, errs := ParsePostableRule([]byte(ruleStr)) - currentRule, err := m.GetRule(id) + currentRule, err := m.GetRule(ctx, id) if err != nil { zap.S().Errorf("msg: ", "failed to get the rule from rule db", "\t ruleid: ", id) return err @@ -247,7 +248,7 @@ func (m *Manager) EditRule(ruleStr string, id string) error { return errs[0] } - taskName, _, err := m.ruleDB.EditRuleTx(ruleStr, id) + taskName, _, err := m.ruleDB.EditRuleTx(ctx, ruleStr, id) if err != nil { return err } @@ -314,7 +315,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { return nil } -func (m *Manager) DeleteRule(id string) error { +func (m *Manager) DeleteRule(ctx context.Context, id string) error { idInt, err := strconv.Atoi(id) if err != nil { @@ -323,7 +324,7 @@ func (m *Manager) DeleteRule(id string) error { } // update feature usage - rule, err := m.GetRule(id) + rule, err := m.GetRule(ctx, id) if err != nil { zap.S().Errorf("msg: ", "failed to get the rule from rule db", "\t ruleid: ", id) return err @@ -334,7 +335,7 @@ func (m *Manager) DeleteRule(id string) error { m.deleteTask(taskName) } - if _, _, err := m.ruleDB.DeleteRuleTx(id); err != nil { + if _, _, err := m.ruleDB.DeleteRuleTx(ctx, id); err != nil { zap.S().Errorf("msg: ", "failed to delete the rule from rule db", "\t ruleid: ", id) return err } @@ -365,7 +366,7 @@ func (m *Manager) deleteTask(taskName string) { // CreateRule stores rule def into db and also // starts an executor for the rule -func (m *Manager) CreateRule(ruleStr string) error { +func (m *Manager) CreateRule(ctx context.Context, ruleStr string) error { parsedRule, errs := ParsePostableRule([]byte(ruleStr)) // check if the rule uses any feature that is not enabled @@ -380,7 +381,7 @@ func (m *Manager) CreateRule(ruleStr string) error { return errs[0] } - taskName, tx, err := m.ruleDB.CreateRuleTx(ruleStr) + taskName, tx, err := m.ruleDB.CreateRuleTx(ctx, ruleStr) if err != nil { return err } @@ -665,10 +666,10 @@ func (m *Manager) ListActiveRules() ([]Rule, error) { return ruleList, nil } -func (m *Manager) ListRuleStates() (*GettableRules, error) { +func (m *Manager) ListRuleStates(ctx context.Context) (*GettableRules, error) { // fetch rules from DB - storedRules, err := m.ruleDB.GetStoredRules() + storedRules, err := m.ruleDB.GetStoredRules(ctx) if err != nil { return nil, err } @@ -693,14 +694,18 @@ func (m *Manager) ListRuleStates() (*GettableRules, error) { } else { ruleResponse.State = rm.State().String() } + ruleResponse.CreatedAt = s.CreatedAt + ruleResponse.CreatedBy = s.CreatedBy + ruleResponse.UpdatedAt = s.UpdatedAt + ruleResponse.UpdatedBy = s.UpdatedBy resp = append(resp, ruleResponse) } return &GettableRules{Rules: resp}, nil } -func (m *Manager) GetRule(id string) (*GettableRule, error) { - s, err := m.ruleDB.GetStoredRule(id) +func (m *Manager) GetRule(ctx context.Context, id string) (*GettableRule, error) { + s, err := m.ruleDB.GetStoredRule(ctx, id) if err != nil { return nil, err } @@ -746,7 +751,7 @@ func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) err // - over write the patch attributes received in input (ruleStr) // - re-deploy or undeploy task as necessary // - update the patched rule in the DB -func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error) { +func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string) (*GettableRule, error) { if ruleId == "" { return nil, fmt.Errorf("id is mandatory for patching rule") @@ -755,7 +760,7 @@ func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error taskName := prepareTaskName(ruleId) // retrieve rule from DB - storedJSON, err := m.ruleDB.GetStoredRule(ruleId) + storedJSON, err := m.ruleDB.GetStoredRule(ctx, ruleId) if err != nil { zap.S().Errorf("msg:", "failed to get stored rule with given id", "\t error:", err) return nil, err @@ -789,7 +794,7 @@ func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error } // write updated rule to db - if _, _, err = m.ruleDB.EditRuleTx(string(patchedRuleBytes), ruleId); err != nil { + if _, _, err = m.ruleDB.EditRuleTx(ctx, string(patchedRuleBytes), ruleId); err != nil { // write failed, rollback task state // restore task state from the stored rule