mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-01 21:40:37 +08:00
* fix: 🐛 convert TTL APIs to async * chore: add archive support * chore: update TTL async APIs according to new design * chore: 🔥 clean removeTTL API * fix: metrics s3 config * test: ✅ update tests for async TTL api * chore: refractoring Co-authored-by: Pranay Prateek <pranay@signoz.io>
This commit is contained in:
parent
eb9a8e3a97
commit
5d080f5564
@ -23,6 +23,7 @@ import (
|
||||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
"github.com/google/uuid"
|
||||
"github.com/oklog/oklog/pkg/group"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -2266,48 +2267,135 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
|
||||
return &GetFilteredSpansAggregatesResponse, nil
|
||||
}
|
||||
|
||||
// SetTTL sets the TTL for traces or metrics tables.
|
||||
// This is an async API which creates goroutines to set TTL.
|
||||
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
|
||||
// Keep only latest 100 transactions/requests
|
||||
r.deleteTtlTransactions(ctx, 100)
|
||||
var req, tableName string
|
||||
// uuid is used as transaction id
|
||||
uuidWithHyphen := uuid.New()
|
||||
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
||||
|
||||
coldStorageDuration := -1
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
coldStorageDuration = int(params.ToColdStorageDuration)
|
||||
}
|
||||
|
||||
switch params.Type {
|
||||
case constants.TraceTTL:
|
||||
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
|
||||
for _, tableName := range tableNameArray {
|
||||
req = fmt.Sprintf(
|
||||
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
|
||||
tableName, params.DelDuration)
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
}
|
||||
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
|
||||
for _, tableName = range tableNameArray {
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
}
|
||||
|
||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||
if err := r.db.Exec(ctx, req); err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
}
|
||||
for _, tableName := range tableNameArray {
|
||||
// TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086
|
||||
go func(tableName string) {
|
||||
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
|
||||
if dbErr != nil {
|
||||
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
|
||||
return
|
||||
}
|
||||
req = fmt.Sprintf(
|
||||
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
|
||||
tableName, params.DelDuration)
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
}
|
||||
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
|
||||
if err != nil {
|
||||
zap.S().Error(fmt.Errorf("Error in setting cold storage: %s", err.Err.Error()))
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err == nil {
|
||||
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
|
||||
if dbErr != nil {
|
||||
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err := r.db.Exec(context.Background(), req); err != nil {
|
||||
zap.S().Error(fmt.Errorf("Error in executing set TTL query: %s", err.Error()))
|
||||
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
|
||||
if dbErr != nil {
|
||||
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
|
||||
if dbErr != nil {
|
||||
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
|
||||
return
|
||||
}
|
||||
}(tableName)
|
||||
}
|
||||
|
||||
case constants.MetricsTTL:
|
||||
tableName = signozMetricDBName + "." + signozSampleName
|
||||
req = fmt.Sprintf(
|
||||
"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
|
||||
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
|
||||
" + INTERVAL %v SECOND TO VOLUME '%s'",
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
}
|
||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||
if err := r.db.Exec(ctx, req); err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
go func(tableName string) {
|
||||
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
|
||||
if dbErr != nil {
|
||||
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
|
||||
return
|
||||
}
|
||||
req = fmt.Sprintf(
|
||||
"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
|
||||
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
|
||||
" + INTERVAL %v SECOND TO VOLUME '%s'",
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
}
|
||||
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
|
||||
if err != nil {
|
||||
zap.S().Error(fmt.Errorf("Error in setting cold storage: %s", err.Err.Error()))
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err == nil {
|
||||
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
|
||||
if dbErr != nil {
|
||||
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err := r.db.Exec(ctx, req); err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
|
||||
if dbErr != nil {
|
||||
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
|
||||
if dbErr != nil {
|
||||
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
|
||||
return
|
||||
}
|
||||
}(tableName)
|
||||
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v",
|
||||
@ -2317,6 +2405,61 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, numberOfTransactionsStore int) {
|
||||
_, err := r.localDB.Exec("DELETE FROM ttl_status WHERE transaction_id NOT IN (SELECT distinct transaction_id FROM ttl_status ORDER BY created_at DESC LIMIT ?)", numberOfTransactionsStore)
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing ttl_status delete sql query: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
// checkTTLStatusItem checks if ttl_status table has an entry for the given table name
|
||||
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, tableName string) (model.TTLStatusItem, *model.ApiError) {
|
||||
statusItem := []model.TTLStatusItem{}
|
||||
|
||||
query := fmt.Sprintf("SELECT id, status, ttl, cold_storage_ttl FROM ttl_status WHERE table_name = '%s' ORDER BY created_at DESC", tableName)
|
||||
|
||||
err := r.localDB.Select(&statusItem, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if len(statusItem) == 0 {
|
||||
return model.TTLStatusItem{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
}
|
||||
return statusItem[0], nil
|
||||
}
|
||||
|
||||
// setTTLQueryStatus fetches ttl_status table status from DB
|
||||
func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray []string) (string, *model.ApiError) {
|
||||
failFlag := false
|
||||
status := constants.StatusSuccess
|
||||
for _, tableName := range tableNameArray {
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
emptyStatusStruct := model.TTLStatusItem{}
|
||||
if statusItem == emptyStatusStruct {
|
||||
return "", nil
|
||||
}
|
||||
if err != nil {
|
||||
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
|
||||
status = constants.StatusPending
|
||||
return status, nil
|
||||
}
|
||||
if statusItem.Status == constants.StatusFailed {
|
||||
failFlag = true
|
||||
}
|
||||
}
|
||||
if failFlag {
|
||||
status = constants.StatusFailed
|
||||
}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string, coldStorageVolume string) *model.ApiError {
|
||||
|
||||
// Set the storage policy for the required table. If it is already set, then setting it again
|
||||
@ -2333,40 +2476,6 @@ func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) RemoveTTL(ctx context.Context,
|
||||
params *model.RemoveTTLParams) (*model.RemoveTTLResponseItem, *model.ApiError) {
|
||||
|
||||
var reqs []string
|
||||
templateQuery := `ALTER TABLE %v REMOVE TTL`
|
||||
tracesTables := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
|
||||
metricsTables := []string{signozMetricDBName + "." + signozSampleName}
|
||||
|
||||
switch params.Type {
|
||||
case constants.TraceTTL:
|
||||
for _, tableName := range tracesTables {
|
||||
reqs = append(reqs, fmt.Sprintf(templateQuery, tableName))
|
||||
}
|
||||
case constants.MetricsTTL:
|
||||
for _, tableName := range metricsTables {
|
||||
reqs = append(reqs, fmt.Sprintf(templateQuery, tableName))
|
||||
}
|
||||
default:
|
||||
for _, tableName := range append(append([]string{}, tracesTables...), metricsTables...) {
|
||||
reqs = append(reqs, fmt.Sprintf(templateQuery, tableName))
|
||||
}
|
||||
}
|
||||
|
||||
zap.S().Debugf("Executing remove TTL requests: %s\n", reqs)
|
||||
for _, req := range reqs {
|
||||
if err := r.db.Exec(ctx, req); err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while removing ttl. Err=%v", err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec,
|
||||
Err: fmt.Errorf("error while removing ttl. Err=%v", err)}
|
||||
}
|
||||
}
|
||||
return &model.RemoveTTLResponseItem{Message: "ttl has been successfully removed"}, nil
|
||||
}
|
||||
|
||||
// GetDisks returns a list of disks {name, type} configured in clickhouse DB.
|
||||
func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) {
|
||||
diskItems := []model.DiskItem{}
|
||||
@ -2382,6 +2491,7 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo
|
||||
return &diskItems, nil
|
||||
}
|
||||
|
||||
// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
|
||||
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||
|
||||
parseTTL := func(queryResp string) (int, int) {
|
||||
@ -2451,42 +2561,53 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
|
||||
|
||||
switch ttlParams.Type {
|
||||
case constants.TraceTTL:
|
||||
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
|
||||
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dbResp, err := getTracesTTL()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
|
||||
if ttlQuery.ColdStorageTtl != -1 {
|
||||
ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
|
||||
}
|
||||
|
||||
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL}, nil
|
||||
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
|
||||
|
||||
case constants.MetricsTTL:
|
||||
tableNameArray := []string{signozMetricDBName + "." + signozSampleName}
|
||||
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dbResp, err := getMetricsTTL()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
|
||||
if ttlQuery.ColdStorageTtl != -1 {
|
||||
ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
|
||||
}
|
||||
|
||||
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL}, nil
|
||||
|
||||
}
|
||||
db1, err := getTracesTTL()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
|
||||
ttlParams.Type)}
|
||||
}
|
||||
|
||||
db2, err := getMetricsTTL()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tracesDelTTL, tracesMoveTTL := parseTTL(db1.EngineFull)
|
||||
metricsDelTTL, metricsMoveTTL := parseTTL(db2.EngineFull)
|
||||
|
||||
return &model.GetTTLResponseItem{
|
||||
TracesTime: tracesDelTTL,
|
||||
TracesMoveTime: tracesMoveTTL,
|
||||
MetricsTime: metricsDelTTL,
|
||||
MetricsMoveTime: metricsMoveTTL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetErrors(ctx context.Context, queryParams *model.GetErrorsParams) (*[]model.Error, *model.ApiError) {
|
||||
|
@ -66,6 +66,22 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
|
||||
return nil, fmt.Errorf("Error in creating notification_channles table: %s", err.Error())
|
||||
}
|
||||
|
||||
table_schema = `CREATE TABLE IF NOT EXISTS ttl_status (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
transaction_id TEXT NOT NULL,
|
||||
created_at datetime NOT NULL,
|
||||
updated_at datetime NOT NULL,
|
||||
table_name TEXT NOT NULL,
|
||||
ttl INTEGER DEFAULT 0,
|
||||
cold_storage_ttl INTEGER DEFAULT 0,
|
||||
status TEXT NOT NULL
|
||||
);`
|
||||
|
||||
_, err = db.Exec(table_schema)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error in creating ttl_status table: %s", err.Error())
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
|
@ -310,7 +310,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/serviceMapDependencies", ViewAccess(aH.serviceMapDependencies)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/settings/ttl", AdminAccess(aH.setTTL)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/settings/ttl", ViewAccess(aH.getTTL)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/settings/ttl", AdminAccess(aH.removeTTL)).Methods(http.MethodDelete)
|
||||
|
||||
router.HandleFunc("/api/v1/version", OpenAccess(aH.getVersion)).Methods(http.MethodGet)
|
||||
|
||||
@ -1139,9 +1138,14 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Context is not used here as TTL is long duration operation which needs to converted to async
|
||||
// Context is not used here as TTL is long duration DB operation
|
||||
result, apiErr := (*aH.reader).SetTTL(context.Background(), ttlParams)
|
||||
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
if apiErr != nil {
|
||||
if apiErr.Typ == model.ErrorConflict {
|
||||
aH.handleError(w, apiErr.Err, http.StatusConflict)
|
||||
} else {
|
||||
aH.handleError(w, apiErr.Err, http.StatusInternalServerError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -1163,47 +1167,6 @@ func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) removeTTL(w http.ResponseWriter, r *http.Request) {
|
||||
ttlParams, err := parseRemoveTTL(r)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
existingTTL, apiErr := (*aH.reader).GetTTL(context.Background(), &model.GetTTLParams{GetAllTTL: true})
|
||||
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
if ttlParams.Type == constants.TraceTTL && existingTTL.TracesTime == -1 &&
|
||||
aH.handleError(w, fmt.Errorf("traces doesn't have any TTL set, cannot remove"), http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
if ttlParams.Type == constants.MetricsTTL && existingTTL.MetricsTime == -1 &&
|
||||
aH.handleError(w, fmt.Errorf("metrics doesn't have any TTL set, cannot remove"), http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
if ttlParams.RemoveAllTTL {
|
||||
if existingTTL.TracesTime == -1 && existingTTL.MetricsTime != -1 {
|
||||
ttlParams.Type = constants.MetricsTTL
|
||||
ttlParams.RemoveAllTTL = false
|
||||
} else if existingTTL.TracesTime != -1 && existingTTL.MetricsTime == -1 {
|
||||
ttlParams.Type = constants.TraceTTL
|
||||
ttlParams.RemoveAllTTL = false
|
||||
} else if aH.handleError(w, fmt.Errorf("no TTL set, cannot remove"), http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
result, apiErr := (*aH.reader).RemoveTTL(context.Background(), ttlParams)
|
||||
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getDisks(w http.ResponseWriter, r *http.Request) {
|
||||
result, apiErr := (*aH.reader).GetDisks(context.Background())
|
||||
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
|
@ -50,7 +50,6 @@ type Reader interface {
|
||||
|
||||
// Setter Interfaces
|
||||
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||
RemoveTTL(ctx context.Context, ttlParams *model.RemoveTTLParams) (*model.RemoveTTLResponseItem, *model.ApiError)
|
||||
|
||||
GetMetricAutocompleteMetricNames(ctx context.Context, matchText string) (*[]string, *model.ApiError)
|
||||
GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
|
||||
|
@ -540,7 +540,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
|
||||
// Validate the type parameter
|
||||
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
|
||||
return nil, fmt.Errorf("type param should be <metrics|traces>, got %v", typeTTL)
|
||||
return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL)
|
||||
}
|
||||
|
||||
// Validate the TTL duration.
|
||||
@ -573,35 +573,17 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
|
||||
|
||||
typeTTL := r.URL.Query().Get("type")
|
||||
getAllTTL := false
|
||||
|
||||
if len(typeTTL) == 0 {
|
||||
getAllTTL = true
|
||||
return nil, fmt.Errorf("type param cannot be empty from the query")
|
||||
} else {
|
||||
// Validate the type parameter
|
||||
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
|
||||
return nil, fmt.Errorf("type param should be <metrics|traces>, got %v", typeTTL)
|
||||
return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL)
|
||||
}
|
||||
}
|
||||
|
||||
return &model.GetTTLParams{Type: typeTTL, GetAllTTL: getAllTTL}, nil
|
||||
}
|
||||
|
||||
func parseRemoveTTL(r *http.Request) (*model.RemoveTTLParams, error) {
|
||||
|
||||
typeTTL := r.URL.Query().Get("type")
|
||||
removeAllTTL := false
|
||||
|
||||
if len(typeTTL) == 0 {
|
||||
removeAllTTL = true
|
||||
} else {
|
||||
// Validate the type parameter
|
||||
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
|
||||
return nil, fmt.Errorf("type param should be <metrics|traces>, got %v", typeTTL)
|
||||
}
|
||||
}
|
||||
|
||||
return &model.RemoveTTLParams{Type: typeTTL, RemoveAllTTL: removeAllTTL}, nil
|
||||
return &model.GetTTLParams{Type: typeTTL}, nil
|
||||
}
|
||||
|
||||
func parseUserRequest(r *http.Request) (*model.User, error) {
|
||||
|
@ -37,26 +37,29 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout
|
||||
var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db")
|
||||
|
||||
const (
|
||||
ServiceName = "serviceName"
|
||||
HttpRoute = "httpRoute"
|
||||
HttpCode = "httpCode"
|
||||
HttpHost = "httpHost"
|
||||
HttpUrl = "httpUrl"
|
||||
HttpMethod = "httpMethod"
|
||||
Component = "component"
|
||||
OperationDB = "name"
|
||||
OperationRequest = "operation"
|
||||
Status = "status"
|
||||
Duration = "duration"
|
||||
DBName = "dbName"
|
||||
DBOperation = "dbOperation"
|
||||
DBSystem = "dbSystem"
|
||||
MsgSystem = "msgSystem"
|
||||
MsgOperation = "msgOperation"
|
||||
Timestamp = "timestamp"
|
||||
Descending = "descending"
|
||||
Ascending = "ascending"
|
||||
ContextTimeout = 60 // seconds
|
||||
ServiceName = "serviceName"
|
||||
HttpRoute = "httpRoute"
|
||||
HttpCode = "httpCode"
|
||||
HttpHost = "httpHost"
|
||||
HttpUrl = "httpUrl"
|
||||
HttpMethod = "httpMethod"
|
||||
Component = "component"
|
||||
OperationDB = "name"
|
||||
OperationRequest = "operation"
|
||||
Status = "status"
|
||||
Duration = "duration"
|
||||
DBName = "dbName"
|
||||
DBOperation = "dbOperation"
|
||||
DBSystem = "dbSystem"
|
||||
MsgSystem = "msgSystem"
|
||||
MsgOperation = "msgOperation"
|
||||
Timestamp = "timestamp"
|
||||
Descending = "descending"
|
||||
Ascending = "ascending"
|
||||
ContextTimeout = 60 // seconds
|
||||
StatusPending = "pending"
|
||||
StatusFailed = "failed"
|
||||
StatusSuccess = "success"
|
||||
)
|
||||
|
||||
func GetOrDefaultEnv(key string, fallback string) string {
|
||||
|
@ -193,7 +193,6 @@ type TTLParams struct {
|
||||
|
||||
type GetTTLParams struct {
|
||||
Type string
|
||||
GetAllTTL bool
|
||||
}
|
||||
|
||||
type GetErrorsParams struct {
|
||||
@ -206,8 +205,3 @@ type GetErrorParams struct {
|
||||
ErrorID string
|
||||
ServiceName string
|
||||
}
|
||||
|
||||
type RemoveTTLParams struct {
|
||||
Type string
|
||||
RemoveAllTTL bool
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ const (
|
||||
ErrorNotImplemented ErrorType = "not_implemented"
|
||||
ErrorUnauthorized ErrorType = "unauthorized"
|
||||
ErrorForbidden ErrorType = "forbidden"
|
||||
ErrorConflict ErrorType = "conflict"
|
||||
)
|
||||
|
||||
type QueryDataV2 struct {
|
||||
@ -48,6 +49,16 @@ type RuleResponseItem struct {
|
||||
Data string `json:"data" db:"data"`
|
||||
}
|
||||
|
||||
type TTLStatusItem struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
TableName string `json:"table_name" db:"table_name"`
|
||||
TTL int `json:"ttl" db:"ttl"`
|
||||
Status string `json:"status" db:"status"`
|
||||
ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"`
|
||||
}
|
||||
|
||||
type ChannelItem struct {
|
||||
Id int `json:"id" db:"id"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
@ -246,10 +257,6 @@ type SetTTLResponseItem struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type RemoveTTLResponseItem struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type DiskItem struct {
|
||||
Name string `json:"name,omitempty" ch:"name"`
|
||||
Type string `json:"type,omitempty" ch:"type"`
|
||||
@ -260,10 +267,15 @@ type DBResponseTTL struct {
|
||||
}
|
||||
|
||||
type GetTTLResponseItem struct {
|
||||
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
|
||||
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
|
||||
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
|
||||
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
|
||||
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
|
||||
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
|
||||
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
|
||||
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
|
||||
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
|
||||
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
|
||||
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
type DBResponseServiceName struct {
|
||||
|
@ -99,7 +99,7 @@ func TestAuthInviteAPI(t *testing.T) {
|
||||
|
||||
func TestAuthRegisterAPI(t *testing.T) {
|
||||
email := "alice@signoz.io"
|
||||
resp, err := register(email, "password", "")
|
||||
resp, err := register(email, "Password@123", "")
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, resp, "user registered successfully")
|
||||
|
||||
@ -108,7 +108,7 @@ func TestAuthRegisterAPI(t *testing.T) {
|
||||
func TestAuthLoginAPI(t *testing.T) {
|
||||
t.Skip()
|
||||
email := "abc-login@signoz.io"
|
||||
password := "password123"
|
||||
password := "Password@123"
|
||||
inv := invite(t, email)
|
||||
|
||||
resp, err := register(email, password, inv.InviteToken)
|
||||
|
@ -20,12 +20,16 @@ var (
|
||||
client http.Client
|
||||
)
|
||||
|
||||
func setTTL(table, coldStorage, toColdTTL, deleteTTL string) ([]byte, error) {
|
||||
func setTTL(table, coldStorage, toColdTTL, deleteTTL string, jwtToken string) ([]byte, error) {
|
||||
params := fmt.Sprintf("type=%s&duration=%s", table, deleteTTL)
|
||||
if len(toColdTTL) > 0 {
|
||||
params += fmt.Sprintf("&coldStorage=%s&toColdDuration=%s", coldStorage, toColdTTL)
|
||||
}
|
||||
resp, err := client.Post(endpoint+"/api/v1/settings/ttl?"+params, "", nil)
|
||||
var bearer = "Bearer " + jwtToken
|
||||
req, err := http.NewRequest("POST", endpoint+"/api/v1/settings/ttl?"+params, nil)
|
||||
req.Header.Add("Authorization", bearer)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -40,7 +44,18 @@ func setTTL(table, coldStorage, toColdTTL, deleteTTL string) ([]byte, error) {
|
||||
}
|
||||
|
||||
func TestListDisks(t *testing.T) {
|
||||
resp, err := client.Get(endpoint + "/api/v1/disks")
|
||||
t.Skip()
|
||||
email := "alice@signoz.io"
|
||||
password := "Password@123"
|
||||
|
||||
loginResp, err := login(email, password, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
var bearer = "Bearer " + loginResp.AccessJwt
|
||||
req, err := http.NewRequest("POST", endpoint+"/api/v1/disks", nil)
|
||||
req.Header.Add("Authorization", bearer)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer resp.Body.Close()
|
||||
@ -50,6 +65,11 @@ func TestListDisks(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSetTTL(t *testing.T) {
|
||||
email := "alice@signoz.io"
|
||||
password := "Password@123"
|
||||
|
||||
loginResp, err := login(email, password, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
caseNo int
|
||||
@ -60,7 +80,7 @@ func TestSetTTL(t *testing.T) {
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
1, "s3", "traces", "100s", "60s",
|
||||
1, "s3", "traces", "100h", "60h",
|
||||
"Delete TTL should be greater than cold storage move TTL.",
|
||||
},
|
||||
{
|
||||
@ -72,21 +92,17 @@ func TestSetTTL(t *testing.T) {
|
||||
"Not a valid TTL duration 100",
|
||||
},
|
||||
{
|
||||
4, "s3", "traces", "", "60s",
|
||||
4, "s3", "metrics", "1h", "2h",
|
||||
"move ttl has been successfully set up",
|
||||
},
|
||||
{
|
||||
5, "s3", "traces", "10s", "600s",
|
||||
5, "s3", "traces", "10s", "6h",
|
||||
"move ttl has been successfully set up",
|
||||
},
|
||||
{
|
||||
6, "s4", "traces", "10s", "600s",
|
||||
"No such volume `s4` in storage policy `tiered`",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
r, err := setTTL(tc.table, tc.coldStorage, tc.coldTTL, tc.deleteTTL)
|
||||
r, err := setTTL(tc.table, tc.coldStorage, tc.coldTTL, tc.deleteTTL, loginResp.AccessJwt)
|
||||
require.NoErrorf(t, err, "Failed case: %d", tc.caseNo)
|
||||
require.Containsf(t, string(r), tc.expected, "Failed case: %d", tc.caseNo)
|
||||
}
|
||||
@ -104,13 +120,17 @@ func TestSetTTL(t *testing.T) {
|
||||
fmt.Printf("=== Found %d objects in Minio\n", count)
|
||||
}
|
||||
|
||||
func getTTL(t *testing.T, table string) *model.GetTTLResponseItem {
|
||||
req := endpoint + fmt.Sprintf("/api/v1/settings/ttl?type=%s", table)
|
||||
func getTTL(t *testing.T, table string, jwtToken string) *model.GetTTLResponseItem {
|
||||
url := endpoint + fmt.Sprintf("/api/v1/settings/ttl?type=%s", table)
|
||||
if len(table) == 0 {
|
||||
req = endpoint + "/api/v1/settings/ttl"
|
||||
url = endpoint + "/api/v1/settings/ttl"
|
||||
}
|
||||
|
||||
resp, err := client.Get(req)
|
||||
var bearer = "Bearer " + jwtToken
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
req.Header.Add("Authorization", bearer)
|
||||
resp, err := client.Do(req)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
defer resp.Body.Close()
|
||||
@ -123,55 +143,69 @@ func getTTL(t *testing.T, table string) *model.GetTTLResponseItem {
|
||||
}
|
||||
|
||||
func TestGetTTL(t *testing.T) {
|
||||
r, err := setTTL("traces", "s3", "3600s", "7200s")
|
||||
email := "alice@signoz.io"
|
||||
password := "Password@123"
|
||||
|
||||
loginResp, err := login(email, password, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := getTTL(t, "traces", loginResp.AccessJwt)
|
||||
for resp.Status == "pending" {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
require.Equal(t, "success", resp.Status)
|
||||
|
||||
r, err := setTTL("traces", "s3", "1h", "2h", loginResp.AccessJwt)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(r), "successfully set up")
|
||||
|
||||
resp := getTTL(t, "traces")
|
||||
resp = getTTL(t, "traces", loginResp.AccessJwt)
|
||||
for resp.Status == "pending" {
|
||||
time.Sleep(time.Second)
|
||||
resp = getTTL(t, "traces", loginResp.AccessJwt)
|
||||
require.Equal(t, 1, resp.ExpectedTracesMoveTime)
|
||||
require.Equal(t, 2, resp.ExpectedTracesTime)
|
||||
}
|
||||
resp = getTTL(t, "traces", loginResp.AccessJwt)
|
||||
require.Equal(t, "success", resp.Status)
|
||||
require.Equal(t, 1, resp.TracesMoveTime)
|
||||
require.Equal(t, 2, resp.TracesTime)
|
||||
|
||||
r, err = setTTL("metrics", "s3", "3600s", "7200s")
|
||||
resp = getTTL(t, "metrics", loginResp.AccessJwt)
|
||||
for resp.Status == "pending" {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
require.Equal(t, "success", resp.Status)
|
||||
|
||||
r, err = setTTL("traces", "s3", "10h", "20h", loginResp.AccessJwt)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(r), "successfully set up")
|
||||
|
||||
resp = getTTL(t, "metrics")
|
||||
resp = getTTL(t, "traces", loginResp.AccessJwt)
|
||||
for resp.Status == "pending" {
|
||||
time.Sleep(time.Second)
|
||||
resp = getTTL(t, "traces", loginResp.AccessJwt)
|
||||
}
|
||||
require.Equal(t, "success", resp.Status)
|
||||
require.Equal(t, 10, resp.TracesMoveTime)
|
||||
require.Equal(t, 20, resp.TracesTime)
|
||||
|
||||
resp = getTTL(t, "metrics", loginResp.AccessJwt)
|
||||
for resp.Status != "success" && resp.Status != "failed" {
|
||||
time.Sleep(time.Second)
|
||||
resp = getTTL(t, "metrics", loginResp.AccessJwt)
|
||||
}
|
||||
require.Equal(t, "success", resp.Status)
|
||||
require.Equal(t, 1, resp.MetricsMoveTime)
|
||||
require.Equal(t, 2, resp.MetricsTime)
|
||||
|
||||
r, err = setTTL("traces", "s3", "36000s", "72000s")
|
||||
r, err = setTTL("metrics", "s3", "0s", "0s", loginResp.AccessJwt)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(r), "successfully set up")
|
||||
require.Contains(t, string(r), "Not a valid TTL duration 0s")
|
||||
|
||||
resp = getTTL(t, "")
|
||||
require.Equal(t, 10, resp.TracesMoveTime)
|
||||
require.Equal(t, 20, resp.TracesTime)
|
||||
require.Equal(t, 1, resp.MetricsMoveTime)
|
||||
require.Equal(t, 2, resp.MetricsTime)
|
||||
|
||||
r, err = setTTL("metrics", "s3", "15h", "50h")
|
||||
r, err = setTTL("traces", "s3", "0s", "0s", loginResp.AccessJwt)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(r), "successfully set up")
|
||||
|
||||
resp = getTTL(t, "")
|
||||
require.Equal(t, 10, resp.TracesMoveTime)
|
||||
require.Equal(t, 20, resp.TracesTime)
|
||||
require.Equal(t, 15, resp.MetricsMoveTime)
|
||||
require.Equal(t, 50, resp.MetricsTime)
|
||||
|
||||
r, err = setTTL("metrics", "s3", "0s", "0s")
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(r), "successfully set up")
|
||||
|
||||
r, err = setTTL("traces", "s3", "0s", "0s")
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(r), "successfully set up")
|
||||
|
||||
resp = getTTL(t, "")
|
||||
require.Equal(t, 0, resp.TracesMoveTime)
|
||||
require.Equal(t, 0, resp.TracesTime)
|
||||
require.Equal(t, 0, resp.MetricsMoveTime)
|
||||
require.Equal(t, 0, resp.MetricsTime)
|
||||
require.Contains(t, string(r), "Not a valid TTL duration 0s")
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user