diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index bb2a84b487..8dd29856fb 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1996,6 +1996,108 @@ func getLocalTableName(tableName string) string { } +func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { + // Keep only latest 100 transactions/requests + r.deleteTtlTransactions(ctx, 100) + // uuid is used as transaction id + uuidWithHyphen := uuid.New() + uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) + + coldStorageDuration := -1 + if len(params.ColdStorageVolume) > 0 { + coldStorageDuration = int(params.ToColdStorageDuration) + } + + tableNameArray := []string{r.logsDB + "." + r.logsLocalTableV2, r.logsDB + "." + r.logsResourceLocalTableV2} + + // check if there is existing things to be done + for _, tableName := range tableNameArray { + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} + } + } + + // TTL query for logs_v2 table + ttlLogsV2 := fmt.Sprintf( + "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+ + "INTERVAL %v SECOND DELETE", tableNameArray[0], r.cluster, params.DelDuration) + if len(params.ColdStorageVolume) > 0 { + ttlLogsV2 += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+ + " + INTERVAL %v SECOND TO VOLUME '%s'", + params.ToColdStorageDuration, params.ColdStorageVolume) + } + + // TTL query for logs_v2_resource table + // adding 1800 as our bucket size is 1800 seconds + ttlLogsV2Resource := fmt.Sprintf( + "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+ + "INTERVAL %v SECOND DELETE", tableNameArray[1], r.cluster, params.DelDuration) + if len(params.ColdStorageVolume) > 0 { + ttlLogsV2Resource += fmt.Sprintf(", toDateTime(seen_at_ts_bucket_start) + toIntervalSecond(1800) + "+ + "INTERVAL %v SECOND TO VOLUME '%s'", + params.ToColdStorageDuration, params.ColdStorageVolume) + } + + ttlPayload := map[string]string{ + tableNameArray[0]: ttlLogsV2, + tableNameArray[1]: ttlLogsV2Resource, + } + + // set the ttl if nothing is pending/ no errors + go func(ttlPayload map[string]string) { + for tableName, query := range ttlPayload { + // https://github.com/SigNoz/signoz/issues/5470 + // we will change ttl for only the new parts and not the old ones + query += " SETTINGS materialize_ttl_after_modify=0" + + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) + return + } + + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.L().Error("error in setting cold storage", zap.Error(err)) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + } + return + } + zap.L().Info("Executing TTL request: ", zap.String("request", query)) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + if err := r.db.Exec(ctx, query); err != nil { + zap.L().Error("error while setting ttl", zap.Error(err)) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + return + } + _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + if dbErr != nil { + zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) + return + } + } + + }(ttlPayload) + return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil +} + +// SetTTL sets the TTL for traces or metrics or logs tables. +// This is an async API which creates goroutines to set TTL. +// Status of TTL update is tracked with ttl_status table in sqlite db. // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. @@ -2139,6 +2241,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, go metricTTL(tableName) } case constants.LogsTTL: + if r.useLogsNewSchema { + return r.SetTTLLogsV2(ctx, params) + } + tableName := r.logsDB + "." + r.logsLocalTable statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { @@ -2372,7 +2478,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) { var dbResp []model.DBResponseTTL - query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTable, r.logsDB) + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTableName, r.logsDB) err := r.db.Select(ctx, &dbResp, query)