diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index bc8da8c615..281bbc5a6d 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2055,7 +2055,6 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // Keep only latest 100 transactions/requests r.deleteTtlTransactions(ctx, 100) - var req, tableName string // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -2068,8 +2067,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, switch params.Type { case constants.TraceTTL: tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable} - for _, tableName = range tableNameArray { - tableName = getLocalTableName(tableName) + for _, tableName := range tableNameArray { + tableName := getLocalTableName(tableName) statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} @@ -2079,7 +2078,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } } for _, tableName := range tableNameArray { - tableName = getLocalTableName(tableName) + tableName := getLocalTableName(tableName) // TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086 go func(tableName string) { _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) @@ -2087,7 +2086,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error())) return } - req = fmt.Sprintf( + req := fmt.Sprintf( "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration) if len(params.ColdStorageVolume) > 0 { @@ -2107,6 +2106,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } return } + req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1;") zap.S().Debugf("Executing TTL request: %s\n", req) statusItem, _ := r.checkTTLStatusItem(ctx, tableName) if err := r.db.Exec(context.Background(), req); err != nil { @@ -2127,7 +2127,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } case constants.MetricsTTL: - tableName = signozMetricDBName + "." + signozSampleLocalTableName + tableName := signozMetricDBName + "." + signozSampleLocalTableName statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} @@ -2141,7 +2141,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error())) return } - req = fmt.Sprintf( + req := fmt.Sprintf( "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+ "INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration) if len(params.ColdStorageVolume) > 0 { @@ -2162,6 +2162,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } return } + req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1") zap.S().Debugf("Executing TTL request: %s\n", req) statusItem, _ := r.checkTTLStatusItem(ctx, tableName) if err := r.db.Exec(ctx, req); err != nil { @@ -2180,7 +2181,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } }(tableName) case constants.LogsTTL: - tableName = r.logsDB + "." + r.logsLocalTable + tableName := r.logsDB + "." + r.logsLocalTable statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} @@ -2194,7 +2195,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error())) return } - req = fmt.Sprintf( + req := fmt.Sprintf( "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+ "INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration) if len(params.ColdStorageVolume) > 0 { @@ -2215,6 +2216,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } return } + req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1") zap.S().Debugf("Executing TTL request: %s\n", req) statusItem, _ := r.checkTTLStatusItem(ctx, tableName) if err := r.db.Exec(ctx, req); err != nil {