From 0116eb20ab2b2229fa847c2ccf54a464cfe6f085 Mon Sep 17 00:00:00 2001
From: Vikrant Gupta
Date: Fri, 4 Apr 2025 01:36:47 +0530
Subject: [PATCH] feat(sqlmigration): update apdex and TTL status tables (#7517)

* feat(sqlmigration): update the alertmanager tables
* feat(sqlmigration): update the alertmanager tables
* feat(sqlmigration): make the preference package multi tenant
* feat(preference): address nit pick comments
* feat(preference): added the cascade delete for preferences
* feat(sqlmigration): update apdex and TTL status tables (#7481)
* feat(sqlmigration): update the apdex and ttl tables
* feat(sqlmigration): register the new migration and rename table
* feat(sqlmigration): fix the ttl queries
* feat(sqlmigration): update the TTL and apdex tables
* feat(sqlmigration): update the TTL and apdex tables
---
 ee/query-service/app/db/reader.go             |  11 +-
 ee/query-service/app/server.go                |   2 +-
 .../app/clickhouseReader/reader.go            | 430 ++++++++++++++----
 pkg/query-service/app/http_handler.go         |  18 +-
 pkg/query-service/app/server.go               |   2 +-
 pkg/query-service/dao/sqlite/apdex.go         |   2 +
 pkg/query-service/interfaces/interface.go     |   4 +-
 pkg/query-service/telemetry/telemetry.go      |   6 +-
 .../integration/filter_suggestions_test.go    |   2 +-
 .../signoz_cloud_integrations_test.go         |   2 +-
 .../integration/signoz_integrations_test.go   |   2 +-
 .../tests/integration/test_utils.go           |   4 +-
 pkg/signoz/provider.go                        |   1 +
 pkg/sqlmigration/023_update_apdex_ttl.go      | 233 ++++++++++
 pkg/types/dashboard.go                        |  21 +-
 pkg/types/organization.go                     |   7 +-
 16 files changed, 617 insertions(+), 130 deletions(-)
 create mode 100644 pkg/sqlmigration/023_update_apdex_ttl.go

diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go
index 5a0c54c51d..7775908d48 100644
--- a/ee/query-service/app/db/reader.go
+++ b/ee/query-service/app/db/reader.go
@@ -5,23 +5,22 @@ import (
 
 	"github.com/ClickHouse/clickhouse-go/v2"
 
-	"github.com/jmoiron/sqlx"
-
 	"github.com/SigNoz/signoz/pkg/cache"
 	"github.com/SigNoz/signoz/pkg/prometheus"
 	basechr "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
 	"github.com/SigNoz/signoz/pkg/query-service/interfaces"
+	"github.com/SigNoz/signoz/pkg/sqlstore"
 	"github.com/SigNoz/signoz/pkg/telemetrystore"
 )
 
 type ClickhouseReader struct {
 	conn  clickhouse.Conn
-	appdb *sqlx.DB
+	appdb sqlstore.SQLStore
 	*basechr.ClickHouseReader
 }
 
 func NewDataConnector(
-	localDB *sqlx.DB,
+	sqlDB sqlstore.SQLStore,
 	telemetryStore telemetrystore.TelemetryStore,
 	prometheus prometheus.Prometheus,
 	lm interfaces.FeatureLookup,
@@ -31,10 +30,10 @@ func NewDataConnector(
 	fluxIntervalForTraceDetail time.Duration,
 	cache cache.Cache,
 ) *ClickhouseReader {
-	chReader := basechr.NewReader(localDB, telemetryStore, prometheus, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
+	chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
 	return &ClickhouseReader{
 		conn:             telemetryStore.ClickhouseDB(),
-		appdb:            localDB,
+		appdb:            sqlDB,
 		ClickHouseReader: chReader,
 	}
 }
diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go
index 9c77f8abae..79d07aa65d 100644
--- a/ee/query-service/app/server.go
+++ b/ee/query-service/app/server.go
@@ -139,7 +139,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 	}
 
 	reader := db.NewDataConnector(
-		serverOptions.SigNoz.SQLStore.SQLxDB(),
+		serverOptions.SigNoz.SQLStore,
 		serverOptions.SigNoz.TelemetryStore,
 		serverOptions.SigNoz.Prometheus,
 		lm,
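The change above is the pattern the rest of this patch repeats: components that previously received the raw *sqlx.DB now receive the sqlstore.SQLStore wrapper and reach its bun handle for the rewritten apdex/TTL queries. A minimal sketch of that shape (not part of the diff), assuming BunDB() returns a *bun.DB as it is used later in this patch; the reader/newReader names are illustrative only:

package example

import (
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/uptrace/bun"
)

// reader is a hypothetical component wired the same way as the patched
// ClickhouseReader: it keeps the SQLStore rather than a *sqlx.DB.
type reader struct {
	sqlDB sqlstore.SQLStore
}

func newReader(sqlDB sqlstore.SQLStore) *reader {
	return &reader{sqlDB: sqlDB}
}

// bunDB exposes the ORM handle used by the new TTL/apdex code paths.
func (r *reader) bunDB() *bun.DB {
	return r.sqlDB.BunDB()
}
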
diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 1d78edcbd7..5bad4d2165 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -17,7 +17,11 @@ import ( "github.com/SigNoz/signoz/pkg/prometheus" "github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer" + "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/telemetrystore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/uptrace/bun" "honnef.co/go/tools/config" "github.com/google/uuid" @@ -31,7 +35,6 @@ import ( "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/SigNoz/signoz/pkg/cache" "github.com/SigNoz/signoz/pkg/types/authtypes" - "github.com/jmoiron/sqlx" "go.uber.org/zap" @@ -115,7 +118,7 @@ var ( type ClickHouseReader struct { db clickhouse.Conn prometheus prometheus.Prometheus - localDB *sqlx.DB + sqlDB sqlstore.SQLStore TraceDB string operationsTable string durationTable string @@ -166,7 +169,7 @@ type ClickHouseReader struct { // NewTraceReader returns a TraceReader for the database func NewReader( - localDB *sqlx.DB, + sqlDB sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, featureFlag interfaces.FeatureLookup, @@ -177,12 +180,12 @@ func NewReader( cache cache.Cache, ) *ClickHouseReader { options := NewOptions(primaryNamespace, archiveNamespace) - return NewReaderFromClickhouseConnection(options, localDB, telemetryStore, prometheus, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) + return NewReaderFromClickhouseConnection(options, sqlDB, telemetryStore, prometheus, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) } func NewReaderFromClickhouseConnection( options *Options, - localDB *sqlx.DB, + sqlDB sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, featureFlag interfaces.FeatureLookup, @@ -209,7 +212,7 @@ func NewReaderFromClickhouseConnection( return &ClickHouseReader{ db: telemetryStore.ClickhouseDB(), prometheus: prometheus, - localDB: localDB, + sqlDB: sqlDB, TraceDB: options.primary.TraceDB, operationsTable: options.primary.OperationsTable, indexTable: options.primary.IndexTable, @@ -1685,9 +1688,7 @@ func getLocalTableName(tableName string) string { } -func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { - // Keep only latest 100 transactions/requests - r.deleteTtlTransactions(ctx, 100) +func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -1701,7 +1702,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa // check if there is existing things to be done for _, tableName := range tableNameArray { - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} } @@ -1743,7 +1744,27 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa // we will change ttl for only the new parts 
and not the old ones query += " SETTINGS materialize_ttl_after_modify=0" - _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + ttl := types.TTLSetting{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + TransactionID: uuid, + TableName: tableName, + TTL: int(params.DelDuration), + Status: constants.StatusPending, + ColdStorageTTL: coldStorageDuration, + OrgID: orgID, + } + _, dbErr := r. + sqlDB. + BunDB(). + NewInsert(). + Model(&ttl). + Exec(ctx) if dbErr != nil { zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) return @@ -1752,9 +1773,17 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) if err != nil { zap.L().Error("error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err == nil { - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -1763,17 +1792,33 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa return } zap.L().Info("Executing TTL request: ", zap.String("request", query)) - statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) if err := r.db.Exec(ctx, query); err != nil { zap.L().Error("error while setting ttl", zap.Error(err)) - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return } return } - _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + _, dbErr = r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusSuccess). + Where("id = ?", statusItem.ID.StringValue()). 
+ Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -1784,7 +1829,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, params *model.TTLPa return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } -func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { +func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -1804,7 +1849,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL // check if there is existing things to be done for _, tableName := range tableNames { - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} } @@ -1831,11 +1876,32 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL timestamp = "end" } - _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + ttl := types.TTLSetting{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + TransactionID: uuid, + TableName: tableName, + TTL: int(params.DelDuration), + Status: constants.StatusPending, + ColdStorageTTL: coldStorageDuration, + OrgID: orgID, + } + _, dbErr := r. + sqlDB. + BunDB(). + NewInsert(). + Model(&ttl). + Exec(ctx) if dbErr != nil { - zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr)) + zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) return } + req := fmt.Sprintf(ttlV2, tableName, r.cluster, timestamp, params.DelDuration) if strings.HasSuffix(distributedTableName, r.traceResourceTableV3) { req = fmt.Sprintf(ttlV2Resource, tableName, r.cluster, params.DelDuration) @@ -1851,9 +1917,17 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) if err != nil { zap.L().Error("Error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err == nil { - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). 
+ Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -1863,17 +1937,33 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL } req += " SETTINGS materialize_ttl_after_modify=0;" zap.L().Error(" ExecutingTTL request: ", zap.String("request", req)) - statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) if err := r.db.Exec(ctx, req); err != nil { zap.L().Error("Error in executing set TTL query", zap.Error(err)) - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return } return } - _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + _, dbErr = r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusSuccess). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -1886,10 +1976,10 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. -func (r *ClickHouseReader) SetTTL(ctx context.Context, +func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // Keep only latest 100 transactions/requests - r.deleteTtlTransactions(ctx, 100) + r.deleteTtlTransactions(ctx, orgID, 100) // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -1902,7 +1992,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, switch params.Type { case constants.TraceTTL: if r.useTraceNewSchema { - return r.SetTTLTracesV2(ctx, params) + return r.SetTTLTracesV2(ctx, orgID, params) } tableNames := []string{ @@ -1915,7 +2005,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } for _, tableName := range tableNames { tableName := getLocalTableName(tableName) - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} } @@ -1927,9 +2017,29 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, tableName := getLocalTableName(tableName) // TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. 
Issue: https://github.com/ClickHouse/ClickHouse/issues/22086 go func(tableName string) { - _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + ttl := types.TTLSetting{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + TransactionID: uuid, + TableName: tableName, + TTL: int(params.DelDuration), + Status: constants.StatusPending, + ColdStorageTTL: coldStorageDuration, + OrgID: orgID, + } + _, dbErr := r. + sqlDB. + BunDB(). + NewInsert(). + Model(&ttl). + Exec(ctx) if dbErr != nil { - zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr)) + zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) return } req := fmt.Sprintf( @@ -1942,9 +2052,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) if err != nil { zap.L().Error("Error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err == nil { - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -1954,17 +2072,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } req += " SETTINGS materialize_ttl_after_modify=0;" zap.L().Error("Executing TTL request: ", zap.String("request", req)) - statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) if err := r.db.Exec(context.Background(), req); err != nil { zap.L().Error("Error in executing set TTL query", zap.Error(err)) - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return } return } - _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + _, dbErr = r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusSuccess). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -1984,7 +2118,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, signozMetricDBName + "." 
+ signozTSLocalTableNameV41Week, } for _, tableName := range tableNames { - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} } @@ -1993,9 +2127,29 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } } metricTTL := func(tableName string) { - _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + ttl := types.TTLSetting{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + TransactionID: uuid, + TableName: tableName, + TTL: int(params.DelDuration), + Status: constants.StatusPending, + ColdStorageTTL: coldStorageDuration, + OrgID: orgID, + } + _, dbErr := r. + sqlDB. + BunDB(). + NewInsert(). + Model(&ttl). + Exec(ctx) if dbErr != nil { - zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr)) + zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) return } timeColumn := "timestamp_ms" @@ -2014,9 +2168,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) if err != nil { zap.L().Error("Error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err == nil { - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -2026,17 +2188,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } req += " SETTINGS materialize_ttl_after_modify=0" zap.L().Info("Executing TTL request: ", zap.String("request", req)) - statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) if err := r.db.Exec(ctx, req); err != nil { zap.L().Error("error while setting ttl.", zap.Error(err)) - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return } return } - _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + _, dbErr = r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusSuccess). + Where("id = ?", statusItem.ID.StringValue()). 
+ Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -2047,11 +2225,11 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } case constants.LogsTTL: if r.useLogsNewSchema { - return r.SetTTLLogsV2(ctx, params) + return r.SetTTLLogsV2(ctx, orgID, params) } tableName := r.logsDB + "." + r.logsLocalTable - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} } @@ -2059,7 +2237,27 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} } go func(tableName string) { - _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + ttl := types.TTLSetting{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + TransactionID: uuid, + TableName: tableName, + TTL: int(params.DelDuration), + Status: constants.StatusPending, + ColdStorageTTL: coldStorageDuration, + OrgID: orgID, + } + _, dbErr := r. + sqlDB. + BunDB(). + NewInsert(). + Model(&ttl). + Exec(ctx) if dbErr != nil { zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) return @@ -2075,9 +2273,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) if err != nil { zap.L().Error("error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, tableName) + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) if err == nil { - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -2087,17 +2293,33 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } req += " SETTINGS materialize_ttl_after_modify=0" zap.L().Info("Executing TTL request: ", zap.String("request", req)) - statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) if err := r.db.Exec(ctx, req); err != nil { zap.L().Error("error while setting ttl", zap.Error(err)) - _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + _, dbErr := r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusFailed). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return } return } - _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? 
WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + _, dbErr = r. + sqlDB. + BunDB(). + NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", constants.StatusSuccess). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) if dbErr != nil { zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return @@ -2112,47 +2334,63 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } -func (r *ClickHouseReader) deleteTtlTransactions(_ context.Context, numberOfTransactionsStore int) { - _, err := r.localDB.Exec("DELETE FROM ttl_status WHERE transaction_id NOT IN (SELECT distinct transaction_id FROM ttl_status ORDER BY created_at DESC LIMIT ?)", numberOfTransactionsStore) +func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, orgID string, numberOfTransactionsStore int) { + limitTransactions := []string{} + err := r. + sqlDB. + BunDB(). + NewSelect(). + ColumnExpr("distinct(transaction_id)"). + Model(new(types.TTLSetting)). + Where("org_id = ?", orgID). + OrderExpr("created_at DESC"). + Limit(numberOfTransactionsStore). + Scan(ctx, &limitTransactions) + + if err != nil { + zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err)) + } + + _, err = r. + sqlDB. + BunDB(). + NewDelete(). + Model(new(types.TTLSetting)). + Where("transaction_id NOT IN (?)", bun.In(limitTransactions)). + Exec(ctx) if err != nil { zap.L().Error("Error in processing ttl_status delete sql query", zap.Error(err)) } } // checkTTLStatusItem checks if ttl_status table has an entry for the given table name -func (r *ClickHouseReader) checkTTLStatusItem(_ context.Context, tableName string) (model.TTLStatusItem, *model.ApiError) { - statusItem := []model.TTLStatusItem{} - - query := `SELECT id, status, ttl, cold_storage_ttl FROM ttl_status WHERE table_name = ? ORDER BY created_at DESC` - - zap.L().Info("checkTTLStatusItem query", zap.String("query", query), zap.String("tableName", tableName)) - - stmt, err := r.localDB.Preparex(query) - - if err != nil { - zap.L().Error("Error preparing query for checkTTLStatusItem", zap.Error(err)) - return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - err = stmt.Select(&statusItem, tableName) - - if len(statusItem) == 0 { - return model.TTLStatusItem{}, nil - } - if err != nil { +func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, *model.ApiError) { + zap.L().Info("checkTTLStatusItem query", zap.String("tableName", tableName)) + ttl := new(types.TTLSetting) + err := r. + sqlDB. + BunDB(). + NewSelect(). + Model(ttl). + Where("table_name = ?", tableName). + Where("org_id = ?", orgID). + OrderExpr("created_at DESC"). + Limit(1). 
+ Scan(ctx) + if err != nil && err != sql.ErrNoRows { zap.L().Error("Error in processing sql query", zap.Error(err)) - return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + return ttl, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} } - return statusItem[0], nil + return ttl, nil } // setTTLQueryStatus fetches ttl_status table status from DB -func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray []string) (string, *model.ApiError) { +func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, orgID string, tableNameArray []string) (string, *model.ApiError) { failFlag := false status := constants.StatusSuccess for _, tableName := range tableNameArray { - statusItem, err := r.checkTTLStatusItem(ctx, tableName) - emptyStatusStruct := model.TTLStatusItem{} + statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) + emptyStatusStruct := new(types.TTLSetting) if statusItem == emptyStatusStruct { return "", nil } @@ -2213,7 +2451,7 @@ func getLocalTableNameArray(tableNames []string) []string { } // GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces. -func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { +func (r *ClickHouseReader) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { parseTTL := func(queryResp string) (int, int) { @@ -2303,7 +2541,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable} tableNameArray = getLocalTableNameArray(tableNameArray) - status, err := r.setTTLQueryStatus(ctx, tableNameArray) + status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray) if err != nil { return nil, err } @@ -2311,22 +2549,22 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa if err != nil { return nil, err } - ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0]) + ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0]) if err != nil { return nil, err } ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours - if ttlQuery.ColdStorageTtl != -1 { - ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours + if ttlQuery.ColdStorageTTL != -1 { + ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours } delTTL, moveTTL := parseTTL(dbResp.EngineFull) - return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil + return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil case constants.MetricsTTL: tableNameArray := []string{signozMetricDBName + "." 
+ signozSampleTableName} tableNameArray = getLocalTableNameArray(tableNameArray) - status, err := r.setTTLQueryStatus(ctx, tableNameArray) + status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray) if err != nil { return nil, err } @@ -2334,22 +2572,22 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa if err != nil { return nil, err } - ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0]) + ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0]) if err != nil { return nil, err } ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours - if ttlQuery.ColdStorageTtl != -1 { - ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours + if ttlQuery.ColdStorageTTL != -1 { + ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours } delTTL, moveTTL := parseTTL(dbResp.EngineFull) - return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil + return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil case constants.LogsTTL: - tableNameArray := []string{r.logsDB + "." + r.logsTable} + tableNameArray := []string{r.logsDB + "." + r.logsTableName} tableNameArray = getLocalTableNameArray(tableNameArray) - status, err := r.setTTLQueryStatus(ctx, tableNameArray) + status, err := r.setTTLQueryStatus(ctx, orgID, tableNameArray) if err != nil { return nil, err } @@ -2357,17 +2595,17 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa if err != nil { return nil, err } - ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0]) + ttlQuery, err := r.checkTTLStatusItem(ctx, orgID, tableNameArray[0]) if err != nil { return nil, err } ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours - if ttlQuery.ColdStorageTtl != -1 { - ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours + if ttlQuery.ColdStorageTTL != -1 { + ttlQuery.ColdStorageTTL = ttlQuery.ColdStorageTTL / 3600 // convert to hours } delTTL, moveTTL := parseTTL(dbResp.EngineFull) - return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil + return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTTL, Status: status}, nil default: return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. 
ttl type should be metrics|traces, got %v", diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index cae4367707..91af39f57e 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1870,8 +1870,15 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { return } + ctx := r.Context() + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + RespondError(w, &model.ApiError{Err: errors.New("failed to get org id from context"), Typ: model.ErrorInternal}, nil) + return + } + // Context is not used here as TTL is long duration DB operation - result, apiErr := aH.reader.SetTTL(context.Background(), ttlParams) + result, apiErr := aH.reader.SetTTL(context.Background(), claims.OrgID, ttlParams) if apiErr != nil { if apiErr.Typ == model.ErrorConflict { aH.HandleError(w, apiErr.Err, http.StatusConflict) @@ -1891,7 +1898,14 @@ func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) { return } - result, apiErr := aH.reader.GetTTL(r.Context(), ttlParams) + ctx := r.Context() + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + RespondError(w, &model.ApiError{Err: errors.New("failed to get org id from context"), Typ: model.ErrorInternal}, nil) + return + } + + result, apiErr := aH.reader.GetTTL(r.Context(), claims.OrgID, ttlParams) if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { return } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index ebf5c743c8..ff7ec2eba6 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -117,7 +117,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } reader := clickhouseReader.NewReader( - serverOptions.SigNoz.SQLStore.SQLxDB(), + serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.TelemetryStore, serverOptions.SigNoz.Prometheus, fm, diff --git a/pkg/query-service/dao/sqlite/apdex.go b/pkg/query-service/dao/sqlite/apdex.go index 79c1697739..5cded04086 100644 --- a/pkg/query-service/dao/sqlite/apdex.go +++ b/pkg/query-service/dao/sqlite/apdex.go @@ -5,6 +5,7 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" "github.com/uptrace/bun" ) @@ -48,6 +49,7 @@ func (mds *ModelDaoSqlite) GetApdexSettings(ctx context.Context, orgID string, s func (mds *ModelDaoSqlite) SetApdexSettings(ctx context.Context, orgID string, apdexSettings *types.ApdexSettings) *model.ApiError { // Set the org_id from the parameter since it's required for the foreign key constraint apdexSettings.OrgID = orgID + apdexSettings.Identifiable.ID = valuer.GenerateUUID() _, err := mds.bundb.NewInsert(). Model(apdexSettings). diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 5fa225d7c6..4b7b288b1b 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -22,7 +22,7 @@ type Reader interface { GetServicesList(ctx context.Context) (*[]string, error) GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) - GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) + GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) // GetDisks returns a list of disks configured in the underlying DB. 
It is supported by // clickhouse only. @@ -44,7 +44,7 @@ type Reader interface { GetFlamegraphSpansForTrace(ctx context.Context, traceID string, req *model.GetFlamegraphSpansForTraceParams) (*model.GetFlamegraphSpansForTraceResponse, *model.ApiError) // Setter Interfaces - SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) + SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 404292f39d..de36970c68 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -317,9 +317,9 @@ func createTelemetry() { getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION) - traceTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.TraceTTL}) - metricsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.MetricsTTL}) - logsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.LogsTTL}) + traceTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.TraceTTL}) + metricsTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.MetricsTTL}) + logsTTL, _ := telemetry.reader.GetTTL(ctx, "", &model.GetTTLParams{Type: constants.LogsTTL}) userCount, _ := telemetry.userCountCallback(ctx) diff --git a/pkg/query-service/tests/integration/filter_suggestions_test.go b/pkg/query-service/tests/integration/filter_suggestions_test.go index ba1178ff6e..981a20a1f9 100644 --- a/pkg/query-service/tests/integration/filter_suggestions_test.go +++ b/pkg/query-service/tests/integration/filter_suggestions_test.go @@ -293,7 +293,7 @@ func NewFilterSuggestionsTestBed(t *testing.T) *FilterSuggestionsTestBed { testDB := utils.NewQueryServiceDBForTests(t) fm := featureManager.StartManager() - reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm) + reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm) mockClickhouse.MatchExpectationsInOrder(false) apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{ diff --git a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go index ee0dcbf668..e7bf435293 100644 --- a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go @@ -355,7 +355,7 @@ func NewCloudIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *CloudI } fm := featureManager.StartManager() - reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm) + reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm) mockClickhouse.MatchExpectationsInOrder(false) apiHandler, err := app.NewAPIHandler(app.APIHandlerOpts{ diff --git a/pkg/query-service/tests/integration/signoz_integrations_test.go b/pkg/query-service/tests/integration/signoz_integrations_test.go index e6c1b36097..1413f80147 100644 --- a/pkg/query-service/tests/integration/signoz_integrations_test.go +++ 
b/pkg/query-service/tests/integration/signoz_integrations_test.go @@ -557,7 +557,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration } fm := featureManager.StartManager() - reader, mockClickhouse := NewMockClickhouseReader(t, testDB.SQLxDB(), fm) + reader, mockClickhouse := NewMockClickhouseReader(t, testDB, fm) mockClickhouse.MatchExpectationsInOrder(false) cloudIntegrationsController, err := cloudintegrations.NewController(testDB) diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index 274dec0a31..b245e13d7d 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -23,12 +23,12 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/dao" "github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/model" + "github.com/SigNoz/signoz/pkg/sqlstore" "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/google/uuid" - "github.com/jmoiron/sqlx" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" mockhouse "github.com/srikanthccv/ClickHouse-go-mock" "github.com/stretchr/testify/require" @@ -38,7 +38,7 @@ import ( var jwt = authtypes.NewJWT("secret", 1*time.Hour, 2*time.Hour) func NewMockClickhouseReader( - t *testing.T, testDB *sqlx.DB, featureFlags interfaces.FeatureLookup, + t *testing.T, testDB sqlstore.SQLStore, featureFlags interfaces.FeatureLookup, ) ( *clickhouseReader.ClickHouseReader, mockhouse.ClickConnMockCommon, ) { diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index 1d69e81f5e..7af36f7830 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -67,6 +67,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM sqlmigration.NewUpdatePatFactory(sqlstore), sqlmigration.NewUpdateAlertmanagerFactory(sqlstore), sqlmigration.NewUpdatePreferencesFactory(sqlstore), + sqlmigration.NewUpdateApdexTtlFactory(sqlstore), ) } diff --git a/pkg/sqlmigration/023_update_apdex_ttl.go b/pkg/sqlmigration/023_update_apdex_ttl.go new file mode 100644 index 0000000000..02294d5c3e --- /dev/null +++ b/pkg/sqlmigration/023_update_apdex_ttl.go @@ -0,0 +1,233 @@ +package sqlmigration + +import ( + "context" + "database/sql" + "fmt" + "reflect" + "time" + + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/types" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" +) + +type updateApdexTtl struct { + store sqlstore.SQLStore +} + +type existingApdexSettings struct { + bun.BaseModel `bun:"table:apdex_settings"` + OrgID string `bun:"org_id,pk,type:text" json:"orgId"` + ServiceName string `bun:"service_name,pk,type:text" json:"serviceName"` + Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"` + ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"` +} + +type newApdexSettings struct { + bun.BaseModel `bun:"table:apdex_setting"` + types.Identifiable + OrgID string `bun:"org_id,type:text" json:"orgId"` + ServiceName string `bun:"service_name,type:text" json:"serviceName"` + Threshold float64 `bun:"threshold,type:float,notnull" json:"threshold"` + ExcludeStatusCodes string `bun:"exclude_status_codes,type:text,notnull" 
json:"excludeStatusCodes"` +} + +type existingTTLStatus struct { + bun.BaseModel `bun:"table:ttl_status"` + ID int `bun:"id,pk,autoincrement"` + TransactionID string `bun:"transaction_id,type:text,notnull"` + CreatedAt time.Time `bun:"created_at,type:datetime,notnull"` + UpdatedAt time.Time `bun:"updated_at,type:datetime,notnull"` + TableName string `bun:"table_name,type:text,notnull"` + TTL int `bun:"ttl,notnull,default:0"` + ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"` + Status string `bun:"status,type:text,notnull"` +} + +type newTTLStatus struct { + bun.BaseModel `bun:"table:ttl_setting"` + types.Identifiable + types.TimeAuditable + TransactionID string `bun:"transaction_id,type:text,notnull"` + TableName string `bun:"table_name,type:text,notnull"` + TTL int `bun:"ttl,notnull,default:0"` + ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"` + Status string `bun:"status,type:text,notnull"` + OrgID string `json:"-" bun:"org_id,notnull"` +} + +func NewUpdateApdexTtlFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] { + return factory. + NewProviderFactory( + factory.MustNewName("update_apdex_ttl"), + func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) { + return newUpdateApdexTtl(ctx, ps, c, sqlstore) + }) +} + +func newUpdateApdexTtl(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) { + return &updateApdexTtl{store: store}, nil +} + +func (migration *updateApdexTtl) Register(migrations *migrate.Migrations) error { + if err := migrations. + Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *updateApdexTtl) Up(ctx context.Context, db *bun.DB) error { + tx, err := db. + BeginTx(ctx, nil) + if err != nil { + return err + } + + defer tx.Rollback() + + err = migration. + store. + Dialect(). + RenameTableAndModifyModel(ctx, tx, new(existingApdexSettings), new(newApdexSettings), func(ctx context.Context) error { + existingApdexSettings := make([]*existingApdexSettings, 0) + err = tx. + NewSelect(). + Model(&existingApdexSettings). + Scan(ctx) + if err != nil { + if err != sql.ErrNoRows { + return err + } + } + + if err == nil && len(existingApdexSettings) > 0 { + newSettings := migration. + CopyExistingApdexSettingsToNewApdexSettings(existingApdexSettings) + _, err = tx. + NewInsert(). + Model(&newSettings). + Exec(ctx) + if err != nil { + return err + } + } + + tableName := tx.Dialect().Tables().Get(reflect.TypeOf(new(newApdexSettings))).Name + _, err = tx. + ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s_unique_idx ON %s (service_name, org_id)", tableName, tableName)) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + err = migration. + store. + Dialect(). + RenameTableAndModifyModel(ctx, tx, new(existingTTLStatus), new(newTTLStatus), func(ctx context.Context) error { + existingTTLStatus := make([]*existingTTLStatus, 0) + err = tx. + NewSelect(). + Model(&existingTTLStatus). + Scan(ctx) + if err != nil { + if err != sql.ErrNoRows { + return err + } + } + + if err == nil && len(existingTTLStatus) > 0 { + var orgID string + err := migration. + store. + BunDB(). + NewSelect(). + Model((*types.Organization)(nil)). + Column("id"). + Scan(ctx, &orgID) + if err != nil { + if err != sql.ErrNoRows { + return err + } + } + + if err == nil { + newTTLStatus := migration. 
+ CopyExistingTTLStatusToNewTTLStatus(existingTTLStatus, orgID) + _, err = tx. + NewInsert(). + Model(&newTTLStatus). + Exec(ctx) + if err != nil { + return err + } + } + + } + return nil + }) + if err != nil { + return err + } + + err = tx.Commit() + if err != nil { + return err + } + + return nil +} + +func (migration *updateApdexTtl) Down(context.Context, *bun.DB) error { + return nil +} + +func (migration *updateApdexTtl) CopyExistingApdexSettingsToNewApdexSettings(existingApdexSettings []*existingApdexSettings) []*newApdexSettings { + newSettings := make([]*newApdexSettings, 0) + for _, apdexSetting := range existingApdexSettings { + newSettings = append(newSettings, &newApdexSettings{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + ServiceName: apdexSetting.ServiceName, + Threshold: apdexSetting.Threshold, + ExcludeStatusCodes: apdexSetting.ExcludeStatusCodes, + OrgID: apdexSetting.OrgID, + }) + } + + return newSettings +} + +func (migration *updateApdexTtl) CopyExistingTTLStatusToNewTTLStatus(existingTTLStatus []*existingTTLStatus, orgID string) []*newTTLStatus { + newTTLStatuses := make([]*newTTLStatus, 0) + + for _, ttl := range existingTTLStatus { + newTTLStatuses = append(newTTLStatuses, &newTTLStatus{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: ttl.CreatedAt, + UpdatedAt: ttl.UpdatedAt, + }, + TransactionID: ttl.TransactionID, + TTL: ttl.TTL, + TableName: ttl.TableName, + ColdStorageTTL: ttl.ColdStorageTTL, + Status: ttl.Status, + OrgID: orgID, + }) + } + + return newTTLStatuses +} diff --git a/pkg/types/dashboard.go b/pkg/types/dashboard.go index 686a3c5a56..04503f4781 100644 --- a/pkg/types/dashboard.go +++ b/pkg/types/dashboard.go @@ -94,15 +94,14 @@ type PlannedMaintenance struct { UpdatedBy string `bun:"updated_by,type:text,notnull"` } -type TTLStatus struct { - bun.BaseModel `bun:"table:ttl_status"` - - ID int `bun:"id,pk,autoincrement"` - TransactionID string `bun:"transaction_id,type:text,notnull"` - CreatedAt time.Time `bun:"created_at,type:datetime,notnull"` - UpdatedAt time.Time `bun:"updated_at,type:datetime,notnull"` - TableName string `bun:"table_name,type:text,notnull"` - TTL int `bun:"ttl,notnull,default:0"` - ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"` - Status string `bun:"status,type:text,notnull"` +type TTLSetting struct { + bun.BaseModel `bun:"table:ttl_setting"` + Identifiable + TimeAuditable + TransactionID string `bun:"transaction_id,type:text,notnull"` + TableName string `bun:"table_name,type:text,notnull"` + TTL int `bun:"ttl,notnull,default:0"` + ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"` + Status string `bun:"status,type:text,notnull"` + OrgID string `json:"-" bun:"org_id,notnull"` } diff --git a/pkg/types/organization.go b/pkg/types/organization.go index 3bbf92f331..a31a7b13c6 100644 --- a/pkg/types/organization.go +++ b/pkg/types/organization.go @@ -7,7 +7,6 @@ import ( // TODO: check constraints are not working type Organization struct { bun.BaseModel `bun:"table:organizations"` - TimeAuditable ID string `bun:"id,pk,type:text" json:"id"` Name string `bun:"name,type:text,notnull" json:"name"` @@ -16,8 +15,10 @@ type Organization struct { } type ApdexSettings struct { - OrgID string `bun:"org_id,pk,type:text" json:"orgId"` - ServiceName string `bun:"service_name,pk,type:text" json:"serviceName"` + bun.BaseModel `bun:"table:apdex_setting"` + Identifiable + OrgID string `bun:"org_id,type:text" json:"orgId"` + 
ServiceName        string  `bun:"service_name,type:text" json:"serviceName"`
 	Threshold          float64 `bun:"threshold,type:float,notnull" json:"threshold"`
 	ExcludeStatusCodes string  `bun:"exclude_status_codes,type:text,notnull" json:"excludeStatusCodes"`
 }
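
Taken together, the migration and the model change above mean every apdex row now carries its own UUID primary key and an explicit org_id, with uniqueness enforced by the (service_name, org_id) index created in migration 023. A short usage sketch, assuming a bun.IDB handle obtained from sqlstore.SQLStore.BunDB(); saveApdexSetting is an illustrative helper, not a function from this patch:

package example

import (
	"context"

	"github.com/SigNoz/signoz/pkg/types"
	"github.com/SigNoz/signoz/pkg/valuer"
	"github.com/uptrace/bun"
)

// saveApdexSetting inserts one row into the renamed apdex_setting table.
// Mirroring the patched dao/sqlite/apdex.go, the caller stamps the org and
// generates the UUID before handing the model to bun.
func saveApdexSetting(ctx context.Context, db bun.IDB, orgID string, setting *types.ApdexSettings) error {
	setting.OrgID = orgID
	setting.Identifiable.ID = valuer.GenerateUUID()

	_, err := db.NewInsert().
		Model(setting).
		Exec(ctx)
	return err
}

Because the primary key is a UUID rather than (org_id, service_name), a repeated insert for the same service and org is rejected by the unique index the migration adds, instead of silently overwriting or duplicating the row.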