mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 22:55:55 +08:00
feat: ttl api for new trace tables (#6497)
* feat: tt api for new trace tables * fix: remove print and use correct context * fix: update var name
This commit is contained in:
parent
e4505693b0
commit
e46d969143
@ -1374,6 +1374,105 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
|
|||||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||||
|
// uuid is used as transaction id
|
||||||
|
uuidWithHyphen := uuid.New()
|
||||||
|
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
||||||
|
tableNames := []string{
|
||||||
|
r.TraceDB + "." + r.traceTableName,
|
||||||
|
r.TraceDB + "." + r.traceResourceTableV3,
|
||||||
|
r.TraceDB + "." + signozErrorIndexTable,
|
||||||
|
r.TraceDB + "." + signozUsageExplorerTable,
|
||||||
|
r.TraceDB + "." + defaultDependencyGraphTable,
|
||||||
|
r.TraceDB + "." + r.traceSummaryTable,
|
||||||
|
}
|
||||||
|
|
||||||
|
coldStorageDuration := -1
|
||||||
|
if len(params.ColdStorageVolume) > 0 {
|
||||||
|
coldStorageDuration = int(params.ToColdStorageDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if there is existing things to be done
|
||||||
|
for _, tableName := range tableNames {
|
||||||
|
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||||
|
}
|
||||||
|
if statusItem.Status == constants.StatusPending {
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TTL query
|
||||||
|
ttlV2 := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(%s) + INTERVAL %v SECOND DELETE"
|
||||||
|
ttlV2ColdStorage := ", toDateTime(%s) + INTERVAL %v SECOND TO VOLUME '%s'"
|
||||||
|
|
||||||
|
// TTL query for resource table
|
||||||
|
ttlV2Resource := "ALTER TABLE %s ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND DELETE"
|
||||||
|
ttlTracesV2ResourceColdStorage := ", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + INTERVAL %v SECOND TO VOLUME '%s'"
|
||||||
|
|
||||||
|
for _, distributedTableName := range tableNames {
|
||||||
|
go func(distributedTableName string) {
|
||||||
|
tableName := getLocalTableName(distributedTableName)
|
||||||
|
|
||||||
|
// for trace summary table, we need to use end instead of timestamp
|
||||||
|
timestamp := "timestamp"
|
||||||
|
if strings.HasSuffix(distributedTableName, r.traceSummaryTable) {
|
||||||
|
timestamp = "end"
|
||||||
|
}
|
||||||
|
|
||||||
|
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
|
||||||
|
if dbErr != nil {
|
||||||
|
zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration)
|
||||||
|
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
|
||||||
|
req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(params.ColdStorageVolume) > 0 {
|
||||||
|
if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
|
||||||
|
req += fmt.Sprintf(ttlTracesV2ResourceColdStorage, params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||||
|
} else {
|
||||||
|
req += fmt.Sprintf(ttlV2ColdStorage, timestamp, params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error("Error in setting cold storage", zap.Error(err))
|
||||||
|
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||||
|
if err == nil {
|
||||||
|
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
|
||||||
|
if dbErr != nil {
|
||||||
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req += " SETTINGS materialize_ttl_after_modify=0;"
|
||||||
|
zap.L().Error(" ExecutingTTL request: ", zap.String("request", req))
|
||||||
|
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||||
|
if err := r.db.Exec(ctx, req); err != nil {
|
||||||
|
zap.L().Error("Error in executing set TTL query", zap.Error(err))
|
||||||
|
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
|
||||||
|
if dbErr != nil {
|
||||||
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
|
||||||
|
if dbErr != nil {
|
||||||
|
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(distributedTableName)
|
||||||
|
}
|
||||||
|
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SetTTL sets the TTL for traces or metrics or logs tables.
|
// SetTTL sets the TTL for traces or metrics or logs tables.
|
||||||
// This is an async API which creates goroutines to set TTL.
|
// This is an async API which creates goroutines to set TTL.
|
||||||
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
||||||
@ -1395,6 +1494,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
|
|
||||||
switch params.Type {
|
switch params.Type {
|
||||||
case constants.TraceTTL:
|
case constants.TraceTTL:
|
||||||
|
if r.useTraceNewSchema {
|
||||||
|
return r.SetTTLTracesV2(ctx, params)
|
||||||
|
}
|
||||||
|
|
||||||
tableNames := []string{
|
tableNames := []string{
|
||||||
signozTraceDBName + "." + signozTraceTableName,
|
signozTraceDBName + "." + signozTraceTableName,
|
||||||
signozTraceDBName + "." + signozDurationMVTable,
|
signozTraceDBName + "." + signozDurationMVTable,
|
||||||
@ -1755,7 +1858,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
|
|||||||
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||||
var dbResp []model.DBResponseTTL
|
var dbResp []model.DBResponseTTL
|
||||||
|
|
||||||
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceLocalTableName, signozTraceDBName)
|
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.traceLocalTableName, signozTraceDBName)
|
||||||
|
|
||||||
err := r.db.Select(ctx, &dbResp, query)
|
err := r.db.Select(ctx, &dbResp, query)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user