mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 03:39:02 +08:00
feat(query-service): Add cold storage support (#837)
* Initial work on s3 * some more work * Add policy api * Cleanup * Add multi-tier TTL and remove storagePolicy API * Cleanup * Typo fix * Revert constants * Cleanup * Add API to get disks * Add more validations * Cleanup
This commit is contained in:
parent
c00f0f159b
commit
c902a6bac8
@ -45,6 +45,33 @@
|
||||
</client>
|
||||
</openSSL>
|
||||
|
||||
<!-- Example config for tiered storage -->
|
||||
<!-- <storage_configuration> -->
|
||||
<!-- <disks> -->
|
||||
<!-- <default> -->
|
||||
<!-- </default> -->
|
||||
<!-- <s3> -->
|
||||
<!-- <type>s3</type> -->
|
||||
<!-- <endpoint>http://172.17.0.1:9100/test/random/</endpoint> -->
|
||||
<!-- <access_key_id>ash</access_key_id> -->
|
||||
<!-- <secret_access_key>password</secret_access_key> -->
|
||||
<!-- </s3> -->
|
||||
<!-- </disks> -->
|
||||
<!-- <policies> -->
|
||||
<!-- <tiered> -->
|
||||
<!-- <volumes> -->
|
||||
<!-- <default> -->
|
||||
<!-- <disk>default</disk> -->
|
||||
<!-- </default> -->
|
||||
<!-- <s3> -->
|
||||
<!-- <disk>s3</disk> -->
|
||||
<!-- </s3> -->
|
||||
<!-- </volumes> -->
|
||||
<!-- </tiered> -->
|
||||
<!-- </policies> -->
|
||||
<!-- </storage_configuration> -->
|
||||
|
||||
|
||||
<!-- Default root page on http[s] server. For example load UI from https://tabix.io/ when opening http://localhost:8123 -->
|
||||
<!--
|
||||
<http_server_default_response><![CDATA[<html ng-app="SMI2"><head><base href="http://ui.tabix.io/"></head><body><div ui-view="" class="content-ui"></div><script src="http://loader.tabix.io/master.js"></script></body></html>]]></http_server_default_response>
|
||||
|
@ -2579,39 +2579,73 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
|
||||
return &GetFilteredSpansAggregatesResponse, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
|
||||
switch ttlParams.Type {
|
||||
func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||
|
||||
var req, tableName string
|
||||
switch params.Type {
|
||||
case constants.TraceTTL:
|
||||
// error is skipped, handled earlier as bad request
|
||||
tracesDuration, _ := time.ParseDuration(ttlParams.Duration)
|
||||
second := int(tracesDuration.Seconds())
|
||||
query := fmt.Sprintf("ALTER TABLE default.%v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND", signozTraceTableName, second)
|
||||
_, err := r.db.Exec(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||
tableName = signozTraceTableName
|
||||
req = fmt.Sprintf(
|
||||
"ALTER TABLE default.%v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
|
||||
tableName, params.DelDuration)
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
}
|
||||
|
||||
case constants.MetricsTTL:
|
||||
// error is skipped, handled earlier as bad request
|
||||
metricsDuration, _ := time.ParseDuration(ttlParams.Duration)
|
||||
second := int(metricsDuration.Seconds())
|
||||
query := fmt.Sprintf("ALTER TABLE %v.%v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + INTERVAL %v SECOND", signozMetricDBName, signozSampleName, second)
|
||||
_, err := r.db.Exec(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||
tableName = signozMetricDBName + "." + signozSampleName
|
||||
req = fmt.Sprintf(
|
||||
"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
|
||||
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
|
||||
" + INTERVAL %v SECOND TO VOLUME '%s'",
|
||||
params.ToColdStorageDuration, params.ColdStorageVolume)
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", ttlParams.Type)}
|
||||
return nil, &model.ApiError{model.ErrorExec,
|
||||
fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v",
|
||||
params.Type)}
|
||||
}
|
||||
|
||||
return &model.SetTTLResponseItem{Message: "ttl has been successfully set up"}, nil
|
||||
// Set the storage policy for the required table. If it is already set, then setting it again
|
||||
// will not a problem.
|
||||
if len(params.ColdStorageVolume) > 0 {
|
||||
policyReq := fmt.Sprintf("ALTER TABLE %s MODIFY SETTING storage_policy='tiered'", tableName)
|
||||
|
||||
zap.S().Debugf("Executing Storage policy request: %s\n", policyReq)
|
||||
if _, err := r.db.Exec(policyReq); err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting storage policy. Err=%v", err))
|
||||
return nil, &model.ApiError{model.ErrorExec,
|
||||
fmt.Errorf("error while setting storage policy. Err=%v", err)}
|
||||
}
|
||||
}
|
||||
|
||||
zap.S().Debugf("Executing TTL request: %s\n", req)
|
||||
if _, err := r.db.Exec(req); err != nil {
|
||||
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||
return nil, &model.ApiError{model.ErrorExec,
|
||||
fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||
}
|
||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||
}
|
||||
|
||||
// GetDisks returns a list of disks {name, type} configured in clickhouse DB.
|
||||
func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) {
|
||||
diskItems := []model.DiskItem{}
|
||||
|
||||
query := "SELECT name,type FROM system.disks"
|
||||
if err := r.db.Select(&diskItems, query); err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, &model.ApiError{model.ErrorExec,
|
||||
fmt.Errorf("error while getting disks. Err=%v", err)}
|
||||
}
|
||||
|
||||
zap.S().Debugf("GetDisks response: %+v\n", diskItems)
|
||||
return &diskItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||
|
@ -161,6 +161,10 @@ func (druid *DruidReader) SetTTL(_ context.Context, _ *model.TTLParams) (*model.
|
||||
return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")}
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) {
|
||||
return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support getting disk list")}
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetTTL(_ context.Context, _ *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||
return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")}
|
||||
}
|
||||
|
@ -218,6 +218,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/errors", aH.getErrors).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/errorWithId", aH.getErrorForId).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/errorWithType", aH.getErrorForType).Methods(http.MethodGet)
|
||||
|
||||
router.HandleFunc("/api/v1/disks", aH.getDisks).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
func Intersection(a, b []int) (c []int) {
|
||||
@ -1059,7 +1061,7 @@ func (aH *APIHandler) getTagValues(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
||||
ttlParams, err := parseDuration(r)
|
||||
ttlParams, err := parseTTLParams(r)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -1087,6 +1089,15 @@ func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getDisks(w http.ResponseWriter, r *http.Request) {
|
||||
result, apiErr := (*aH.reader).GetDisks(context.Background())
|
||||
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getUserPreferences(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
result, apiError := (*aH.relationalDB).FetchUserPreference(context.Background())
|
||||
|
@ -37,6 +37,10 @@ type Reader interface {
|
||||
GetServicesList(ctx context.Context) (*[]string, error)
|
||||
GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
||||
GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
|
||||
|
||||
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
||||
// clickhouse only.
|
||||
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
|
||||
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
|
||||
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError)
|
||||
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*[]model.TagValues, *model.ApiError)
|
||||
|
@ -883,28 +883,48 @@ func parseSetRulesRequest(r *http.Request) (string, *model.ApiError) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func parseDuration(r *http.Request) (*model.TTLParams, error) {
|
||||
func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
|
||||
// make sure either of the query params are present
|
||||
typeTTL := r.URL.Query().Get("type")
|
||||
duration := r.URL.Query().Get("duration")
|
||||
delDuration := r.URL.Query().Get("duration")
|
||||
coldStorage := r.URL.Query().Get("coldStorage")
|
||||
toColdDuration := r.URL.Query().Get("toColdDuration")
|
||||
|
||||
if len(typeTTL) == 0 || len(duration) == 0 {
|
||||
if len(typeTTL) == 0 || len(delDuration) == 0 {
|
||||
return nil, fmt.Errorf("type and duration param cannot be empty from the query")
|
||||
}
|
||||
|
||||
// Validate the duration as a valid time.Duration
|
||||
_, err := time.ParseDuration(duration)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("duration parameter is not a valid time.Duration value. Err=%v", err)
|
||||
}
|
||||
|
||||
// Validate the type parameter
|
||||
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
|
||||
return nil, fmt.Errorf("type param should be <metrics|traces>, got %v", typeTTL)
|
||||
}
|
||||
|
||||
return &model.TTLParams{Duration: duration, Type: typeTTL}, nil
|
||||
// Validate the TTL duration.
|
||||
durationParsed, err := time.ParseDuration(delDuration)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Not a valid TTL duration %v", delDuration)
|
||||
}
|
||||
|
||||
var toColdParsed time.Duration
|
||||
|
||||
// If some cold storage is provided, validate the cold storage move TTL.
|
||||
if len(coldStorage) > 0 {
|
||||
toColdParsed, err = time.ParseDuration(toColdDuration)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Not a valid toCold TTL duration %v", toColdDuration)
|
||||
}
|
||||
if toColdParsed.Seconds() >= durationParsed.Seconds() {
|
||||
return nil, fmt.Errorf("Delete TTL should be greater than cold storage move TTL.")
|
||||
}
|
||||
}
|
||||
|
||||
return &model.TTLParams{
|
||||
Type: typeTTL,
|
||||
DelDuration: durationParsed.Seconds(),
|
||||
ColdStorageVolume: coldStorage,
|
||||
ToColdStorageDuration: toColdParsed.Seconds(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
|
||||
|
@ -4,7 +4,7 @@ go 1.14
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.88.0 // indirect
|
||||
github.com/ClickHouse/clickhouse-go v1.4.5
|
||||
github.com/ClickHouse/clickhouse-go v1.5.4
|
||||
github.com/Microsoft/go-winio v0.5.1 // indirect
|
||||
github.com/OneOfOne/xxhash v1.2.8 // indirect
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
|
||||
|
@ -55,8 +55,9 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/ClickHouse/clickhouse-go v1.3.12/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
||||
github.com/ClickHouse/clickhouse-go v1.4.5 h1:FfhyEnv6/BaWldyjgT2k4gDDmeNwJ9C4NbY/MXxJlXk=
|
||||
github.com/ClickHouse/clickhouse-go v1.4.5/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
||||
github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0=
|
||||
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
||||
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
|
||||
github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
|
||||
github.com/Microsoft/go-winio v0.5.1 h1:aPJp2QD7OOrhO5tQXqQoGSJc+DjDtWTGLOmNyAm6FgY=
|
||||
|
@ -208,8 +208,10 @@ type TagFilterParams struct {
|
||||
}
|
||||
|
||||
type TTLParams struct {
|
||||
Type string
|
||||
Duration string
|
||||
Type string // It can be one of {traces, metrics}.
|
||||
ColdStorageVolume string // Name of the cold storage volume.
|
||||
ToColdStorageDuration float64 // Seconds after which data will be moved to cold storage.
|
||||
DelDuration float64 // Seconds after which data will be deleted.
|
||||
}
|
||||
|
||||
type GetTTLParams struct {
|
||||
|
@ -294,6 +294,11 @@ type SetTTLResponseItem struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type DiskItem struct {
|
||||
Name string `json:"name,omitempty" db:"name,omitempty"`
|
||||
Type string `json:"type,omitempty" db:"type,omitempty"`
|
||||
}
|
||||
|
||||
type DBResponseTTL struct {
|
||||
EngineFull string `db:"engine_full"`
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user