diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 59091e05fe..a55977c35f 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/google/uuid" "github.com/oklog/oklog/pkg/group" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -2266,48 +2267,135 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query return &GetFilteredSpansAggregatesResponse, nil } +// SetTTL sets the TTL for traces or metrics tables. +// This is an async API which creates goroutines to set TTL. +// Status of TTL update is tracked with ttl_status table in sqlite db. func (r *ClickHouseReader) SetTTL(ctx context.Context, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { - + // Keep only latest 100 transactions/requests + r.deleteTtlTransactions(ctx, 100) var req, tableName string + // uuid is used as transaction id + uuidWithHyphen := uuid.New() + uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) + + coldStorageDuration := -1 + if len(params.ColdStorageVolume) > 0 { + coldStorageDuration = int(params.ToColdStorageDuration) + } + switch params.Type { case constants.TraceTTL: tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable} - for _, tableName := range tableNameArray { - req = fmt.Sprintf( - "ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE", - tableName, params.DelDuration) - if len(params.ColdStorageVolume) > 0 { - req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'", - params.ToColdStorageDuration, params.ColdStorageVolume) - } - err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume) + for _, tableName = range tableNameArray { + statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { - return nil, err + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} } - - zap.S().Debugf("Executing TTL request: %s\n", req) - if err := r.db.Exec(ctx, req); err != nil { - zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. Err=%v", err)} + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} } } + for _, tableName := range tableNameArray { + // TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086 + go func(tableName string) { + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error())) + return + } + req = fmt.Sprintf( + "ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE", + tableName, params.DelDuration) + if len(params.ColdStorageVolume) > 0 { + req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'", + params.ToColdStorageDuration, params.ColdStorageVolume) + } + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.S().Error(fmt.Errorf("Error in setting cold storage: %s", err.Err.Error())) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + } + return + } + zap.S().Debugf("Executing TTL request: %s\n", req) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + if err := r.db.Exec(context.Background(), req); err != nil { + zap.S().Error(fmt.Errorf("Error in executing set TTL query: %s", err.Error())) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + return + } + _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + }(tableName) + } case constants.MetricsTTL: tableName = signozMetricDBName + "." + signozSampleName - req = fmt.Sprintf( - "ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+ - "INTERVAL %v SECOND DELETE", tableName, params.DelDuration) - if len(params.ColdStorageVolume) > 0 { - req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+ - " + INTERVAL %v SECOND TO VOLUME '%s'", - params.ToColdStorageDuration, params.ColdStorageVolume) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} } - zap.S().Debugf("Executing TTL request: %s\n", req) - if err := r.db.Exec(ctx, req); err != nil { - zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. Err=%v", err)} + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} } + go func(tableName string) { + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error())) + return + } + req = fmt.Sprintf( + "ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+ + "INTERVAL %v SECOND DELETE", tableName, params.DelDuration) + if len(params.ColdStorageVolume) > 0 { + req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+ + " + INTERVAL %v SECOND TO VOLUME '%s'", + params.ToColdStorageDuration, params.ColdStorageVolume) + } + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.S().Error(fmt.Errorf("Error in setting cold storage: %s", err.Err.Error())) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + } + return + } + zap.S().Debugf("Executing TTL request: %s\n", req) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + if err := r.db.Exec(ctx, req); err != nil { + zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + return + } + _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + }(tableName) default: return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be , got %v", @@ -2317,6 +2405,61 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } +func (r *ClickHouseReader) deleteTtlTransactions(ctx context.Context, numberOfTransactionsStore int) { + _, err := r.localDB.Exec("DELETE FROM ttl_status WHERE transaction_id NOT IN (SELECT distinct transaction_id FROM ttl_status ORDER BY created_at DESC LIMIT ?)", numberOfTransactionsStore) + if err != nil { + zap.S().Debug("Error in processing ttl_status delete sql query: ", err) + } +} + +// checkTTLStatusItem checks if ttl_status table has an entry for the given table name +func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, tableName string) (model.TTLStatusItem, *model.ApiError) { + statusItem := []model.TTLStatusItem{} + + query := fmt.Sprintf("SELECT id, status, ttl, cold_storage_ttl FROM ttl_status WHERE table_name = '%s' ORDER BY created_at DESC", tableName) + + err := r.localDB.Select(&statusItem, query) + + zap.S().Info(query) + + if len(statusItem) == 0 { + return model.TTLStatusItem{}, nil + } + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} + } + return statusItem[0], nil +} + +// setTTLQueryStatus fetches ttl_status table status from DB +func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray []string) (string, *model.ApiError) { + failFlag := false + status := constants.StatusSuccess + for _, tableName := range tableNameArray { + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + emptyStatusStruct := model.TTLStatusItem{} + if statusItem == emptyStatusStruct { + return "", nil + } + if err != nil { + return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 { + status = constants.StatusPending + return status, nil + } + if statusItem.Status == constants.StatusFailed { + failFlag = true + } + } + if failFlag { + status = constants.StatusFailed + } + + return status, nil +} + func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string, coldStorageVolume string) *model.ApiError { // Set the storage policy for the required table. If it is already set, then setting it again @@ -2333,40 +2476,6 @@ func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string, return nil } -func (r *ClickHouseReader) RemoveTTL(ctx context.Context, - params *model.RemoveTTLParams) (*model.RemoveTTLResponseItem, *model.ApiError) { - - var reqs []string - templateQuery := `ALTER TABLE %v REMOVE TTL` - tracesTables := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable} - metricsTables := []string{signozMetricDBName + "." + signozSampleName} - - switch params.Type { - case constants.TraceTTL: - for _, tableName := range tracesTables { - reqs = append(reqs, fmt.Sprintf(templateQuery, tableName)) - } - case constants.MetricsTTL: - for _, tableName := range metricsTables { - reqs = append(reqs, fmt.Sprintf(templateQuery, tableName)) - } - default: - for _, tableName := range append(append([]string{}, tracesTables...), metricsTables...) { - reqs = append(reqs, fmt.Sprintf(templateQuery, tableName)) - } - } - - zap.S().Debugf("Executing remove TTL requests: %s\n", reqs) - for _, req := range reqs { - if err := r.db.Exec(ctx, req); err != nil { - zap.S().Error(fmt.Errorf("error while removing ttl. Err=%v", err)) - return nil, &model.ApiError{Typ: model.ErrorExec, - Err: fmt.Errorf("error while removing ttl. Err=%v", err)} - } - } - return &model.RemoveTTLResponseItem{Message: "ttl has been successfully removed"}, nil -} - // GetDisks returns a list of disks {name, type} configured in clickhouse DB. func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) { diskItems := []model.DiskItem{} @@ -2382,6 +2491,7 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo return &diskItems, nil } +// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces. func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { parseTTL := func(queryResp string) (int, int) { @@ -2451,42 +2561,53 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa switch ttlParams.Type { case constants.TraceTTL: + tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable} + status, err := r.setTTLQueryStatus(ctx, tableNameArray) + if err != nil { + return nil, err + } dbResp, err := getTracesTTL() if err != nil { return nil, err } + ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0]) + if err != nil { + return nil, err + } + ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours + if ttlQuery.ColdStorageTtl != -1 { + ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours + } delTTL, moveTTL := parseTTL(dbResp.EngineFull) - return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL}, nil + return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, ExpectedTracesTime: ttlQuery.TTL, ExpectedTracesMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil case constants.MetricsTTL: + tableNameArray := []string{signozMetricDBName + "." + signozSampleName} + status, err := r.setTTLQueryStatus(ctx, tableNameArray) + if err != nil { + return nil, err + } dbResp, err := getMetricsTTL() if err != nil { return nil, err } + ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0]) + if err != nil { + return nil, err + } + ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours + if ttlQuery.ColdStorageTtl != -1 { + ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours + } delTTL, moveTTL := parseTTL(dbResp.EngineFull) - return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL}, nil - - } - db1, err := getTracesTTL() - if err != nil { - return nil, err + return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil + default: + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v", + ttlParams.Type)} } - db2, err := getMetricsTTL() - if err != nil { - return nil, err - } - tracesDelTTL, tracesMoveTTL := parseTTL(db1.EngineFull) - metricsDelTTL, metricsMoveTTL := parseTTL(db2.EngineFull) - - return &model.GetTTLResponseItem{ - TracesTime: tracesDelTTL, - TracesMoveTime: tracesMoveTTL, - MetricsTime: metricsDelTTL, - MetricsMoveTime: metricsMoveTTL, - }, nil } func (r *ClickHouseReader) GetErrors(ctx context.Context, queryParams *model.GetErrorsParams) (*[]model.Error, *model.ApiError) { diff --git a/pkg/query-service/app/dashboards/model.go b/pkg/query-service/app/dashboards/model.go index 9e1b3958d3..1b74857d41 100644 --- a/pkg/query-service/app/dashboards/model.go +++ b/pkg/query-service/app/dashboards/model.go @@ -66,6 +66,22 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) { return nil, fmt.Errorf("Error in creating notification_channles table: %s", err.Error()) } + table_schema = `CREATE TABLE IF NOT EXISTS ttl_status ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transaction_id TEXT NOT NULL, + created_at datetime NOT NULL, + updated_at datetime NOT NULL, + table_name TEXT NOT NULL, + ttl INTEGER DEFAULT 0, + cold_storage_ttl INTEGER DEFAULT 0, + status TEXT NOT NULL + );` + + _, err = db.Exec(table_schema) + if err != nil { + return nil, fmt.Errorf("Error in creating ttl_status table: %s", err.Error()) + } + return db, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6148ce8531..cb7779c29e 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -310,7 +310,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/serviceMapDependencies", ViewAccess(aH.serviceMapDependencies)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", AdminAccess(aH.setTTL)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", ViewAccess(aH.getTTL)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/settings/ttl", AdminAccess(aH.removeTTL)).Methods(http.MethodDelete) router.HandleFunc("/api/v1/version", OpenAccess(aH.getVersion)).Methods(http.MethodGet) @@ -1139,9 +1138,14 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { return } - // Context is not used here as TTL is long duration operation which needs to converted to async + // Context is not used here as TTL is long duration DB operation result, apiErr := (*aH.reader).SetTTL(context.Background(), ttlParams) - if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + if apiErr != nil { + if apiErr.Typ == model.ErrorConflict { + aH.handleError(w, apiErr.Err, http.StatusConflict) + } else { + aH.handleError(w, apiErr.Err, http.StatusInternalServerError) + } return } @@ -1163,47 +1167,6 @@ func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, result) } -func (aH *APIHandler) removeTTL(w http.ResponseWriter, r *http.Request) { - ttlParams, err := parseRemoveTTL(r) - if aH.handleError(w, err, http.StatusBadRequest) { - return - } - - existingTTL, apiErr := (*aH.reader).GetTTL(context.Background(), &model.GetTTLParams{GetAllTTL: true}) - if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - if ttlParams.Type == constants.TraceTTL && existingTTL.TracesTime == -1 && - aH.handleError(w, fmt.Errorf("traces doesn't have any TTL set, cannot remove"), http.StatusBadRequest) { - return - } - - if ttlParams.Type == constants.MetricsTTL && existingTTL.MetricsTime == -1 && - aH.handleError(w, fmt.Errorf("metrics doesn't have any TTL set, cannot remove"), http.StatusBadRequest) { - return - } - - if ttlParams.RemoveAllTTL { - if existingTTL.TracesTime == -1 && existingTTL.MetricsTime != -1 { - ttlParams.Type = constants.MetricsTTL - ttlParams.RemoveAllTTL = false - } else if existingTTL.TracesTime != -1 && existingTTL.MetricsTime == -1 { - ttlParams.Type = constants.TraceTTL - ttlParams.RemoveAllTTL = false - } else if aH.handleError(w, fmt.Errorf("no TTL set, cannot remove"), http.StatusBadRequest) { - return - } - } - - result, apiErr := (*aH.reader).RemoveTTL(context.Background(), ttlParams) - if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.writeJSON(w, r, result) -} - func (aH *APIHandler) getDisks(w http.ResponseWriter, r *http.Request) { result, apiErr := (*aH.reader).GetDisks(context.Background()) if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go index ec372a8e41..422f57ba41 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/app/interface.go @@ -50,7 +50,6 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) - RemoveTTL(ctx context.Context, ttlParams *model.RemoveTTLParams) (*model.RemoveTTLResponseItem, *model.ApiError) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index c40178851d..2dd6e44c52 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -540,7 +540,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) { // Validate the type parameter if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { - return nil, fmt.Errorf("type param should be , got %v", typeTTL) + return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL) } // Validate the TTL duration. @@ -573,35 +573,17 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) { func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) { typeTTL := r.URL.Query().Get("type") - getAllTTL := false if len(typeTTL) == 0 { - getAllTTL = true + return nil, fmt.Errorf("type param cannot be empty from the query") } else { // Validate the type parameter if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { - return nil, fmt.Errorf("type param should be , got %v", typeTTL) + return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL) } } - return &model.GetTTLParams{Type: typeTTL, GetAllTTL: getAllTTL}, nil -} - -func parseRemoveTTL(r *http.Request) (*model.RemoveTTLParams, error) { - - typeTTL := r.URL.Query().Get("type") - removeAllTTL := false - - if len(typeTTL) == 0 { - removeAllTTL = true - } else { - // Validate the type parameter - if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { - return nil, fmt.Errorf("type param should be , got %v", typeTTL) - } - } - - return &model.RemoveTTLParams{Type: typeTTL, RemoveAllTTL: removeAllTTL}, nil + return &model.GetTTLParams{Type: typeTTL}, nil } func parseUserRequest(r *http.Request) (*model.User, error) { diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 9da032e9ed..f18737c434 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -37,26 +37,29 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db") const ( - ServiceName = "serviceName" - HttpRoute = "httpRoute" - HttpCode = "httpCode" - HttpHost = "httpHost" - HttpUrl = "httpUrl" - HttpMethod = "httpMethod" - Component = "component" - OperationDB = "name" - OperationRequest = "operation" - Status = "status" - Duration = "duration" - DBName = "dbName" - DBOperation = "dbOperation" - DBSystem = "dbSystem" - MsgSystem = "msgSystem" - MsgOperation = "msgOperation" - Timestamp = "timestamp" - Descending = "descending" - Ascending = "ascending" - ContextTimeout = 60 // seconds + ServiceName = "serviceName" + HttpRoute = "httpRoute" + HttpCode = "httpCode" + HttpHost = "httpHost" + HttpUrl = "httpUrl" + HttpMethod = "httpMethod" + Component = "component" + OperationDB = "name" + OperationRequest = "operation" + Status = "status" + Duration = "duration" + DBName = "dbName" + DBOperation = "dbOperation" + DBSystem = "dbSystem" + MsgSystem = "msgSystem" + MsgOperation = "msgOperation" + Timestamp = "timestamp" + Descending = "descending" + Ascending = "ascending" + ContextTimeout = 60 // seconds + StatusPending = "pending" + StatusFailed = "failed" + StatusSuccess = "success" ) func GetOrDefaultEnv(key string, fallback string) string { diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 4ab9f40770..9cc1ce235c 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -193,7 +193,6 @@ type TTLParams struct { type GetTTLParams struct { Type string - GetAllTTL bool } type GetErrorsParams struct { @@ -206,8 +205,3 @@ type GetErrorParams struct { ErrorID string ServiceName string } - -type RemoveTTLParams struct { - Type string - RemoveAllTTL bool -} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 1e4c739abf..358fe6e979 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -29,6 +29,7 @@ const ( ErrorNotImplemented ErrorType = "not_implemented" ErrorUnauthorized ErrorType = "unauthorized" ErrorForbidden ErrorType = "forbidden" + ErrorConflict ErrorType = "conflict" ) type QueryDataV2 struct { @@ -48,6 +49,16 @@ type RuleResponseItem struct { Data string `json:"data" db:"data"` } +type TTLStatusItem struct { + Id int `json:"id" db:"id"` + UpdatedAt time.Time `json:"updated_at" db:"updated_at"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + TableName string `json:"table_name" db:"table_name"` + TTL int `json:"ttl" db:"ttl"` + Status string `json:"status" db:"status"` + ColdStorageTtl int `json:"cold_storage_ttl" db:"cold_storage_ttl"` +} + type ChannelItem struct { Id int `json:"id" db:"id"` CreatedAt time.Time `json:"created_at" db:"created_at"` @@ -246,10 +257,6 @@ type SetTTLResponseItem struct { Message string `json:"message"` } -type RemoveTTLResponseItem struct { - Message string `json:"message"` -} - type DiskItem struct { Name string `json:"name,omitempty" ch:"name"` Type string `json:"type,omitempty" ch:"type"` @@ -260,10 +267,15 @@ type DBResponseTTL struct { } type GetTTLResponseItem struct { - MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"` - MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"` - TracesTime int `json:"traces_ttl_duration_hrs,omitempty"` - TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"` + MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"` + MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"` + TracesTime int `json:"traces_ttl_duration_hrs,omitempty"` + TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"` + ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"` + ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"` + ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"` + ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"` + Status string `json:"status"` } type DBResponseServiceName struct { diff --git a/pkg/query-service/tests/auth_test.go b/pkg/query-service/tests/auth_test.go index 8adde42f75..91e298555c 100644 --- a/pkg/query-service/tests/auth_test.go +++ b/pkg/query-service/tests/auth_test.go @@ -99,7 +99,7 @@ func TestAuthInviteAPI(t *testing.T) { func TestAuthRegisterAPI(t *testing.T) { email := "alice@signoz.io" - resp, err := register(email, "password", "") + resp, err := register(email, "Password@123", "") require.NoError(t, err) require.Contains(t, resp, "user registered successfully") @@ -108,7 +108,7 @@ func TestAuthRegisterAPI(t *testing.T) { func TestAuthLoginAPI(t *testing.T) { t.Skip() email := "abc-login@signoz.io" - password := "password123" + password := "Password@123" inv := invite(t, email) resp, err := register(email, password, inv.InviteToken) diff --git a/pkg/query-service/tests/cold_storage_test.go b/pkg/query-service/tests/cold_storage_test.go index 8159805a56..a72e48e770 100644 --- a/pkg/query-service/tests/cold_storage_test.go +++ b/pkg/query-service/tests/cold_storage_test.go @@ -20,12 +20,16 @@ var ( client http.Client ) -func setTTL(table, coldStorage, toColdTTL, deleteTTL string) ([]byte, error) { +func setTTL(table, coldStorage, toColdTTL, deleteTTL string, jwtToken string) ([]byte, error) { params := fmt.Sprintf("type=%s&duration=%s", table, deleteTTL) if len(toColdTTL) > 0 { params += fmt.Sprintf("&coldStorage=%s&toColdDuration=%s", coldStorage, toColdTTL) } - resp, err := client.Post(endpoint+"/api/v1/settings/ttl?"+params, "", nil) + var bearer = "Bearer " + jwtToken + req, err := http.NewRequest("POST", endpoint+"/api/v1/settings/ttl?"+params, nil) + req.Header.Add("Authorization", bearer) + + resp, err := client.Do(req) if err != nil { return nil, err } @@ -40,7 +44,18 @@ func setTTL(table, coldStorage, toColdTTL, deleteTTL string) ([]byte, error) { } func TestListDisks(t *testing.T) { - resp, err := client.Get(endpoint + "/api/v1/disks") + t.Skip() + email := "alice@signoz.io" + password := "Password@123" + + loginResp, err := login(email, password, "") + require.NoError(t, err) + + var bearer = "Bearer " + loginResp.AccessJwt + req, err := http.NewRequest("POST", endpoint+"/api/v1/disks", nil) + req.Header.Add("Authorization", bearer) + + resp, err := client.Do(req) require.NoError(t, err) defer resp.Body.Close() @@ -50,6 +65,11 @@ func TestListDisks(t *testing.T) { } func TestSetTTL(t *testing.T) { + email := "alice@signoz.io" + password := "Password@123" + + loginResp, err := login(email, password, "") + require.NoError(t, err) testCases := []struct { caseNo int @@ -60,7 +80,7 @@ func TestSetTTL(t *testing.T) { expected string }{ { - 1, "s3", "traces", "100s", "60s", + 1, "s3", "traces", "100h", "60h", "Delete TTL should be greater than cold storage move TTL.", }, { @@ -72,21 +92,17 @@ func TestSetTTL(t *testing.T) { "Not a valid TTL duration 100", }, { - 4, "s3", "traces", "", "60s", + 4, "s3", "metrics", "1h", "2h", "move ttl has been successfully set up", }, { - 5, "s3", "traces", "10s", "600s", + 5, "s3", "traces", "10s", "6h", "move ttl has been successfully set up", }, - { - 6, "s4", "traces", "10s", "600s", - "No such volume `s4` in storage policy `tiered`", - }, } for _, tc := range testCases { - r, err := setTTL(tc.table, tc.coldStorage, tc.coldTTL, tc.deleteTTL) + r, err := setTTL(tc.table, tc.coldStorage, tc.coldTTL, tc.deleteTTL, loginResp.AccessJwt) require.NoErrorf(t, err, "Failed case: %d", tc.caseNo) require.Containsf(t, string(r), tc.expected, "Failed case: %d", tc.caseNo) } @@ -104,13 +120,17 @@ func TestSetTTL(t *testing.T) { fmt.Printf("=== Found %d objects in Minio\n", count) } -func getTTL(t *testing.T, table string) *model.GetTTLResponseItem { - req := endpoint + fmt.Sprintf("/api/v1/settings/ttl?type=%s", table) +func getTTL(t *testing.T, table string, jwtToken string) *model.GetTTLResponseItem { + url := endpoint + fmt.Sprintf("/api/v1/settings/ttl?type=%s", table) if len(table) == 0 { - req = endpoint + "/api/v1/settings/ttl" + url = endpoint + "/api/v1/settings/ttl" } - resp, err := client.Get(req) + var bearer = "Bearer " + jwtToken + req, err := http.NewRequest("GET", url, nil) + req.Header.Add("Authorization", bearer) + resp, err := client.Do(req) + require.NoError(t, err) defer resp.Body.Close() @@ -123,55 +143,69 @@ func getTTL(t *testing.T, table string) *model.GetTTLResponseItem { } func TestGetTTL(t *testing.T) { - r, err := setTTL("traces", "s3", "3600s", "7200s") + email := "alice@signoz.io" + password := "Password@123" + + loginResp, err := login(email, password, "") + require.NoError(t, err) + + resp := getTTL(t, "traces", loginResp.AccessJwt) + for resp.Status == "pending" { + time.Sleep(time.Second) + } + require.Equal(t, "success", resp.Status) + + r, err := setTTL("traces", "s3", "1h", "2h", loginResp.AccessJwt) require.NoError(t, err) require.Contains(t, string(r), "successfully set up") - resp := getTTL(t, "traces") + resp = getTTL(t, "traces", loginResp.AccessJwt) + for resp.Status == "pending" { + time.Sleep(time.Second) + resp = getTTL(t, "traces", loginResp.AccessJwt) + require.Equal(t, 1, resp.ExpectedTracesMoveTime) + require.Equal(t, 2, resp.ExpectedTracesTime) + } + resp = getTTL(t, "traces", loginResp.AccessJwt) + require.Equal(t, "success", resp.Status) require.Equal(t, 1, resp.TracesMoveTime) require.Equal(t, 2, resp.TracesTime) - r, err = setTTL("metrics", "s3", "3600s", "7200s") + resp = getTTL(t, "metrics", loginResp.AccessJwt) + for resp.Status == "pending" { + time.Sleep(time.Second) + } + require.Equal(t, "success", resp.Status) + + r, err = setTTL("traces", "s3", "10h", "20h", loginResp.AccessJwt) require.NoError(t, err) require.Contains(t, string(r), "successfully set up") - resp = getTTL(t, "metrics") + resp = getTTL(t, "traces", loginResp.AccessJwt) + for resp.Status == "pending" { + time.Sleep(time.Second) + resp = getTTL(t, "traces", loginResp.AccessJwt) + } + require.Equal(t, "success", resp.Status) + require.Equal(t, 10, resp.TracesMoveTime) + require.Equal(t, 20, resp.TracesTime) + + resp = getTTL(t, "metrics", loginResp.AccessJwt) + for resp.Status != "success" && resp.Status != "failed" { + time.Sleep(time.Second) + resp = getTTL(t, "metrics", loginResp.AccessJwt) + } + require.Equal(t, "success", resp.Status) require.Equal(t, 1, resp.MetricsMoveTime) require.Equal(t, 2, resp.MetricsTime) - r, err = setTTL("traces", "s3", "36000s", "72000s") + r, err = setTTL("metrics", "s3", "0s", "0s", loginResp.AccessJwt) require.NoError(t, err) - require.Contains(t, string(r), "successfully set up") + require.Contains(t, string(r), "Not a valid TTL duration 0s") - resp = getTTL(t, "") - require.Equal(t, 10, resp.TracesMoveTime) - require.Equal(t, 20, resp.TracesTime) - require.Equal(t, 1, resp.MetricsMoveTime) - require.Equal(t, 2, resp.MetricsTime) - - r, err = setTTL("metrics", "s3", "15h", "50h") + r, err = setTTL("traces", "s3", "0s", "0s", loginResp.AccessJwt) require.NoError(t, err) - require.Contains(t, string(r), "successfully set up") - - resp = getTTL(t, "") - require.Equal(t, 10, resp.TracesMoveTime) - require.Equal(t, 20, resp.TracesTime) - require.Equal(t, 15, resp.MetricsMoveTime) - require.Equal(t, 50, resp.MetricsTime) - - r, err = setTTL("metrics", "s3", "0s", "0s") - require.NoError(t, err) - require.Contains(t, string(r), "successfully set up") - - r, err = setTTL("traces", "s3", "0s", "0s") - require.NoError(t, err) - require.Contains(t, string(r), "successfully set up") - - resp = getTTL(t, "") - require.Equal(t, 0, resp.TracesMoveTime) - require.Equal(t, 0, resp.TracesTime) - require.Equal(t, 0, resp.MetricsMoveTime) - require.Equal(t, 0, resp.MetricsTime) + require.Contains(t, string(r), "Not a valid TTL duration 0s") } func TestMain(m *testing.M) {