From 61ebd3aded491a984ad423edfb3de2bc85b7d9df Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Thu, 4 Aug 2022 14:28:10 +0530 Subject: [PATCH] logs ttl support added in ttl api --- .../app/clickhouseReader/reader.go | 96 ++++++++++++++++++- pkg/query-service/app/parser.go | 8 +- pkg/query-service/constants/constants.go | 1 + pkg/query-service/model/response.go | 4 + 4 files changed, 104 insertions(+), 5 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index b5e6948197..49fe724df5 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1888,7 +1888,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query return &GetFilteredSpansAggregatesResponse, nil } -// SetTTL sets the TTL for traces or metrics tables. +// SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. func (r *ClickHouseReader) SetTTL(ctx context.Context, @@ -2017,6 +2017,59 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return } }(tableName) + case constants.LogsTTL: + tableName = r.logsDB + "." + r.logsTable + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} + } + go func(tableName string) { + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error())) + return + } + req = fmt.Sprintf( + "ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+ + "INTERVAL %v SECOND DELETE", tableName, params.DelDuration) + if len(params.ColdStorageVolume) > 0 { + req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+ + " + INTERVAL %v SECOND TO VOLUME '%s'", + params.ToColdStorageDuration, params.ColdStorageVolume) + } + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.S().Error(fmt.Errorf("error in setting cold storage: %s", err.Err.Error())) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + } + return + } + zap.S().Debugf("Executing TTL request: %s\n", req) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + if err := r.db.Exec(ctx, req); err != nil { + zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + return + } + _, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + }(tableName) default: return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be , got %v", @@ -2180,6 +2233,24 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa } } + getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) { + var dbResp []model.DBResponseTTL + + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsTable, r.logsDB) + + err := r.db.Select(ctx, &dbResp, query) + + if err != nil { + zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)} + } + if len(dbResp) == 0 { + return nil, nil + } else { + return &dbResp[0], nil + } + } + switch ttlParams.Type { case constants.TraceTTL: tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable} @@ -2224,6 +2295,29 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa delTTL, moveTTL := parseTTL(dbResp.EngineFull) return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil + + case constants.LogsTTL: + tableNameArray := []string{r.logsDB + "." + r.logsTable} + status, err := r.setTTLQueryStatus(ctx, tableNameArray) + if err != nil { + return nil, err + } + dbResp, err := getLogsTTL() + if err != nil { + return nil, err + } + ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0]) + if err != nil { + return nil, err + } + ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours + if ttlQuery.ColdStorageTtl != -1 { + ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours + } + + delTTL, moveTTL := parseTTL(dbResp.EngineFull) + return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil + default: return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v", ttlParams.Type)} diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index e81b986a3d..ddd0ebd42d 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -467,8 +467,8 @@ func parseCountErrorsRequest(r *http.Request) (*model.CountErrorsParams, error) } params := &model.CountErrorsParams{ - Start: startTime, - End: endTime, + Start: startTime, + End: endTime, } return params, nil @@ -590,7 +590,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) { } // Validate the type parameter - if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { + if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL { return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL) } @@ -629,7 +629,7 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) { return nil, fmt.Errorf("type param cannot be empty from the query") } else { // Validate the type parameter - if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { + if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL { return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL) } } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index a730c1ee2f..2e01c976cb 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -26,6 +26,7 @@ func IsTelemetryEnabled() bool { const TraceTTL = "traces" const MetricsTTL = "metrics" +const LogsTTL = "logs" func GetAlertManagerApiPrefix() string { if os.Getenv("ALERTMANAGER_API_PREFIX") != "" { diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index b6d4a056f9..feca7de5aa 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -274,10 +274,14 @@ type GetTTLResponseItem struct { MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"` TracesTime int `json:"traces_ttl_duration_hrs,omitempty"` TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"` + LogsTime int `json:"logs_ttl_duration_hrs,omitempty"` + LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"` ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"` ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"` ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"` ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"` + ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"` + ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"` Status string `json:"status"` }