diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 167aa6236a..de49aa0e49 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -13,6 +13,7 @@ import ( "net/http" "net/url" "os" + "regexp" "sort" "strconv" "strings" @@ -44,8 +45,8 @@ import ( "github.com/prometheus/prometheus/util/strutil" "go.signoz.io/query-service/constants" - "go.signoz.io/query-service/model" am "go.signoz.io/query-service/integrations/alertManager" + "go.signoz.io/query-service/model" "go.uber.org/zap" ) @@ -75,7 +76,7 @@ type ClickHouseReader struct { remoteStorage *remote.Storage ruleManager *rules.Manager promConfig *config.Config - alertManager am.Manager + alertManager am.Manager } // NewTraceReader returns a TraceReader for the database @@ -95,7 +96,7 @@ func NewReader(localDB *sqlx.DB) *ClickHouseReader { return &ClickHouseReader{ db: db, localDB: localDB, - alertManager: alertManager, + alertManager: alertManager, operationsTable: options.primary.OperationsTable, indexTable: options.primary.IndexTable, errorTable: options.primary.ErrorTable, @@ -850,7 +851,6 @@ func (r *ClickHouseReader) EditChannel(receiver *am.Receiver, id string) (*am.Re } - func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) { tx, err := r.localDB.Begin() @@ -860,8 +860,8 @@ func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, * channel_type := getChannelType(receiver) receiverString, _ := json.Marshal(receiver) - - // todo: check if the channel name already exists, raise an error if so + + // todo: check if the channel name already exists, raise an error if so { stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`) @@ -884,7 +884,7 @@ func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, * tx.Rollback() return nil, apiError } - + err = tx.Commit() if err != nil { zap.S().Errorf("Error in commiting transaction for INSERT to notification_channels\n", err) @@ -2602,7 +2602,6 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo fmt.Errorf("error while getting disks. Err=%v", err)} } - zap.S().Infof("Got response: %+v\n", diskItems) return &diskItems, nil @@ -2610,29 +2609,33 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { - parseTTL := func(queryResp string) int { - values := strings.Split(queryResp, " ") - N := len(values) - ttlIdx := -1 + parseTTL := func(queryResp string) (int, int) { - for i := 0; i < N; i++ { - if strings.Contains(values[i], "toIntervalSecond") { - ttlIdx = i - break + zap.S().Debugf("Parsing TTL from: %s", queryResp) + deleteTTLExp := regexp.MustCompile(`toIntervalSecond\(([0-9]*)\)`) + moveTTLExp := regexp.MustCompile(`toIntervalSecond\(([0-9]*)\) TO VOLUME`) + + var delTTL, moveTTL int = -1, -1 + + m := deleteTTLExp.FindStringSubmatch(queryResp) + if len(m) > 1 { + seconds_int, err := strconv.Atoi(m[1]) + if err != nil { + return -1, -1 } - } - if ttlIdx == -1 { - return ttlIdx + delTTL = seconds_int / 3600 } - output := strings.SplitN(values[ttlIdx], "(", 2) - timePart := strings.Trim(output[1], ")") - seconds_int, err := strconv.Atoi(timePart) - if err != nil { - return -1 + m = moveTTLExp.FindStringSubmatch(queryResp) + if len(m) > 1 { + seconds_int, err := strconv.Atoi(m[1]) + if err != nil { + return -1, -1 + } + moveTTL = seconds_int / 3600 } - ttl_hrs := seconds_int / 3600 - return ttl_hrs + + return delTTL, moveTTL } getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) { @@ -2671,7 +2674,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa return nil, err } - return &model.GetTTLResponseItem{TracesTime: parseTTL(dbResp.EngineFull)}, nil + delTTL, moveTTL := parseTTL(dbResp.EngineFull) + return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL}, nil case constants.MetricsTTL: dbResp, err := getMetricsTTL() @@ -2679,7 +2683,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa return nil, err } - return &model.GetTTLResponseItem{MetricsTime: parseTTL(dbResp.EngineFull)}, nil + delTTL, moveTTL := parseTTL(dbResp.EngineFull) + return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL}, nil + } db1, err := getTracesTTL() if err != nil { @@ -2690,9 +2696,15 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa if err != nil { return nil, err } + tracesDelTTL, tracesMoveTTL := parseTTL(db1.EngineFull) + metricsDelTTL, metricsMoveTTL := parseTTL(db2.EngineFull) - return &model.GetTTLResponseItem{TracesTime: parseTTL(db1.EngineFull), MetricsTime: parseTTL(db2.EngineFull)}, nil - + return &model.GetTTLResponseItem{ + TracesTime: tracesDelTTL, + TracesMoveTime: tracesMoveTTL, + MetricsTime: metricsDelTTL, + MetricsMoveTime: metricsMoveTTL, + }, nil } func (r *ClickHouseReader) GetErrors(ctx context.Context, queryParams *model.GetErrorsParams) (*[]model.Error, *model.ApiError) { diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 3e3abc6c79..ae9abc5204 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -914,7 +914,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) { if err != nil { return nil, fmt.Errorf("Not a valid toCold TTL duration %v", toColdDuration) } - if toColdParsed.Seconds() >= durationParsed.Seconds() { + if toColdParsed.Seconds() != 0 && toColdParsed.Seconds() >= durationParsed.Seconds() { return nil, fmt.Errorf("Delete TTL should be greater than cold storage move TTL.") } } diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index bbee7224e9..8d4bd4b766 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -283,8 +283,10 @@ type DBResponseTTL struct { } type GetTTLResponseItem struct { - MetricsTime int `json:"metrics_ttl_duration_hrs"` - TracesTime int `json:"traces_ttl_duration_hrs"` + MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"` + MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"` + TracesTime int `json:"traces_ttl_duration_hrs,omitempty"` + TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"` } type DBResponseMinMaxDuration struct { diff --git a/pkg/query-service/tests/cold_storage_test.go b/pkg/query-service/tests/cold_storage_test.go index f748db30dc..8159805a56 100644 --- a/pkg/query-service/tests/cold_storage_test.go +++ b/pkg/query-service/tests/cold_storage_test.go @@ -1,6 +1,7 @@ package tests import ( + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -8,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.signoz.io/query-service/model" ) const ( @@ -102,6 +104,76 @@ func TestSetTTL(t *testing.T) { fmt.Printf("=== Found %d objects in Minio\n", count) } +func getTTL(t *testing.T, table string) *model.GetTTLResponseItem { + req := endpoint + fmt.Sprintf("/api/v1/settings/ttl?type=%s", table) + if len(table) == 0 { + req = endpoint + "/api/v1/settings/ttl" + } + + resp, err := client.Get(req) + require.NoError(t, err) + + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + res := &model.GetTTLResponseItem{} + require.NoError(t, json.Unmarshal(b, res)) + return res +} + +func TestGetTTL(t *testing.T) { + r, err := setTTL("traces", "s3", "3600s", "7200s") + require.NoError(t, err) + require.Contains(t, string(r), "successfully set up") + + resp := getTTL(t, "traces") + require.Equal(t, 1, resp.TracesMoveTime) + require.Equal(t, 2, resp.TracesTime) + + r, err = setTTL("metrics", "s3", "3600s", "7200s") + require.NoError(t, err) + require.Contains(t, string(r), "successfully set up") + + resp = getTTL(t, "metrics") + require.Equal(t, 1, resp.MetricsMoveTime) + require.Equal(t, 2, resp.MetricsTime) + + r, err = setTTL("traces", "s3", "36000s", "72000s") + require.NoError(t, err) + require.Contains(t, string(r), "successfully set up") + + resp = getTTL(t, "") + require.Equal(t, 10, resp.TracesMoveTime) + require.Equal(t, 20, resp.TracesTime) + require.Equal(t, 1, resp.MetricsMoveTime) + require.Equal(t, 2, resp.MetricsTime) + + r, err = setTTL("metrics", "s3", "15h", "50h") + require.NoError(t, err) + require.Contains(t, string(r), "successfully set up") + + resp = getTTL(t, "") + require.Equal(t, 10, resp.TracesMoveTime) + require.Equal(t, 20, resp.TracesTime) + require.Equal(t, 15, resp.MetricsMoveTime) + require.Equal(t, 50, resp.MetricsTime) + + r, err = setTTL("metrics", "s3", "0s", "0s") + require.NoError(t, err) + require.Contains(t, string(r), "successfully set up") + + r, err = setTTL("traces", "s3", "0s", "0s") + require.NoError(t, err) + require.Contains(t, string(r), "successfully set up") + + resp = getTTL(t, "") + require.Equal(t, 0, resp.TracesMoveTime) + require.Equal(t, 0, resp.TracesTime) + require.Equal(t, 0, resp.MetricsMoveTime) + require.Equal(t, 0, resp.MetricsTime) +} + func TestMain(m *testing.M) { if err := startCluster(); err != nil { fmt.Println(err) diff --git a/pkg/query-service/tests/test-deploy/data/signoz.db b/pkg/query-service/tests/test-deploy/data/signoz.db new file mode 100644 index 0000000000..c19319ab34 Binary files /dev/null and b/pkg/query-service/tests/test-deploy/data/signoz.db differ