mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-10-10 02:06:30 +08:00
feat: add created{By,At} , updated{By,At} to alerts/dashboards (#3754)
This commit is contained in:
parent
f7fe64a8df
commit
e0b83bda62
@ -1,11 +1,13 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
@ -39,10 +41,12 @@ func (am *AuthMiddleware) ViewAccess(f func(http.ResponseWriter, *http.Request))
|
||||
if !(auth.IsViewer(user) || auth.IsEditor(user) || auth.IsAdmin(user)) {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorForbidden,
|
||||
Err: errors.New("API is accessible to viewers/editors/admins."),
|
||||
Err: errors.New("API is accessible to viewers/editors/admins"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
ctx := context.WithValue(r.Context(), constants.ContextUserKey, user)
|
||||
r = r.WithContext(ctx)
|
||||
f(w, r)
|
||||
}
|
||||
}
|
||||
@ -64,6 +68,8 @@ func (am *AuthMiddleware) EditAccess(f func(http.ResponseWriter, *http.Request))
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
ctx := context.WithValue(r.Context(), constants.ContextUserKey, user)
|
||||
r = r.WithContext(ctx)
|
||||
f(w, r)
|
||||
}
|
||||
}
|
||||
@ -86,6 +92,8 @@ func (am *AuthMiddleware) SelfAccess(f func(http.ResponseWriter, *http.Request))
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
ctx := context.WithValue(r.Context(), constants.ContextUserKey, user)
|
||||
r = r.WithContext(ctx)
|
||||
f(w, r)
|
||||
}
|
||||
}
|
||||
@ -107,6 +115,8 @@ func (am *AuthMiddleware) AdminAccess(f func(http.ResponseWriter, *http.Request)
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
ctx := context.WithValue(r.Context(), constants.ContextUserKey, user)
|
||||
r = r.WithContext(ctx)
|
||||
f(w, r)
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package dashboards
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -14,6 +15,7 @@ import (
|
||||
"github.com/gosimple/slug"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.uber.org/zap"
|
||||
@ -95,6 +97,37 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
|
||||
return nil, fmt.Errorf("error in creating ttl_status table: %s", err.Error())
|
||||
}
|
||||
|
||||
// sqlite does not support "IF NOT EXISTS"
|
||||
createdAt := `ALTER TABLE rules ADD COLUMN created_at datetime;`
|
||||
_, err = db.Exec(createdAt)
|
||||
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
|
||||
return nil, fmt.Errorf("error in adding column created_at to rules table: %s", err.Error())
|
||||
}
|
||||
|
||||
createdBy := `ALTER TABLE rules ADD COLUMN created_by TEXT;`
|
||||
_, err = db.Exec(createdBy)
|
||||
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
|
||||
return nil, fmt.Errorf("error in adding column created_by to rules table: %s", err.Error())
|
||||
}
|
||||
|
||||
updatedBy := `ALTER TABLE rules ADD COLUMN updated_by TEXT;`
|
||||
_, err = db.Exec(updatedBy)
|
||||
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
|
||||
return nil, fmt.Errorf("error in adding column updated_by to rules table: %s", err.Error())
|
||||
}
|
||||
|
||||
createdBy = `ALTER TABLE dashboards ADD COLUMN created_by TEXT;`
|
||||
_, err = db.Exec(createdBy)
|
||||
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
|
||||
return nil, fmt.Errorf("error in adding column created_by to dashboards table: %s", err.Error())
|
||||
}
|
||||
|
||||
updatedBy = `ALTER TABLE dashboards ADD COLUMN updated_by TEXT;`
|
||||
_, err = db.Exec(updatedBy)
|
||||
if err != nil && !strings.Contains(err.Error(), "duplicate column name") {
|
||||
return nil, fmt.Errorf("error in adding column updated_by to dashboards table: %s", err.Error())
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
@ -103,7 +136,9 @@ type Dashboard struct {
|
||||
Uuid string `json:"uuid" db:"uuid"`
|
||||
Slug string `json:"-" db:"-"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
CreateBy *string `json:"created_by" db:"created_by"`
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
UpdateBy *string `json:"updated_by" db:"updated_by"`
|
||||
Title string `json:"-" db:"-"`
|
||||
Data Data `json:"data" db:"data"`
|
||||
}
|
||||
@ -132,16 +167,22 @@ func (c *Data) Scan(src interface{}) error {
|
||||
}
|
||||
|
||||
// CreateDashboard creates a new dashboard
|
||||
func CreateDashboard(data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) {
|
||||
func CreateDashboard(ctx context.Context, data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) {
|
||||
dash := &Dashboard{
|
||||
Data: data,
|
||||
}
|
||||
var userEmail string
|
||||
if user := common.GetUserFromContext(ctx); user != nil {
|
||||
userEmail = user.Email
|
||||
}
|
||||
dash.CreatedAt = time.Now()
|
||||
dash.CreateBy = &userEmail
|
||||
dash.UpdatedAt = time.Now()
|
||||
dash.UpdateBy = &userEmail
|
||||
dash.UpdateSlug()
|
||||
dash.Uuid = uuid.New().String()
|
||||
|
||||
map_data, err := json.Marshal(dash.Data)
|
||||
mapData, err := json.Marshal(dash.Data)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in marshalling data field in dashboard: ", dash, err)
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||
@ -155,8 +196,8 @@ func CreateDashboard(data map[string]interface{}, fm interfaces.FeatureLookup) (
|
||||
}
|
||||
}
|
||||
|
||||
// db.Prepare("Insert into dashboards where")
|
||||
result, err := db.Exec("INSERT INTO dashboards (uuid, created_at, updated_at, data) VALUES ($1, $2, $3, $4)", dash.Uuid, dash.CreatedAt, dash.UpdatedAt, map_data)
|
||||
result, err := db.Exec("INSERT INTO dashboards (uuid, created_at, created_by, updated_at, updated_by, data) VALUES ($1, $2, $3, $4, $5, $6)",
|
||||
dash.Uuid, dash.CreatedAt, userEmail, dash.UpdatedAt, userEmail, mapData)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in inserting dashboard data: ", dash, err)
|
||||
@ -177,7 +218,7 @@ func CreateDashboard(data map[string]interface{}, fm interfaces.FeatureLookup) (
|
||||
return dash, nil
|
||||
}
|
||||
|
||||
func GetDashboards() ([]Dashboard, *model.ApiError) {
|
||||
func GetDashboards(ctx context.Context) ([]Dashboard, *model.ApiError) {
|
||||
|
||||
dashboards := []Dashboard{}
|
||||
query := `SELECT * FROM dashboards`
|
||||
@ -190,9 +231,9 @@ func GetDashboards() ([]Dashboard, *model.ApiError) {
|
||||
return dashboards, nil
|
||||
}
|
||||
|
||||
func DeleteDashboard(uuid string, fm interfaces.FeatureLookup) *model.ApiError {
|
||||
func DeleteDashboard(ctx context.Context, uuid string, fm interfaces.FeatureLookup) *model.ApiError {
|
||||
|
||||
dashboard, dErr := GetDashboard(uuid)
|
||||
dashboard, dErr := GetDashboard(ctx, uuid)
|
||||
if dErr != nil {
|
||||
zap.S().Errorf("Error in getting dashboard: ", uuid, dErr)
|
||||
return dErr
|
||||
@ -222,7 +263,7 @@ func DeleteDashboard(uuid string, fm interfaces.FeatureLookup) *model.ApiError {
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetDashboard(uuid string) (*Dashboard, *model.ApiError) {
|
||||
func GetDashboard(ctx context.Context, uuid string) (*Dashboard, *model.ApiError) {
|
||||
|
||||
dashboard := Dashboard{}
|
||||
query := `SELECT * FROM dashboards WHERE uuid=?`
|
||||
@ -235,15 +276,15 @@ func GetDashboard(uuid string) (*Dashboard, *model.ApiError) {
|
||||
return &dashboard, nil
|
||||
}
|
||||
|
||||
func UpdateDashboard(uuid string, data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) {
|
||||
func UpdateDashboard(ctx context.Context, uuid string, data map[string]interface{}, fm interfaces.FeatureLookup) (*Dashboard, *model.ApiError) {
|
||||
|
||||
map_data, err := json.Marshal(data)
|
||||
mapData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in marshalling data field in dashboard: ", data, err)
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
}
|
||||
|
||||
dashboard, apiErr := GetDashboard(uuid)
|
||||
dashboard, apiErr := GetDashboard(ctx, uuid)
|
||||
if apiErr != nil {
|
||||
return nil, apiErr
|
||||
}
|
||||
@ -265,10 +306,15 @@ func UpdateDashboard(uuid string, data map[string]interface{}, fm interfaces.Fea
|
||||
}
|
||||
|
||||
dashboard.UpdatedAt = time.Now()
|
||||
var userEmail string
|
||||
if user := common.GetUserFromContext(ctx); user != nil {
|
||||
userEmail = user.Email
|
||||
}
|
||||
dashboard.UpdateBy = &userEmail
|
||||
dashboard.Data = data
|
||||
|
||||
// db.Prepare("Insert into dashboards where")
|
||||
_, err = db.Exec("UPDATE dashboards SET updated_at=$1, data=$2 WHERE uuid=$3 ", dashboard.UpdatedAt, map_data, dashboard.Uuid)
|
||||
_, err = db.Exec("UPDATE dashboards SET updated_at=$1, updated_by=$2, data=$3 WHERE uuid=$4;",
|
||||
dashboard.UpdatedAt, userEmail, mapData, dashboard.Uuid)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in inserting dashboard data: ", data, err)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package dashboards
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@ -38,13 +39,13 @@ func readCurrentDir(dir string, fm interfaces.FeatureLookup) error {
|
||||
continue
|
||||
}
|
||||
|
||||
_, apiErr := GetDashboard(data["uuid"].(string))
|
||||
_, apiErr := GetDashboard(context.Background(), data["uuid"].(string))
|
||||
if apiErr == nil {
|
||||
zap.S().Infof("Creating Dashboards: Error in file: %s\t%s", filename, "Dashboard already present in database")
|
||||
continue
|
||||
}
|
||||
|
||||
_, apiErr = CreateDashboard(data, fm)
|
||||
_, apiErr = CreateDashboard(context.Background(), data, fm)
|
||||
if apiErr != nil {
|
||||
zap.S().Errorf("Creating Dashboards: Error in file: %s\t%s", filename, apiErr.Err)
|
||||
continue
|
||||
|
@ -448,7 +448,7 @@ func Intersection(a, b []int) (c []int) {
|
||||
|
||||
func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
ruleResponse, err := aH.ruleManager.GetRule(id)
|
||||
ruleResponse, err := aH.ruleManager.GetRule(r.Context(), id)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
@ -765,7 +765,7 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request
|
||||
|
||||
func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
rules, err := aH.ruleManager.ListRuleStates()
|
||||
rules, err := aH.ruleManager.ListRuleStates(r.Context())
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
@ -778,7 +778,7 @@ func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
allDashboards, err := dashboards.GetDashboards()
|
||||
allDashboards, err := dashboards.GetDashboards(r.Context())
|
||||
|
||||
if err != nil {
|
||||
RespondError(w, err, nil)
|
||||
@ -829,7 +829,7 @@ func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) deleteDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
uuid := mux.Vars(r)["uuid"]
|
||||
err := dashboards.DeleteDashboard(uuid, aH.featureFlags)
|
||||
err := dashboards.DeleteDashboard(r.Context(), uuid, aH.featureFlags)
|
||||
|
||||
if err != nil {
|
||||
RespondError(w, err, nil)
|
||||
@ -935,7 +935,7 @@ func (aH *APIHandler) updateDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
dashboard, apiError := dashboards.UpdateDashboard(uuid, postData, aH.featureFlags)
|
||||
dashboard, apiError := dashboards.UpdateDashboard(r.Context(), uuid, postData, aH.featureFlags)
|
||||
if apiError != nil {
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
@ -949,7 +949,7 @@ func (aH *APIHandler) getDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
uuid := mux.Vars(r)["uuid"]
|
||||
|
||||
dashboard, apiError := dashboards.GetDashboard(uuid)
|
||||
dashboard, apiError := dashboards.GetDashboard(r.Context(), uuid)
|
||||
|
||||
if apiError != nil {
|
||||
RespondError(w, apiError, nil)
|
||||
@ -960,7 +960,7 @@ func (aH *APIHandler) getDashboard(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, signozDashboard model.DashboardData) {
|
||||
func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, r *http.Request, signozDashboard model.DashboardData) {
|
||||
toSave := make(map[string]interface{})
|
||||
toSave["title"] = signozDashboard.Title
|
||||
toSave["description"] = signozDashboard.Description
|
||||
@ -969,7 +969,7 @@ func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, signozDashboard model
|
||||
toSave["widgets"] = signozDashboard.Widgets
|
||||
toSave["variables"] = signozDashboard.Variables
|
||||
|
||||
dashboard, apiError := dashboards.CreateDashboard(toSave, aH.featureFlags)
|
||||
dashboard, apiError := dashboards.CreateDashboard(r.Context(), toSave, aH.featureFlags)
|
||||
if apiError != nil {
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
@ -988,7 +988,7 @@ func (aH *APIHandler) createDashboardsTransform(w http.ResponseWriter, r *http.R
|
||||
err = json.Unmarshal(b, &importData)
|
||||
if err == nil {
|
||||
signozDashboard := dashboards.TransformGrafanaJSONToSignoz(importData)
|
||||
aH.saveAndReturn(w, signozDashboard)
|
||||
aH.saveAndReturn(w, r, signozDashboard)
|
||||
return
|
||||
}
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, "Error while creating dashboard from grafana json")
|
||||
@ -1010,7 +1010,7 @@ func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
dash, apiErr := dashboards.CreateDashboard(postData, aH.featureFlags)
|
||||
dash, apiErr := dashboards.CreateDashboard(r.Context(), postData, aH.featureFlags)
|
||||
|
||||
if apiErr != nil {
|
||||
RespondError(w, apiErr, nil)
|
||||
@ -1051,7 +1051,7 @@ func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
id := mux.Vars(r)["id"]
|
||||
|
||||
err := aH.ruleManager.DeleteRule(id)
|
||||
err := aH.ruleManager.DeleteRule(r.Context(), id)
|
||||
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
@ -1074,7 +1074,7 @@ func (aH *APIHandler) patchRule(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
gettableRule, err := aH.ruleManager.PatchRule(string(body), id)
|
||||
gettableRule, err := aH.ruleManager.PatchRule(r.Context(), string(body), id)
|
||||
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
@ -1095,7 +1095,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
err = aH.ruleManager.EditRule(string(body), id)
|
||||
err = aH.ruleManager.EditRule(r.Context(), string(body), id)
|
||||
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
@ -1248,7 +1248,7 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
err = aH.ruleManager.CreateRule(string(body))
|
||||
err = aH.ruleManager.CreateRule(r.Context(), string(body))
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||
return
|
||||
|
16
pkg/query-service/common/user.go
Normal file
16
pkg/query-service/common/user.go
Normal file
@ -0,0 +1,16 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
func GetUserFromContext(ctx context.Context) *model.UserPayload {
|
||||
user, ok := ctx.Value(constants.ContextUserKey).(*model.UserPayload)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return user
|
||||
}
|
@ -17,6 +17,10 @@ const (
|
||||
OpAmpWsEndpoint = "0.0.0.0:4320" // address for opamp websocket
|
||||
)
|
||||
|
||||
type ContextKey string
|
||||
|
||||
const ContextUserKey ContextKey = "user"
|
||||
|
||||
var ConfigSignozIo = "https://config.signoz.io/api/v1"
|
||||
|
||||
var DEFAULT_TELEMETRY_ANONYMOUS = false
|
||||
|
@ -241,4 +241,8 @@ type GettableRule struct {
|
||||
Id string `json:"id"`
|
||||
State string `json:"state"`
|
||||
PostableRule
|
||||
CreatedAt *time.Time `json:"createAt"`
|
||||
CreatedBy *string `json:"createBy"`
|
||||
UpdatedAt *time.Time `json:"updateAt"`
|
||||
UpdatedBy *string `json:"updateBy"`
|
||||
}
|
||||
|
@ -1,35 +1,41 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.uber.org/zap"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Data store to capture user alert rule settings
|
||||
type RuleDB interface {
|
||||
// CreateRuleTx stores rule in the db and returns tx and group name (on success)
|
||||
CreateRuleTx(rule string) (string, Tx, error)
|
||||
CreateRuleTx(ctx context.Context, rule string) (string, Tx, error)
|
||||
|
||||
// EditRuleTx updates the given rule in the db and returns tx and group name (on success)
|
||||
EditRuleTx(rule string, id string) (string, Tx, error)
|
||||
EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error)
|
||||
|
||||
// DeleteRuleTx deletes the given rule in the db and returns tx and group name (on success)
|
||||
DeleteRuleTx(id string) (string, Tx, error)
|
||||
DeleteRuleTx(ctx context.Context, id string) (string, Tx, error)
|
||||
|
||||
// GetStoredRules fetches the rule definitions from db
|
||||
GetStoredRules() ([]StoredRule, error)
|
||||
GetStoredRules(ctx context.Context) ([]StoredRule, error)
|
||||
|
||||
// GetStoredRule for a given ID from DB
|
||||
GetStoredRule(id string) (*StoredRule, error)
|
||||
GetStoredRule(ctx context.Context, id string) (*StoredRule, error)
|
||||
}
|
||||
|
||||
type StoredRule struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
Data string `json:"data" db:"data"`
|
||||
Id int `json:"id" db:"id"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at"`
|
||||
CreatedBy *string `json:"created_by" db:"created_by"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
|
||||
UpdatedBy *string `json:"updated_by" db:"updated_by"`
|
||||
Data string `json:"data" db:"data"`
|
||||
}
|
||||
|
||||
type Tx interface {
|
||||
@ -51,17 +57,23 @@ func newRuleDB(db *sqlx.DB) RuleDB {
|
||||
|
||||
// CreateRuleTx stores a given rule in db and returns task name,
|
||||
// sql tx and error (if any)
|
||||
func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) {
|
||||
func (r *ruleDB) CreateRuleTx(ctx context.Context, rule string) (string, Tx, error) {
|
||||
|
||||
var groupName string
|
||||
var lastInsertId int64
|
||||
|
||||
var userEmail string
|
||||
if user := common.GetUserFromContext(ctx); user != nil {
|
||||
userEmail = user.Email
|
||||
}
|
||||
createdAt := time.Now()
|
||||
updatedAt := time.Now()
|
||||
tx, err := r.Begin()
|
||||
if err != nil {
|
||||
return groupName, nil, err
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(`INSERT into rules (updated_at, data) VALUES($1,$2);`)
|
||||
stmt, err := tx.Prepare(`INSERT into rules (created_at, created_by, updated_at, updated_by, data) VALUES($1,$2,$3,$4,$5);`)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in preparing statement for INSERT to rules\n", err)
|
||||
tx.Rollback()
|
||||
@ -70,7 +82,7 @@ func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) {
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
result, err := stmt.Exec(time.Now(), rule)
|
||||
result, err := stmt.Exec(createdAt, userEmail, updatedAt, userEmail, rule)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in Executing prepared statement for INSERT to rules\n", err)
|
||||
tx.Rollback() // return an error too, we may want to wrap them
|
||||
@ -87,7 +99,7 @@ func (r *ruleDB) CreateRuleTx(rule string) (string, Tx, error) {
|
||||
|
||||
// EditRuleTx stores a given rule string in database and returns
|
||||
// task name, sql tx and error (if any)
|
||||
func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) {
|
||||
func (r *ruleDB) EditRuleTx(ctx context.Context, rule string, id string) (string, Tx, error) {
|
||||
|
||||
var groupName string
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
@ -95,6 +107,11 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) {
|
||||
return groupName, nil, fmt.Errorf("failed to read alert id from parameters")
|
||||
}
|
||||
|
||||
var userEmail string
|
||||
if user := common.GetUserFromContext(ctx); user != nil {
|
||||
userEmail = user.Email
|
||||
}
|
||||
updatedAt := time.Now()
|
||||
groupName = prepareTaskName(int64(idInt))
|
||||
|
||||
// todo(amol): resolve this error - database locked when using
|
||||
@ -103,7 +120,7 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) {
|
||||
//if err != nil {
|
||||
// return groupName, tx, err
|
||||
//}
|
||||
stmt, err := r.Prepare(`UPDATE rules SET updated_at=$1, data=$2 WHERE id=$3;`)
|
||||
stmt, err := r.Prepare(`UPDATE rules SET updated_by=$1, updated_at=$2, data=$3 WHERE id=$4;`)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Error in preparing statement for UPDATE to rules\n", err)
|
||||
// tx.Rollback()
|
||||
@ -111,7 +128,7 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) {
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(time.Now(), rule, idInt); err != nil {
|
||||
if _, err := stmt.Exec(userEmail, updatedAt, rule, idInt); err != nil {
|
||||
zap.S().Errorf("Error in Executing prepared statement for UPDATE to rules\n", err)
|
||||
// tx.Rollback() // return an error too, we may want to wrap them
|
||||
return groupName, nil, err
|
||||
@ -121,7 +138,7 @@ func (r *ruleDB) EditRuleTx(rule string, id string) (string, Tx, error) {
|
||||
|
||||
// DeleteRuleTx deletes a given rule with id and returns
|
||||
// taskname, sql tx and error (if any)
|
||||
func (r *ruleDB) DeleteRuleTx(id string) (string, Tx, error) {
|
||||
func (r *ruleDB) DeleteRuleTx(ctx context.Context, id string) (string, Tx, error) {
|
||||
|
||||
idInt, _ := strconv.Atoi(id)
|
||||
groupName := prepareTaskName(int64(idInt))
|
||||
@ -149,11 +166,11 @@ func (r *ruleDB) DeleteRuleTx(id string) (string, Tx, error) {
|
||||
return groupName, nil, nil
|
||||
}
|
||||
|
||||
func (r *ruleDB) GetStoredRules() ([]StoredRule, error) {
|
||||
func (r *ruleDB) GetStoredRules(ctx context.Context) ([]StoredRule, error) {
|
||||
|
||||
rules := []StoredRule{}
|
||||
|
||||
query := fmt.Sprintf("SELECT id, updated_at, data FROM rules")
|
||||
query := "SELECT id, created_at, created_by, updated_at, updated_by, data FROM rules"
|
||||
|
||||
err := r.Select(&rules, query)
|
||||
|
||||
@ -165,7 +182,7 @@ func (r *ruleDB) GetStoredRules() ([]StoredRule, error) {
|
||||
return rules, nil
|
||||
}
|
||||
|
||||
func (r *ruleDB) GetStoredRule(id string) (*StoredRule, error) {
|
||||
func (r *ruleDB) GetStoredRule(ctx context.Context, id string) (*StoredRule, error) {
|
||||
intId, err := strconv.Atoi(id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid id parameter")
|
||||
@ -173,7 +190,7 @@ func (r *ruleDB) GetStoredRule(id string) (*StoredRule, error) {
|
||||
|
||||
rule := &StoredRule{}
|
||||
|
||||
query := fmt.Sprintf("SELECT id, updated_at, data FROM rules WHERE id=%d", intId)
|
||||
query := fmt.Sprintf("SELECT id, created_at, created_by, updated_at, updated_by, data FROM rules WHERE id=%d", intId)
|
||||
err = r.Get(rule, query)
|
||||
|
||||
// zap.S().Info(query)
|
||||
|
@ -16,8 +16,9 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
// opentracing "github.com/opentracing/opentracing-go"
|
||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||
@ -27,8 +28,6 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
)
|
||||
|
||||
// namespace for prom metrics
|
||||
const namespace = "signoz"
|
||||
const taskNamesuffix = "webAppEditor"
|
||||
|
||||
func ruleIdFromTaskName(n string) string {
|
||||
@ -77,8 +76,6 @@ type Manager struct {
|
||||
// datastore to store alert definitions
|
||||
ruleDB RuleDB
|
||||
|
||||
// pause all rule tasks
|
||||
pause bool
|
||||
logger log.Logger
|
||||
|
||||
featureFlags interfaces.FeatureLookup
|
||||
@ -142,7 +139,7 @@ func (m *Manager) Pause(b bool) {
|
||||
}
|
||||
|
||||
func (m *Manager) initiate() error {
|
||||
storedRules, err := m.ruleDB.GetStoredRules()
|
||||
storedRules, err := m.ruleDB.GetStoredRules(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -172,7 +169,7 @@ func (m *Manager) initiate() error {
|
||||
zap.S().Info("msg:", "migrating rule from JSON to yaml", "\t rule:", rec.Data, "\t parsed rule:", parsedRule)
|
||||
ruleJSON, err := json.Marshal(parsedRule)
|
||||
if err == nil {
|
||||
taskName, _, err := m.ruleDB.EditRuleTx(string(ruleJSON), fmt.Sprintf("%d", rec.Id))
|
||||
taskName, _, err := m.ruleDB.EditRuleTx(context.Background(), string(ruleJSON), fmt.Sprintf("%d", rec.Id))
|
||||
if err != nil {
|
||||
zap.S().Errorf("msg: failed to migrate rule ", "/t error:", err)
|
||||
} else {
|
||||
@ -195,6 +192,10 @@ func (m *Manager) initiate() error {
|
||||
}
|
||||
}
|
||||
|
||||
if len(loadErrors) > 0 {
|
||||
return errors.Join(loadErrors...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -223,11 +224,11 @@ func (m *Manager) Stop() {
|
||||
|
||||
// EditRuleDefinition writes the rule definition to the
|
||||
// datastore and also updates the rule executor
|
||||
func (m *Manager) EditRule(ruleStr string, id string) error {
|
||||
func (m *Manager) EditRule(ctx context.Context, ruleStr string, id string) error {
|
||||
|
||||
parsedRule, errs := ParsePostableRule([]byte(ruleStr))
|
||||
|
||||
currentRule, err := m.GetRule(id)
|
||||
currentRule, err := m.GetRule(ctx, id)
|
||||
if err != nil {
|
||||
zap.S().Errorf("msg: ", "failed to get the rule from rule db", "\t ruleid: ", id)
|
||||
return err
|
||||
@ -247,7 +248,7 @@ func (m *Manager) EditRule(ruleStr string, id string) error {
|
||||
return errs[0]
|
||||
}
|
||||
|
||||
taskName, _, err := m.ruleDB.EditRuleTx(ruleStr, id)
|
||||
taskName, _, err := m.ruleDB.EditRuleTx(ctx, ruleStr, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -314,7 +315,7 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) DeleteRule(id string) error {
|
||||
func (m *Manager) DeleteRule(ctx context.Context, id string) error {
|
||||
|
||||
idInt, err := strconv.Atoi(id)
|
||||
if err != nil {
|
||||
@ -323,7 +324,7 @@ func (m *Manager) DeleteRule(id string) error {
|
||||
}
|
||||
|
||||
// update feature usage
|
||||
rule, err := m.GetRule(id)
|
||||
rule, err := m.GetRule(ctx, id)
|
||||
if err != nil {
|
||||
zap.S().Errorf("msg: ", "failed to get the rule from rule db", "\t ruleid: ", id)
|
||||
return err
|
||||
@ -334,7 +335,7 @@ func (m *Manager) DeleteRule(id string) error {
|
||||
m.deleteTask(taskName)
|
||||
}
|
||||
|
||||
if _, _, err := m.ruleDB.DeleteRuleTx(id); err != nil {
|
||||
if _, _, err := m.ruleDB.DeleteRuleTx(ctx, id); err != nil {
|
||||
zap.S().Errorf("msg: ", "failed to delete the rule from rule db", "\t ruleid: ", id)
|
||||
return err
|
||||
}
|
||||
@ -365,7 +366,7 @@ func (m *Manager) deleteTask(taskName string) {
|
||||
|
||||
// CreateRule stores rule def into db and also
|
||||
// starts an executor for the rule
|
||||
func (m *Manager) CreateRule(ruleStr string) error {
|
||||
func (m *Manager) CreateRule(ctx context.Context, ruleStr string) error {
|
||||
parsedRule, errs := ParsePostableRule([]byte(ruleStr))
|
||||
|
||||
// check if the rule uses any feature that is not enabled
|
||||
@ -380,7 +381,7 @@ func (m *Manager) CreateRule(ruleStr string) error {
|
||||
return errs[0]
|
||||
}
|
||||
|
||||
taskName, tx, err := m.ruleDB.CreateRuleTx(ruleStr)
|
||||
taskName, tx, err := m.ruleDB.CreateRuleTx(ctx, ruleStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -665,10 +666,10 @@ func (m *Manager) ListActiveRules() ([]Rule, error) {
|
||||
return ruleList, nil
|
||||
}
|
||||
|
||||
func (m *Manager) ListRuleStates() (*GettableRules, error) {
|
||||
func (m *Manager) ListRuleStates(ctx context.Context) (*GettableRules, error) {
|
||||
|
||||
// fetch rules from DB
|
||||
storedRules, err := m.ruleDB.GetStoredRules()
|
||||
storedRules, err := m.ruleDB.GetStoredRules(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -693,14 +694,18 @@ func (m *Manager) ListRuleStates() (*GettableRules, error) {
|
||||
} else {
|
||||
ruleResponse.State = rm.State().String()
|
||||
}
|
||||
ruleResponse.CreatedAt = s.CreatedAt
|
||||
ruleResponse.CreatedBy = s.CreatedBy
|
||||
ruleResponse.UpdatedAt = s.UpdatedAt
|
||||
ruleResponse.UpdatedBy = s.UpdatedBy
|
||||
resp = append(resp, ruleResponse)
|
||||
}
|
||||
|
||||
return &GettableRules{Rules: resp}, nil
|
||||
}
|
||||
|
||||
func (m *Manager) GetRule(id string) (*GettableRule, error) {
|
||||
s, err := m.ruleDB.GetStoredRule(id)
|
||||
func (m *Manager) GetRule(ctx context.Context, id string) (*GettableRule, error) {
|
||||
s, err := m.ruleDB.GetStoredRule(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -746,7 +751,7 @@ func (m *Manager) syncRuleStateWithTask(taskName string, rule *PostableRule) err
|
||||
// - over write the patch attributes received in input (ruleStr)
|
||||
// - re-deploy or undeploy task as necessary
|
||||
// - update the patched rule in the DB
|
||||
func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error) {
|
||||
func (m *Manager) PatchRule(ctx context.Context, ruleStr string, ruleId string) (*GettableRule, error) {
|
||||
|
||||
if ruleId == "" {
|
||||
return nil, fmt.Errorf("id is mandatory for patching rule")
|
||||
@ -755,7 +760,7 @@ func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error
|
||||
taskName := prepareTaskName(ruleId)
|
||||
|
||||
// retrieve rule from DB
|
||||
storedJSON, err := m.ruleDB.GetStoredRule(ruleId)
|
||||
storedJSON, err := m.ruleDB.GetStoredRule(ctx, ruleId)
|
||||
if err != nil {
|
||||
zap.S().Errorf("msg:", "failed to get stored rule with given id", "\t error:", err)
|
||||
return nil, err
|
||||
@ -789,7 +794,7 @@ func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error
|
||||
}
|
||||
|
||||
// write updated rule to db
|
||||
if _, _, err = m.ruleDB.EditRuleTx(string(patchedRuleBytes), ruleId); err != nil {
|
||||
if _, _, err = m.ruleDB.EditRuleTx(ctx, string(patchedRuleBytes), ruleId); err != nil {
|
||||
// write failed, rollback task state
|
||||
|
||||
// restore task state from the stored rule
|
||||
|
Loading…
x
Reference in New Issue
Block a user