TTL API for logs V2 (#5926)

* feat: ttl api for logs

* fix: add comments

* fix: add materialize_ttl_after_modify

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Nityananda Gohain 2024-09-23 20:12:38 +05:30 committed by GitHub
parent a6b05f0a3d
commit 5e5f0f167f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1996,6 +1996,108 @@ func getLocalTableName(tableName string) string {
}
func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
// Keep only latest 100 transactions/requests
r.deleteTtlTransactions(ctx, 100)
// uuid is used as transaction id
uuidWithHyphen := uuid.New()
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
coldStorageDuration := -1
if len(params.ColdStorageVolume) > 0 {
coldStorageDuration = int(params.ToColdStorageDuration)
}
tableNameArray := []string{r.logsDB + "." + r.logsLocalTableV2, r.logsDB + "." + r.logsResourceLocalTableV2}
// check if there is existing things to be done
for _, tableName := range tableNameArray {
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
}
// TTL query for logs_v2 table
ttlLogsV2 := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableNameArray[0], r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
ttlLogsV2 += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
// TTL query for logs_v2_resource table
// adding 1800 as our bucket size is 1800 seconds
ttlLogsV2Resource := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
"INTERVAL %v SECOND DELETE", tableNameArray[1], r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
ttlLogsV2Resource += fmt.Sprintf(", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+
"INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
ttlPayload := map[string]string{
tableNameArray[0]: ttlLogsV2,
tableNameArray[1]: ttlLogsV2Resource,
}
// set the ttl if nothing is pending/ no errors
go func(ttlPayload map[string]string) {
for tableName, query := range ttlPayload {
// https://github.com/SigNoz/signoz/issues/5470
// we will change ttl for only the new parts and not the old ones
query += " SETTINGS materialize_ttl_after_modify=0"
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
if dbErr != nil {
zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
return
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.L().Error("error in setting cold storage", zap.Error(err))
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err == nil {
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
return
}
zap.L().Info("Executing TTL request: ", zap.String("request", query))
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
if err := r.db.Exec(ctx, query); err != nil {
zap.L().Error("error while setting ttl", zap.Error(err))
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
return
}
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
if dbErr != nil {
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
return
}
}
}(ttlPayload)
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
@ -2139,6 +2241,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
go metricTTL(tableName)
}
case constants.LogsTTL:
if r.useLogsNewSchema {
return r.SetTTLLogsV2(ctx, params)
}
tableName := r.logsDB + "." + r.logsLocalTable
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
@ -2372,7 +2478,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTable, r.logsDB)
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTableName, r.logsDB)
err := r.db.Select(ctx, &dbResp, query)