feat(sqlmigration): update apdex and TTL status tables (#7517)

* feat(sqlmigration): update the alertmanager tables

* feat(sqlmigration): update the alertmanager tables

* feat(sqlmigration): make the preference package multi tenant

* feat(preference): address nit pick comments

* feat(preference): added the cascade delete for preferences

* feat(sqlmigration): update apdex and TTL status tables  (#7481)

* feat(sqlmigration): update the apdex and ttl tables

* feat(sqlmigration): register the new migration and rename table

* feat(sqlmigration): fix the ttl queries

* feat(sqlmigration): update the TTL and apdex tables

* feat(sqlmigration): update the TTL and apdex tables
Vikrant Gupta authored on 2025-04-04 01:36:47 +05:30, committed by GitHub
parent 79e9d1b357
commit 0116eb20ab
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
16 changed files with 617 additions and 130 deletions
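Summary: apdex_settings becomes apdex_setting and ttl_status becomes ttl_setting; both tables gain a generated UUID id, ttl_setting rows are scoped to an organization, and all TTL-status reads and writes move from raw sqlx statements to the bun model types.TTLSetting through sqlstore.SQLStore. A minimal sketch of the new write path, assuming the SigNoz packages shown in the diffs below (the wrapper function itself is illustrative only, not part of the commit):

package example

import (
    "context"
    "time"

    "github.com/SigNoz/signoz/pkg/sqlstore"
    "github.com/SigNoz/signoz/pkg/types"
    "github.com/SigNoz/signoz/pkg/valuer"
)

// insertTTLSetting mirrors the insert pattern introduced in the reader below:
// one ttl_setting row per table, keyed by a generated UUID and scoped to an org.
func insertTTLSetting(ctx context.Context, sqlDB sqlstore.SQLStore, orgID, transactionID, tableName string, ttlSeconds, coldStorageTTL int, status string) error {
    row := types.TTLSetting{
        Identifiable:   types.Identifiable{ID: valuer.GenerateUUID()},
        TimeAuditable:  types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
        TransactionID:  transactionID,
        TableName:      tableName,
        TTL:            ttlSeconds,
        ColdStorageTTL: coldStorageTTL,
        Status:         status, // e.g. constants.StatusPending in the real code
        OrgID:          orgID,
    }
    _, err := sqlDB.BunDB().NewInsert().Model(&row).Exec(ctx)
    return err
}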

View File

@@ -5,23 +5,22 @@ import (
 "github.com/ClickHouse/clickhouse-go/v2"
-"github.com/jmoiron/sqlx"
 "github.com/SigNoz/signoz/pkg/cache"
 "github.com/SigNoz/signoz/pkg/prometheus"
 basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
 "github.com/SigNoz/signoz/pkg/query-service/interfaces"
+"github.com/SigNoz/signoz/pkg/sqlstore"
 "github.com/SigNoz/signoz/pkg/telemetrystore"
 )
 type ClickhouseReader struct {
 conn clickhouse.Conn
-appdb *sqlx.DB
+appdb sqlstore.SQLStore
 *basechr.ClickHouseReader
 }
 func NewDataConnector(
-localDB *sqlx.DB,
+sqlDB sqlstore.SQLStore,
 telemetryStore telemetrystore.TelemetryStore,
 prometheus prometheus.Prometheus,
 lm interfaces.FeatureLookup,
@@ -31,10 +30,10 @@ func NewDataConnector(
 fluxIntervalForTraceDetail time.Duration,
 cache cache.Cache,
 ) *ClickhouseReader {
-chReader := basechr.NewReader(localDB, telemetryStore, prometheus, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
+chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
 return &ClickhouseReader{
 conn: telemetryStore.ClickhouseDB(),
-appdb: localDB,
+appdb: sqlDB,
 ClickHouseReader: chReader,
 }
 }

View File

@@ -139,7 +139,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 }
 reader := db.NewDataConnector(
-serverOptions.SigNoz.SQLStore.SQLxDB(),
+serverOptions.SigNoz.SQLStore,
 serverOptions.SigNoz.TelemetryStore,
 serverOptions.SigNoz.Prometheus,
 lm,

View File

@@ -17,7 +17,11 @@ import (
 "github.com/SigNoz/signoz/pkg/prometheus"
 "github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
+"github.com/SigNoz/signoz/pkg/sqlstore"
 "github.com/SigNoz/signoz/pkg/telemetrystore"
+"github.com/SigNoz/signoz/pkg/types"
+"github.com/SigNoz/signoz/pkg/valuer"
+"github.com/uptrace/bun"
 "honnef.co/go/tools/config"
 "github.com/google/uuid"
@@ -31,7 +35,6 @@ import (
 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 "github.com/SigNoz/signoz/pkg/cache"
 "github.com/SigNoz/signoz/pkg/types/authtypes"
-"github.com/jmoiron/sqlx"
 "go.uber.org/zap"
@@ -115,7 +118,7 @@ var (
 type ClickHouseReader struct {
 db clickhouse.Conn
 prometheus prometheus.Prometheus
-localDB *sqlx.DB
+sqlDB sqlstore.SQLStore
 TraceDB string
 operationsTable string
 durationTable string
@@ -166,7 +169,7 @@ type ClickHouseReader struct {
 // NewTraceReader returns a TraceReader for the database
 func NewReader(
-localDB *sqlx.DB,
+sqlDB sqlstore.SQLStore,
 telemetryStore telemetrystore.TelemetryStore,
 prometheus prometheus.Prometheus,
 featureFlag interfaces.FeatureLookup,
@@ -177,12 +180,12 @@ func NewReader(
 cache cache.Cache,
 ) *ClickHouseReader {
 options := NewOptions(primaryNamespace, archiveNamespace)
-return NewReaderFromClickhouseConnection(options, localDB, telemetryStore, prometheus, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
+return NewReaderFromClickhouseConnection(options, sqlDB, telemetryStore, prometheus, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
 }
 func NewReaderFromClickhouseConnection(
 options *Options,
-localDB *sqlx.DB,
+sqlDB sqlstore.SQLStore,
 telemetryStore telemetrystore.TelemetryStore,
 prometheus prometheus.Prometheus,
 featureFlag interfaces.FeatureLookup,
@@ -209,7 +212,7 @@ func NewReaderFromClickhouseConnection(
 return &ClickHouseReader{
 db: telemetryStore.ClickhouseDB(),
 prometheus: prometheus,
-localDB: localDB,
+sqlDB: sqlDB,
 TraceDB: options.primary.TraceDB,
 operationsTable: options.primary.OperationsTable,
 indexTable: options.primary.IndexTable,
@@ -1685,9 +1688,7 @@ func getLocalTableName(tableName string) string {
 }
-func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
+func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
-// Keep only latest 100 transactions/requests
-r.deleteTtlTransactions(ctx, 100)
 // uuid is used as transaction id
 uuidWithHyphen := uuid.New()
 uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
@@ -1701,7 +1702,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
 // check if there is existing things to be done
 for _, tableName := range tableNameArray {
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err != nil {
 return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
 }
@@ -1743,7 +1744,27 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
 // we will change ttl for only the new parts and not the old ones
 query += " SETTINGS materialize_ttl_after_modify=0"
-_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+ttl := types.TTLSetting{
+Identifiable: types.Identifiable{
+ID: valuer.GenerateUUID(),
+},
+TimeAuditable: types.TimeAuditable{
+CreatedAt: time.Now(),
+UpdatedAt: time.Now(),
+},
+TransactionID: uuid,
+TableName: tableName,
+TTL: int(params.DelDuration),
+Status: constants.StatusPending,
+ColdStorageTTL: coldStorageDuration,
+OrgID: orgID,
+}
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewInsert().
+Model(&ttl).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
 return
@@ -1752,9 +1773,17 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
 err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
 if err != nil {
 zap.L().Error("error in setting cold storage", zap.Error(err))
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err == nil {
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -1763,17 +1792,33 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
 return
 }
 zap.L().Info("Executing TTL request: ", zap.String("request", query))
-statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err := r.db.Exec(ctx, query); err != nil {
 zap.L().Error("error while setting ttl", zap.Error(err))
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
 }
 return
 }
-_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+_, dbErr = r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusSuccess).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -1784,7 +1829,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa
 return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
 }
-func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
+func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
 // uuid is used as transaction id
 uuidWithHyphen := uuid.New()
 uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
@@ -1804,7 +1849,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
 // check if there is existing things to be done
 for _, tableName := range tableNames {
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err != nil {
 return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
 }
@@ -1831,11 +1876,32 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
 timestamp = "end"
 }
-_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+ttl := types.TTLSetting{
+Identifiable: types.Identifiable{
+ID: valuer.GenerateUUID(),
+},
+TimeAuditable: types.TimeAuditable{
+CreatedAt: time.Now(),
+UpdatedAt: time.Now(),
+},
+TransactionID: uuid,
+TableName: tableName,
+TTL: int(params.DelDuration),
+Status: constants.StatusPending,
+ColdStorageTTL: coldStorageDuration,
+OrgID: orgID,
+}
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewInsert().
+Model(&ttl).
+Exec(ctx)
 if dbErr != nil {
-zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
+zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
 return
 }
 req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration)
 if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) {
 req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration)
@@ -1851,9 +1917,17 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
 err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
 if err != nil {
 zap.L().Error("Error in setting cold storage", zap.Error(err))
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err == nil {
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -1863,17 +1937,33 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
 }
 req += " SETTINGS materialize_ttl_after_modify=0;"
 zap.L().Error(" ExecutingTTL request: ", zap.String("request", req))
-statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err := r.db.Exec(ctx, req); err != nil {
 zap.L().Error("Error in executing set TTL query", zap.Error(err))
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
 }
 return
 }
-_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+_, dbErr = r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusSuccess).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -1886,10 +1976,10 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
 // SetTTL sets the TTL for traces or metrics or logs tables.
 // This is an async API which creates goroutines to set TTL.
 // Status of TTL update is tracked with ttl_status table in sqlite db.
-func (r *ClickHouseReader) SetTTL(ctx context.Context,
+func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string,
 params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
 // Keep only latest 100 transactions/requests
-r.deleteTtlTransactions(ctx, 100)
+r.deleteTtlTransactions(ctx, orgID, 100)
 // uuid is used as transaction id
 uuidWithHyphen := uuid.New()
 uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
@@ -1902,7 +1992,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 switch params.Type {
 case constants.TraceTTL:
 if r.useTraceNewSchema {
-return r.SetTTLTracesV2(ctx, params)
+return r.SetTTLTracesV2(ctx, orgID, params)
 }
 tableNames := []string{
@@ -1915,7 +2005,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 }
 for _, tableName := range tableNames {
 tableName := getLocalTableName(tableName)
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err != nil {
 return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
 }
@@ -1927,9 +2017,29 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 tableName := getLocalTableName(tableName)
 // TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086
 go func(tableName string) {
-_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+ttl := types.TTLSetting{
+Identifiable: types.Identifiable{
+ID: valuer.GenerateUUID(),
+},
+TimeAuditable: types.TimeAuditable{
+CreatedAt: time.Now(),
+UpdatedAt: time.Now(),
+},
+TransactionID: uuid,
+TableName: tableName,
+TTL: int(params.DelDuration),
+Status: constants.StatusPending,
+ColdStorageTTL: coldStorageDuration,
+OrgID: orgID,
+}
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewInsert().
+Model(&ttl).
+Exec(ctx)
 if dbErr != nil {
-zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
+zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
 return
 }
 req := fmt.Sprintf(
@@ -1942,9 +2052,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
 if err != nil {
 zap.L().Error("Error in setting cold storage", zap.Error(err))
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err == nil {
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -1954,17 +2072,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 }
 req += " SETTINGS materialize_ttl_after_modify=0;"
 zap.L().Error("Executing TTL request: ", zap.String("request", req))
-statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err := r.db.Exec(context.Background(), req); err != nil {
 zap.L().Error("Error in executing set TTL query", zap.Error(err))
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
 }
 return
 }
-_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+_, dbErr = r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusSuccess).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -1984,7 +2118,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 signozMetricDBName + "." + signozTSLocalTableNameV41Week,
 }
 for _, tableName := range tableNames {
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err != nil {
 return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
 }
@@ -1993,9 +2127,29 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 }
 }
 metricTTL := func(tableName string) {
-_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+ttl := types.TTLSetting{
+Identifiable: types.Identifiable{
+ID: valuer.GenerateUUID(),
+},
+TimeAuditable: types.TimeAuditable{
+CreatedAt: time.Now(),
+UpdatedAt: time.Now(),
+},
+TransactionID: uuid,
+TableName: tableName,
+TTL: int(params.DelDuration),
+Status: constants.StatusPending,
+ColdStorageTTL: coldStorageDuration,
+OrgID: orgID,
+}
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewInsert().
+Model(&ttl).
+Exec(ctx)
 if dbErr != nil {
-zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
+zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
 return
 }
 timeColumn := "timestamp_ms"
@@ -2014,9 +2168,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
 if err != nil {
 zap.L().Error("Error in setting cold storage", zap.Error(err))
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err == nil {
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -2026,17 +2188,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 }
 req += " SETTINGS materialize_ttl_after_modify=0"
 zap.L().Info("Executing TTL request: ", zap.String("request", req))
-statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err := r.db.Exec(ctx, req); err != nil {
 zap.L().Error("error while setting ttl.", zap.Error(err))
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
 }
 return
 }
-_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+_, dbErr = r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusSuccess).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -2047,11 +2225,11 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 }
 case constants.LogsTTL:
 if r.useLogsNewSchema {
-return r.SetTTLLogsV2(ctx, params)
+return r.SetTTLLogsV2(ctx, orgID, params)
 }
 tableName := r.logsDB + "." + r.logsLocalTable
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err != nil {
 return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
 }
@@ -2059,7 +2237,27 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
 }
 go func(tableName string) {
-_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
+ttl := types.TTLSetting{
+Identifiable: types.Identifiable{
+ID: valuer.GenerateUUID(),
+},
+TimeAuditable: types.TimeAuditable{
+CreatedAt: time.Now(),
+UpdatedAt: time.Now(),
+},
+TransactionID: uuid,
+TableName: tableName,
+TTL: int(params.DelDuration),
+Status: constants.StatusPending,
+ColdStorageTTL: coldStorageDuration,
+OrgID: orgID,
+}
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewInsert().
+Model(&ttl).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr))
 return
@@ -2075,9 +2273,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
 if err != nil {
 zap.L().Error("error in setting cold storage", zap.Error(err))
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err == nil {
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -2087,17 +2293,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 }
 req += " SETTINGS materialize_ttl_after_modify=0"
 zap.L().Info("Executing TTL request: ", zap.String("request", req))
-statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
+statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName)
 if err := r.db.Exec(ctx, req); err != nil {
 zap.L().Error("error while setting ttl", zap.Error(err))
-_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+_, dbErr := r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusFailed).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
 }
 return
 }
-_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+_, dbErr = r.
+sqlDB.
+BunDB().
+NewUpdate().
+Model(new(types.TTLSetting)).
+Set("updated_at = ?", time.Now()).
+Set("status = ?", constants.StatusSuccess).
+Where("id = ?", statusItem.ID.StringValue()).
+Exec(ctx)
 if dbErr != nil {
 zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
 return
@@ -2112,47 +2334,63 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
 return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
 }
-func (r *ClickHouseReader) deleteTtlTransactions(_ context.Context, numberOfTransactionsStore int) {
+func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID string, numberOfTransactionsStore int) {
-_, err := r.localDB.Exec("DELETE FROM ttl_status WHERE transaction_id NOT IN (SELECT distinct transaction_id FROM ttl_status ORDER BY created_at DESC LIMIT ?)", numberOfTransactionsStore)
+limitTransactions := []string{}
+err := r.
+sqlDB.
+BunDB().
+NewSelect().
+ColumnExpr("distinct(transaction_id)").
+Model(new(types.TTLSetting)).
+Where("org_id = ?", orgID).
+OrderExpr("created_at DESC").
+Limit(numberOfTransactionsStore).
+Scan(ctx, &limitTransactions)
+if err != nil {
+zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err))
+}
+_, err = r.
+sqlDB.
+BunDB().
+NewDelete().
+Model(new(types.TTLSetting)).
+Where("transaction_id NOT IN (?)", bun.In(limitTransactions)).
+Exec(ctx)
 if err != nil {
 zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err))
 }
 }
 // checkTTLStatusItem checks if ttl_status table has an entry for the given table name
-func (r *ClickHouseReader) checkTTLStatusItem(_ context.Context, tableName string) (model.TTLStatusItem, *model.ApiError) {
+func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, *model.ApiError) {
-statusItem := []model.TTLStatusItem{}
-query := `SELECT id, status, ttl, cold_storage_ttl FROM ttl_status WHERE table_name = ? ORDER BY created_at DESC`
-zap.L().Info("checkTTLStatusItem query", zap.String("query", query), zap.String("tableName", tableName))
-stmt, err := r.localDB.Preparex(query)
-if err != nil {
-zap.L().Error("Error preparing query for checkTTLStatusItem", zap.Error(err))
-return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorInternal, Err: err}
-}
-err = stmt.Select(&statusItem, tableName)
-if len(statusItem) == 0 {
-return model.TTLStatusItem{}, nil
-}
-if err != nil {
+zap.L().Info("checkTTLStatusItem query", zap.String("tableName", tableName))
+ttl := new(types.TTLSetting)
+err := r.
+sqlDB.
+BunDB().
+NewSelect().
+Model(ttl).
+Where("table_name = ?", tableName).
+Where("org_id = ?", orgID).
+OrderExpr("created_at DESC").
+Limit(1).
+Scan(ctx)
+if err != nil && err != sql.ErrNoRows {
 zap.L().Error("Error in processing sql query", zap.Error(err))
-return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
+return ttl, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
 }
-return statusItem[0], nil
+return ttl, nil
 }
 // setTTLQueryStatus fetches ttl_status table status from DB
-func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray []string) (string, *model.ApiError) {
+func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) {
 failFlag := false
 status := constants.StatusSuccess
 for _, tableName := range tableNameArray {
-statusItem, err := r.checkTTLStatusItem(ctx, tableName)
+statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName)
-emptyStatusStruct := model.TTLStatusItem{}
+emptyStatusStruct := new(types.TTLSetting)
 if statusItem == emptyStatusStruct {
 return "", nil
 }
@@ -2213,7 +2451,7 @@ func getLocalTableNameArray(tableNames []string) []string {
 }
 // GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
-func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
+func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
 parseTTL := func(queryResp string) (int, int) {
@@ -2303,7 +2541,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
 tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
 tableNameArray = getLocalTableNameArray(tableNameArray)
-status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
 if err != nil {
 return nil, err
 }
@@ -2311,22 +2549,22 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
 if err != nil {
 return nil, err
 }
-ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
 if err != nil {
 return nil, err
 }
 ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
-if ttlQuery.ColdStorageTtl != -1 {
+if ttlQuery.ColdStorageTTL != -1 {
-ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
 }
 delTTL, moveTTL := parseTTL(dbResp.EngineFull)
-return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
 case constants.MetricsTTL:
 tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
 tableNameArray = getLocalTableNameArray(tableNameArray)
-status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
 if err != nil {
 return nil, err
 }
@@ -2334,22 +2572,22 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
 if err != nil {
 return nil, err
 }
-ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
 if err != nil {
 return nil, err
 }
 ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
-if ttlQuery.ColdStorageTtl != -1 {
+if ttlQuery.ColdStorageTTL != -1 {
-ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
 }
 delTTL, moveTTL := parseTTL(dbResp.EngineFull)
-return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
 case constants.LogsTTL:
-tableNameArray := []string{r.logsDB + "." + r.logsTable}
+tableNameArray := []string{r.logsDB + "." + r.logsTableName}
 tableNameArray = getLocalTableNameArray(tableNameArray)
-status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray)
 if err != nil {
 return nil, err
 }
@@ -2357,17 +2595,17 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
 if err != nil {
 return nil, err
 }
-ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0])
 if err != nil {
 return nil, err
 }
 ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
-if ttlQuery.ColdStorageTtl != -1 {
+if ttlQuery.ColdStorageTTL != -1 {
-ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours
 }
 delTTL, moveTTL := parseTTL(dbResp.EngineFull)
-return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil
 default:
 return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
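Every failure and success branch in the reader above issues the same bun update against ttl_setting. Read as one pattern, it amounts to the helper below; this is illustrative only and not part of the commit, which inlines the update at each call site (assuming the same SigNoz packages):

package example

import (
    "context"
    "time"

    "github.com/SigNoz/signoz/pkg/sqlstore"
    "github.com/SigNoz/signoz/pkg/types"
)

// markTTLStatus stamps updated_at and sets the status of one ttl_setting row,
// matching the inlined updates in the reader above.
func markTTLStatus(ctx context.Context, sqlDB sqlstore.SQLStore, item *types.TTLSetting, status string) error {
    _, err := sqlDB.BunDB().
        NewUpdate().
        Model(new(types.TTLSetting)).
        Set("updated_at = ?", time.Now()).
        Set("status = ?", status).
        Where("id = ?", item.ID.StringValue()).
        Exec(ctx)
    return err
}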

View File

@@ -1870,8 +1870,15 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
 return
 }
+ctx := r.Context()
+claims, ok := authtypes.ClaimsFromContext(ctx)
+if !ok {
+RespondError(w, &model.ApiError{Err: errors.New("failed to get org id from context"), Typ: model.ErrorInternal}, nil)
+return
+}
 // Context is not used here as TTL is long duration DB operation
-result, apiErr := aH.reader.SetTTL(context.Background(), ttlParams)
+result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, ttlParams)
 if apiErr != nil {
 if apiErr.Typ == model.ErrorConflict {
 aH.HandleError(w, apiErr.Err, http.StatusConflict)
@@ -1891,7 +1898,14 @@ func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
 return
 }
-result, apiErr := aH.reader.GetTTL(r.Context(), ttlParams)
+ctx := r.Context()
+claims, ok := authtypes.ClaimsFromContext(ctx)
+if !ok {
+RespondError(w, &model.ApiError{Err: errors.New("failed to get org id from context"), Typ: model.ErrorInternal}, nil)
+return
+}
+result, apiErr := aH.reader.GetTTL(r.Context(), claims.OrgID, ttlParams)
 if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
 return
 }

View File

@@ -117,7 +117,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 }
 reader := clickhouseReader.NewReader(
-serverOptions.SigNoz.SQLStore.SQLxDB(),
+serverOptions.SigNoz.SQLStore,
 serverOptions.SigNoz.TelemetryStore,
 serverOptions.SigNoz.Prometheus,
 fm,

View File

@@ -5,6 +5,7 @@ import (
 "github.com/SigNoz/signoz/pkg/query-service/model"
 "github.com/SigNoz/signoz/pkg/types"
+"github.com/SigNoz/signoz/pkg/valuer"
 "github.com/uptrace/bun"
 )
@@ -48,6 +49,7 @@ func (mds *ModelDaoSqlite) GetApdexSettings(ctx context.Context, orgID string, s
 func (mds *ModelDaoSqlite) SetApdexSettings(ctx context.Context, orgID string, apdexSettings *types.ApdexSettings) *model.ApiError {
 // Set the org_id from the parameter since it's required for the foreign key constraint
 apdexSettings.OrgID = orgID
+apdexSettings.Identifiable.ID = valuer.GenerateUUID()
 _, err := mds.bundb.NewInsert().
 Model(apdexSettings).

View File

@@ -22,7 +22,7 @@ type Reader interface {
 GetServicesList(ctx context.Context) (*[]string, error)
 GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
-GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
+GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
 // GetDisks returns a list of disks configured in the underlying DB. It is supported by
 // clickhouse only.
@@ -44,7 +44,7 @@ type Reader interface {
 GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError)
 // Setter Interfaces
-SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
+SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
 FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error)
 GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)

View File

@@ -317,9 +317,9 @@ func createTelemetry() {
 getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
-traceTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.TraceTTL})
+traceTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.TraceTTL})
-metricsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.MetricsTTL})
+metricsTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.MetricsTTL})
-logsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.LogsTTL})
+logsTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.LogsTTL})
 userCount, _ := telemetry.userCountCallback(ctx)

View File

@@ -293,7 +293,7 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed {
 testDB := utils.NewQueryServiceDBForTests(t)
 fm := featureManager.StartManager()
-reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm)
+reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
 mockClickhouse.MatchExpectationsInOrder(false)
 apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{

View File

@@ -355,7 +355,7 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI
 }
 fm := featureManager.StartManager()
-reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm)
+reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
 mockClickhouse.MatchExpectationsInOrder(false)
 apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{

View File

@@ -557,7 +557,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
 }
 fm := featureManager.StartManager()
-reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm)
+reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm)
 mockClickhouse.MatchExpectationsInOrder(false)
 cloudIntegrationsController, err := cloudintegrations.NewController(testDB)

View File

@@ -23,12 +23,12 @@ import (
 "github.com/SigNoz/signoz/pkg/query-service/dao"
 "github.com/SigNoz/signoz/pkg/query-service/interfaces"
 "github.com/SigNoz/signoz/pkg/query-service/model"
+"github.com/SigNoz/signoz/pkg/sqlstore"
 "github.com/SigNoz/signoz/pkg/telemetrystore"
 "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
 "github.com/SigNoz/signoz/pkg/types"
 "github.com/SigNoz/signoz/pkg/types/authtypes"
 "github.com/google/uuid"
-"github.com/jmoiron/sqlx"
 "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
 "github.com/stretchr/testify/require"
@@ -38,7 +38,7 @@ import (
 var jwt = authtypes.NewJWT("secret", 1*time.Hour, 2*time.Hour)
 func NewMockClickhouseReader(
-t *testing.T, testDB *sqlx.DB, featureFlags interfaces.FeatureLookup,
+t *testing.T, testDB sqlstore.SQLStore, featureFlags interfaces.FeatureLookup,
 ) (
 *clickhouseReader.ClickHouseReader, mockhouse.ClickConnMockCommon,
 ) {

View File

@@ -67,6 +67,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
 sqlmigration.NewUpdatePatFactory(sqlstore),
 sqlmigration.NewUpdateAlertmanagerFactory(sqlstore),
 sqlmigration.NewUpdatePreferencesFactory(sqlstore),
+sqlmigration.NewUpdateApdexTtlFactory(sqlstore),
 )
 }

View File

@@ -0,0 +1,233 @@
package sqlmigration

import (
    "context"
    "database/sql"
    "fmt"
    "reflect"
    "time"

    "github.com/SigNoz/signoz/pkg/factory"
    "github.com/SigNoz/signoz/pkg/sqlstore"
    "github.com/SigNoz/signoz/pkg/types"
    "github.com/SigNoz/signoz/pkg/valuer"
    "github.com/uptrace/bun"
    "github.com/uptrace/bun/migrate"
)

type updateApdexTtl struct {
    store sqlstore.SQLStore
}

type existingApdexSettings struct {
    bun.BaseModel `bun:"table:apdex_settings"`
    OrgID string `bun:"org_id,pk,type:text" json:"orgId"`
    ServiceName string `bun:"service_name,pk,type:text" json:"serviceName"`
    Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"`
    ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
}

type newApdexSettings struct {
    bun.BaseModel `bun:"table:apdex_setting"`
    types.Identifiable
    OrgID string `bun:"org_id,type:text" json:"orgId"`
    ServiceName string `bun:"service_name,type:text" json:"serviceName"`
    Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"`
    ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
}

type existingTTLStatus struct {
    bun.BaseModel `bun:"table:ttl_status"`
    ID int `bun:"id,pk,autoincrement"`
    TransactionID string `bun:"transaction_id,type:text,notnull"`
    CreatedAt time.Time `bun:"created_at,type:datetime,notnull"`
    UpdatedAt time.Time `bun:"updated_at,type:datetime,notnull"`
    TableName string `bun:"table_name,type:text,notnull"`
    TTL int `bun:"ttl,notnull,default:0"`
    ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
    Status string `bun:"status,type:text,notnull"`
}

type newTTLStatus struct {
    bun.BaseModel `bun:"table:ttl_setting"`
    types.Identifiable
    types.TimeAuditable
    TransactionID string `bun:"transaction_id,type:text,notnull"`
    TableName string `bun:"table_name,type:text,notnull"`
    TTL int `bun:"ttl,notnull,default:0"`
    ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
    Status string `bun:"status,type:text,notnull"`
    OrgID string `json:"-" bun:"org_id,notnull"`
}

func NewUpdateApdexTtlFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
    return factory.
        NewProviderFactory(
            factory.MustNewName("update_apdex_ttl"),
            func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
                return newUpdateApdexTtl(ctx, ps, c, sqlstore)
            })
}

func newUpdateApdexTtl(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
    return &updateApdexTtl{store: store}, nil
}

func (migration *updateApdexTtl) Register(migrations *migrate.Migrations) error {
    if err := migrations.
        Register(migration.Up, migration.Down); err != nil {
        return err
    }
    return nil
}

func (migration *updateApdexTtl) Up(ctx context.Context, db *bun.DB) error {
    tx, err := db.
        BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    err = migration.
        store.
        Dialect().
        RenameTableAndModifyModel(ctx, tx, new(existingApdexSettings), new(newApdexSettings), func(ctx context.Context) error {
            existingApdexSettings := make([]*existingApdexSettings, 0)
            err = tx.
                NewSelect().
                Model(&existingApdexSettings).
                Scan(ctx)
            if err != nil {
                if err != sql.ErrNoRows {
                    return err
                }
            }
            if err == nil && len(existingApdexSettings) > 0 {
                newSettings := migration.
                    CopyExistingApdexSettingsToNewApdexSettings(existingApdexSettings)
                _, err = tx.
                    NewInsert().
                    Model(&newSettings).
                    Exec(ctx)
                if err != nil {
                    return err
                }
            }
            tableName := tx.Dialect().Tables().Get(reflect.TypeOf(new(newApdexSettings))).Name
            _, err = tx.
                ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s_unique_idx ON %s (service_name, org_id)", tableName, tableName))
            if err != nil {
                return err
            }
            return nil
        })
    if err != nil {
        return err
    }

    err = migration.
        store.
        Dialect().
        RenameTableAndModifyModel(ctx, tx, new(existingTTLStatus), new(newTTLStatus), func(ctx context.Context) error {
            existingTTLStatus := make([]*existingTTLStatus, 0)
            err = tx.
                NewSelect().
                Model(&existingTTLStatus).
                Scan(ctx)
            if err != nil {
                if err != sql.ErrNoRows {
                    return err
                }
            }
            if err == nil && len(existingTTLStatus) > 0 {
                var orgID string
                err := migration.
                    store.
                    BunDB().
                    NewSelect().
                    Model((*types.Organization)(nil)).
                    Column("id").
                    Scan(ctx, &orgID)
                if err != nil {
                    if err != sql.ErrNoRows {
                        return err
                    }
                }
                if err == nil {
                    newTTLStatus := migration.
                        CopyExistingTTLStatusToNewTTLStatus(existingTTLStatus, orgID)
                    _, err = tx.
                        NewInsert().
                        Model(&newTTLStatus).
                        Exec(ctx)
                    if err != nil {
                        return err
                    }
                }
            }
            return nil
        })
    if err != nil {
        return err
    }

    err = tx.Commit()
    if err != nil {
        return err
    }

    return nil
}

func (migration *updateApdexTtl) Down(context.Context, *bun.DB) error {
    return nil
}

func (migration *updateApdexTtl) CopyExistingApdexSettingsToNewApdexSettings(existingApdexSettings []*existingApdexSettings) []*newApdexSettings {
    newSettings := make([]*newApdexSettings, 0)
    for _, apdexSetting := range existingApdexSettings {
        newSettings = append(newSettings, &newApdexSettings{
            Identifiable: types.Identifiable{
                ID: valuer.GenerateUUID(),
            },
            ServiceName: apdexSetting.ServiceName,
            Threshold: apdexSetting.Threshold,
            ExcludeStatusCodes: apdexSetting.ExcludeStatusCodes,
            OrgID: apdexSetting.OrgID,
        })
    }
    return newSettings
}

func (migration *updateApdexTtl) CopyExistingTTLStatusToNewTTLStatus(existingTTLStatus []*existingTTLStatus, orgID string) []*newTTLStatus {
    newTTLStatuses := make([]*newTTLStatus, 0)
    for _, ttl := range existingTTLStatus {
        newTTLStatuses = append(newTTLStatuses, &newTTLStatus{
            Identifiable: types.Identifiable{
                ID: valuer.GenerateUUID(),
            },
            TimeAuditable: types.TimeAuditable{
                CreatedAt: ttl.CreatedAt,
                UpdatedAt: ttl.UpdatedAt,
            },
            TransactionID: ttl.TransactionID,
            TTL: ttl.TTL,
            TableName: ttl.TableName,
            ColdStorageTTL: ttl.ColdStorageTTL,
            Status: ttl.Status,
            OrgID: orgID,
        })
    }
    return newTTLStatuses
}
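A minimal test sketch for the TTL copy helper above, checking that rows are re-keyed with a generated id and back-filled with the organization. This test is hypothetical and not part of the commit; it lives in the same sqlmigration package so it can reach the unexported types:

package sqlmigration

import (
    "testing"
    "time"
)

func TestCopyExistingTTLStatusToNewTTLStatus(t *testing.T) {
    migration := &updateApdexTtl{}
    now := time.Now()
    // One legacy ttl_status row as it would have existed before the migration.
    old := []*existingTTLStatus{{
        ID:             1,
        TransactionID:  "tx-1",
        CreatedAt:      now,
        UpdatedAt:      now,
        TableName:      "signoz_logs.logs",
        TTL:            3600,
        ColdStorageTTL: -1,
        Status:         "success",
    }}

    got := migration.CopyExistingTTLStatusToNewTTLStatus(old, "org-1")
    if len(got) != 1 {
        t.Fatalf("expected 1 row, got %d", len(got))
    }
    row := got[0]
    if row.OrgID != "org-1" || row.TableName != "signoz_logs.logs" || row.TTL != 3600 || row.ColdStorageTTL != -1 {
        t.Fatalf("fields not copied as expected: %+v", row)
    }
    if row.ID.StringValue() == "" {
        t.Fatalf("expected a generated id")
    }
}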

View File

@@ -94,15 +94,14 @@ type PlannedMaintenance struct {
 UpdatedBy string `bun:"updated_by,type:text,notnull"`
 }
-type TTLStatus struct {
-bun.BaseModel `bun:"table:ttl_status"`
-ID int `bun:"id,pk,autoincrement"`
-TransactionID string `bun:"transaction_id,type:text,notnull"`
-CreatedAt time.Time `bun:"created_at,type:datetime,notnull"`
-UpdatedAt time.Time `bun:"updated_at,type:datetime,notnull"`
-TableName string `bun:"table_name,type:text,notnull"`
-TTL int `bun:"ttl,notnull,default:0"`
-ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
-Status string `bun:"status,type:text,notnull"`
+type TTLSetting struct {
+bun.BaseModel `bun:"table:ttl_setting"`
+Identifiable
+TimeAuditable
+TransactionID string `bun:"transaction_id,type:text,notnull"`
+TableName string `bun:"table_name,type:text,notnull"`
+TTL int `bun:"ttl,notnull,default:0"`
+ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
+Status string `bun:"status,type:text,notnull"`
+OrgID string `json:"-" bun:"org_id,notnull"`
 }

View File

@@ -7,7 +7,6 @@ import (
 // TODO: check constraints are not working
 type Organization struct {
 bun.BaseModel `bun:"table:organizations"`
 TimeAuditable
 ID string `bun:"id,pk,type:text" json:"id"`
 Name string `bun:"name,type:text,notnull" json:"name"`
@@ -16,8 +15,10 @@
 }
 type ApdexSettings struct {
-OrgID string `bun:"org_id,pk,type:text" json:"orgId"`
-ServiceName string `bun:"service_name,pk,type:text" json:"serviceName"`
+bun.BaseModel `bun:"table:apdex_setting"`
+Identifiable
+OrgID string `bun:"org_id,type:text" json:"orgId"`
+ServiceName string `bun:"service_name,type:text" json:"serviceName"`
 Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"`
 ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
 }