mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-11 04:39:59 +08:00
fix: remove shared variable in TTL and async TTL queries (#1821)
* fix: remove shared variable in TTL * fix: set distributed_ddl_task_timeout to 0 for async TTL * chore: updated distributed_ddl_task_timeout to sync TTL queries
This commit is contained in:
parent
8a9d6f664a
commit
6931b18382
@ -2055,7 +2055,6 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||||
// Keep only latest 100 transactions/requests
|
// Keep only latest 100 transactions/requests
|
||||||
r.deleteTtlTransactions(ctx, 100)
|
r.deleteTtlTransactions(ctx, 100)
|
||||||
var req, tableName string
|
|
||||||
// uuid is used as transaction id
|
// uuid is used as transaction id
|
||||||
uuidWithHyphen := uuid.New()
|
uuidWithHyphen := uuid.New()
|
||||||
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
||||||
@ -2068,8 +2067,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
switch params.Type {
|
switch params.Type {
|
||||||
case constants.TraceTTL:
|
case constants.TraceTTL:
|
||||||
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
|
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
|
||||||
for _, tableName = range tableNameArray {
|
for _, tableName := range tableNameArray {
|
||||||
tableName = getLocalTableName(tableName)
|
tableName := getLocalTableName(tableName)
|
||||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||||
@ -2079,7 +2078,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, tableName := range tableNameArray {
|
for _, tableName := range tableNameArray {
|
||||||
tableName = getLocalTableName(tableName)
|
tableName := getLocalTableName(tableName)
|
||||||
// TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086
|
// TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086
|
||||||
go func(tableName string) {
|
go func(tableName string) {
|
||||||
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
|
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
|
||||||
@ -2087,7 +2086,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
|
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
req = fmt.Sprintf(
|
req := fmt.Sprintf(
|
||||||
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
|
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
|
||||||
tableName, cluster, params.DelDuration)
|
tableName, cluster, params.DelDuration)
|
||||||
if len(params.ColdStorageVolume) > 0 {
|
if len(params.ColdStorageVolume) > 0 {
|
||||||
@ -2107,6 +2106,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1;")
|
||||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||||
if err := r.db.Exec(context.Background(), req); err != nil {
|
if err := r.db.Exec(context.Background(), req); err != nil {
|
||||||
@ -2127,7 +2127,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
case constants.MetricsTTL:
|
case constants.MetricsTTL:
|
||||||
tableName = signozMetricDBName + "." + signozSampleLocalTableName
|
tableName := signozMetricDBName + "." + signozSampleLocalTableName
|
||||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||||
@ -2141,7 +2141,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
|
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
req = fmt.Sprintf(
|
req := fmt.Sprintf(
|
||||||
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
|
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
|
||||||
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
|
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
|
||||||
if len(params.ColdStorageVolume) > 0 {
|
if len(params.ColdStorageVolume) > 0 {
|
||||||
@ -2162,6 +2162,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1")
|
||||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||||
if err := r.db.Exec(ctx, req); err != nil {
|
if err := r.db.Exec(ctx, req); err != nil {
|
||||||
@ -2180,7 +2181,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}(tableName)
|
}(tableName)
|
||||||
case constants.LogsTTL:
|
case constants.LogsTTL:
|
||||||
tableName = r.logsDB + "." + r.logsLocalTable
|
tableName := r.logsDB + "." + r.logsLocalTable
|
||||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||||
@ -2194,7 +2195,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error()))
|
zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
req = fmt.Sprintf(
|
req := fmt.Sprintf(
|
||||||
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
|
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
|
||||||
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
|
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
|
||||||
if len(params.ColdStorageVolume) > 0 {
|
if len(params.ColdStorageVolume) > 0 {
|
||||||
@ -2215,6 +2216,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1")
|
||||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||||
if err := r.db.Exec(ctx, req); err != nil {
|
if err := r.db.Exec(ctx, req); err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user