mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-11 17:59:09 +08:00
feat(query-service): Add cold storage support in getTTL API (#922)
* Add cold storage support in getTTL API
This commit is contained in:
parent
1002ab553e
commit
1d28ceb3d7
@ -13,6 +13,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -44,8 +45,8 @@ import (
|
|||||||
"github.com/prometheus/prometheus/util/strutil"
|
"github.com/prometheus/prometheus/util/strutil"
|
||||||
|
|
||||||
"go.signoz.io/query-service/constants"
|
"go.signoz.io/query-service/constants"
|
||||||
"go.signoz.io/query-service/model"
|
|
||||||
am "go.signoz.io/query-service/integrations/alertManager"
|
am "go.signoz.io/query-service/integrations/alertManager"
|
||||||
|
"go.signoz.io/query-service/model"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -75,7 +76,7 @@ type ClickHouseReader struct {
|
|||||||
remoteStorage *remote.Storage
|
remoteStorage *remote.Storage
|
||||||
ruleManager *rules.Manager
|
ruleManager *rules.Manager
|
||||||
promConfig *config.Config
|
promConfig *config.Config
|
||||||
alertManager am.Manager
|
alertManager am.Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTraceReader returns a TraceReader for the database
|
// NewTraceReader returns a TraceReader for the database
|
||||||
@ -95,7 +96,7 @@ func NewReader(localDB *sqlx.DB) *ClickHouseReader {
|
|||||||
return &ClickHouseReader{
|
return &ClickHouseReader{
|
||||||
db: db,
|
db: db,
|
||||||
localDB: localDB,
|
localDB: localDB,
|
||||||
alertManager: alertManager,
|
alertManager: alertManager,
|
||||||
operationsTable: options.primary.OperationsTable,
|
operationsTable: options.primary.OperationsTable,
|
||||||
indexTable: options.primary.IndexTable,
|
indexTable: options.primary.IndexTable,
|
||||||
errorTable: options.primary.ErrorTable,
|
errorTable: options.primary.ErrorTable,
|
||||||
@ -850,7 +851,6 @@ func (r *ClickHouseReader) EditChannel(receiver *am.Receiver, id string) (*am.Re
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {
|
func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {
|
||||||
|
|
||||||
tx, err := r.localDB.Begin()
|
tx, err := r.localDB.Begin()
|
||||||
@ -860,8 +860,8 @@ func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *
|
|||||||
|
|
||||||
channel_type := getChannelType(receiver)
|
channel_type := getChannelType(receiver)
|
||||||
receiverString, _ := json.Marshal(receiver)
|
receiverString, _ := json.Marshal(receiver)
|
||||||
|
|
||||||
// todo: check if the channel name already exists, raise an error if so
|
// todo: check if the channel name already exists, raise an error if so
|
||||||
|
|
||||||
{
|
{
|
||||||
stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
|
stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
|
||||||
@ -884,7 +884,7 @@ func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *
|
|||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return nil, apiError
|
return nil, apiError
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.S().Errorf("Error in commiting transaction for INSERT to notification_channels\n", err)
|
zap.S().Errorf("Error in commiting transaction for INSERT to notification_channels\n", err)
|
||||||
@ -2602,7 +2602,6 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo
|
|||||||
fmt.Errorf("error while getting disks. Err=%v", err)}
|
fmt.Errorf("error while getting disks. Err=%v", err)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
zap.S().Infof("Got response: %+v\n", diskItems)
|
zap.S().Infof("Got response: %+v\n", diskItems)
|
||||||
|
|
||||||
return &diskItems, nil
|
return &diskItems, nil
|
||||||
@ -2610,29 +2609,33 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo
|
|||||||
|
|
||||||
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||||
|
|
||||||
parseTTL := func(queryResp string) int {
|
parseTTL := func(queryResp string) (int, int) {
|
||||||
values := strings.Split(queryResp, " ")
|
|
||||||
N := len(values)
|
|
||||||
ttlIdx := -1
|
|
||||||
|
|
||||||
for i := 0; i < N; i++ {
|
zap.S().Debugf("Parsing TTL from: %s", queryResp)
|
||||||
if strings.Contains(values[i], "toIntervalSecond") {
|
deleteTTLExp := regexp.MustCompile(`toIntervalSecond\(([0-9]*)\)`)
|
||||||
ttlIdx = i
|
moveTTLExp := regexp.MustCompile(`toIntervalSecond\(([0-9]*)\) TO VOLUME`)
|
||||||
break
|
|
||||||
|
var delTTL, moveTTL int = -1, -1
|
||||||
|
|
||||||
|
m := deleteTTLExp.FindStringSubmatch(queryResp)
|
||||||
|
if len(m) > 1 {
|
||||||
|
seconds_int, err := strconv.Atoi(m[1])
|
||||||
|
if err != nil {
|
||||||
|
return -1, -1
|
||||||
}
|
}
|
||||||
}
|
delTTL = seconds_int / 3600
|
||||||
if ttlIdx == -1 {
|
|
||||||
return ttlIdx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
output := strings.SplitN(values[ttlIdx], "(", 2)
|
m = moveTTLExp.FindStringSubmatch(queryResp)
|
||||||
timePart := strings.Trim(output[1], ")")
|
if len(m) > 1 {
|
||||||
seconds_int, err := strconv.Atoi(timePart)
|
seconds_int, err := strconv.Atoi(m[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1
|
return -1, -1
|
||||||
|
}
|
||||||
|
moveTTL = seconds_int / 3600
|
||||||
}
|
}
|
||||||
ttl_hrs := seconds_int / 3600
|
|
||||||
return ttl_hrs
|
return delTTL, moveTTL
|
||||||
}
|
}
|
||||||
|
|
||||||
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||||
@ -2671,7 +2674,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &model.GetTTLResponseItem{TracesTime: parseTTL(dbResp.EngineFull)}, nil
|
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||||
|
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL}, nil
|
||||||
|
|
||||||
case constants.MetricsTTL:
|
case constants.MetricsTTL:
|
||||||
dbResp, err := getMetricsTTL()
|
dbResp, err := getMetricsTTL()
|
||||||
@ -2679,7 +2683,9 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &model.GetTTLResponseItem{MetricsTime: parseTTL(dbResp.EngineFull)}, nil
|
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
|
||||||
|
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL}, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
db1, err := getTracesTTL()
|
db1, err := getTracesTTL()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2690,9 +2696,15 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
tracesDelTTL, tracesMoveTTL := parseTTL(db1.EngineFull)
|
||||||
|
metricsDelTTL, metricsMoveTTL := parseTTL(db2.EngineFull)
|
||||||
|
|
||||||
return &model.GetTTLResponseItem{TracesTime: parseTTL(db1.EngineFull), MetricsTime: parseTTL(db2.EngineFull)}, nil
|
return &model.GetTTLResponseItem{
|
||||||
|
TracesTime: tracesDelTTL,
|
||||||
|
TracesMoveTime: tracesMoveTTL,
|
||||||
|
MetricsTime: metricsDelTTL,
|
||||||
|
MetricsMoveTime: metricsMoveTTL,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetErrors(ctx context.Context, queryParams *model.GetErrorsParams) (*[]model.Error, *model.ApiError) {
|
func (r *ClickHouseReader) GetErrors(ctx context.Context, queryParams *model.GetErrorsParams) (*[]model.Error, *model.ApiError) {
|
||||||
|
@ -914,7 +914,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Not a valid toCold TTL duration %v", toColdDuration)
|
return nil, fmt.Errorf("Not a valid toCold TTL duration %v", toColdDuration)
|
||||||
}
|
}
|
||||||
if toColdParsed.Seconds() >= durationParsed.Seconds() {
|
if toColdParsed.Seconds() != 0 && toColdParsed.Seconds() >= durationParsed.Seconds() {
|
||||||
return nil, fmt.Errorf("Delete TTL should be greater than cold storage move TTL.")
|
return nil, fmt.Errorf("Delete TTL should be greater than cold storage move TTL.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -283,8 +283,10 @@ type DBResponseTTL struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type GetTTLResponseItem struct {
|
type GetTTLResponseItem struct {
|
||||||
MetricsTime int `json:"metrics_ttl_duration_hrs"`
|
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
|
||||||
TracesTime int `json:"traces_ttl_duration_hrs"`
|
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
|
||||||
|
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
|
||||||
|
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type DBResponseMinMaxDuration struct {
|
type DBResponseMinMaxDuration struct {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package tests
|
package tests
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -8,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.signoz.io/query-service/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -102,6 +104,76 @@ func TestSetTTL(t *testing.T) {
|
|||||||
fmt.Printf("=== Found %d objects in Minio\n", count)
|
fmt.Printf("=== Found %d objects in Minio\n", count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getTTL(t *testing.T, table string) *model.GetTTLResponseItem {
|
||||||
|
req := endpoint + fmt.Sprintf("/api/v1/settings/ttl?type=%s", table)
|
||||||
|
if len(table) == 0 {
|
||||||
|
req = endpoint + "/api/v1/settings/ttl"
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := client.Get(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
res := &model.GetTTLResponseItem{}
|
||||||
|
require.NoError(t, json.Unmarshal(b, res))
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetTTL(t *testing.T) {
|
||||||
|
r, err := setTTL("traces", "s3", "3600s", "7200s")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(r), "successfully set up")
|
||||||
|
|
||||||
|
resp := getTTL(t, "traces")
|
||||||
|
require.Equal(t, 1, resp.TracesMoveTime)
|
||||||
|
require.Equal(t, 2, resp.TracesTime)
|
||||||
|
|
||||||
|
r, err = setTTL("metrics", "s3", "3600s", "7200s")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(r), "successfully set up")
|
||||||
|
|
||||||
|
resp = getTTL(t, "metrics")
|
||||||
|
require.Equal(t, 1, resp.MetricsMoveTime)
|
||||||
|
require.Equal(t, 2, resp.MetricsTime)
|
||||||
|
|
||||||
|
r, err = setTTL("traces", "s3", "36000s", "72000s")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(r), "successfully set up")
|
||||||
|
|
||||||
|
resp = getTTL(t, "")
|
||||||
|
require.Equal(t, 10, resp.TracesMoveTime)
|
||||||
|
require.Equal(t, 20, resp.TracesTime)
|
||||||
|
require.Equal(t, 1, resp.MetricsMoveTime)
|
||||||
|
require.Equal(t, 2, resp.MetricsTime)
|
||||||
|
|
||||||
|
r, err = setTTL("metrics", "s3", "15h", "50h")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(r), "successfully set up")
|
||||||
|
|
||||||
|
resp = getTTL(t, "")
|
||||||
|
require.Equal(t, 10, resp.TracesMoveTime)
|
||||||
|
require.Equal(t, 20, resp.TracesTime)
|
||||||
|
require.Equal(t, 15, resp.MetricsMoveTime)
|
||||||
|
require.Equal(t, 50, resp.MetricsTime)
|
||||||
|
|
||||||
|
r, err = setTTL("metrics", "s3", "0s", "0s")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(r), "successfully set up")
|
||||||
|
|
||||||
|
r, err = setTTL("traces", "s3", "0s", "0s")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(r), "successfully set up")
|
||||||
|
|
||||||
|
resp = getTTL(t, "")
|
||||||
|
require.Equal(t, 0, resp.TracesMoveTime)
|
||||||
|
require.Equal(t, 0, resp.TracesTime)
|
||||||
|
require.Equal(t, 0, resp.MetricsMoveTime)
|
||||||
|
require.Equal(t, 0, resp.MetricsTime)
|
||||||
|
}
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
if err := startCluster(); err != nil {
|
if err := startCluster(); err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
BIN
pkg/query-service/tests/test-deploy/data/signoz.db
Normal file
BIN
pkg/query-service/tests/test-deploy/data/signoz.db
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user