mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 04:26:02 +08:00
Added GET/SET endpoint for setting ttl for clickhouse (#304)
* feat: add ttl for clickhouse setup in signoz * feat: added ttl for metrics table Signed-off-by: Yash Sharma <yashrsharma44@gmail.com> * feat: changed the api to use type and duration as params Signed-off-by: Yash Sharma <yashrsharma44@gmail.com> * added a getter for ttl endpoint Signed-off-by: Yash Sharma <yashrsharma44@gmail.com> * added a feature to retunr ttl for both metrics and traces Signed-off-by: Yash Sharma <yashrsharma44@gmail.com>
This commit is contained in:
parent
dea74c5f8a
commit
992644dff7
@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/ClickHouse/clickhouse-go"
|
_ "github.com/ClickHouse/clickhouse-go"
|
||||||
@ -19,13 +20,18 @@ import (
|
|||||||
"github.com/prometheus/prometheus/storage/remote"
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
"github.com/prometheus/prometheus/util/stats"
|
"github.com/prometheus/prometheus/util/stats"
|
||||||
|
|
||||||
|
"go.signoz.io/query-service/constants"
|
||||||
"go.signoz.io/query-service/model"
|
"go.signoz.io/query-service/model"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
primaryNamespace = "clickhouse"
|
primaryNamespace = "clickhouse"
|
||||||
archiveNamespace = "clickhouse-archive"
|
archiveNamespace = "clickhouse-archive"
|
||||||
|
signozTraceTableName = "signoz_index"
|
||||||
|
signozMetricDBName = "signoz_metrics"
|
||||||
|
signozSampleName = "samples"
|
||||||
|
signozTSName = "time_series"
|
||||||
|
|
||||||
minTimespanForProgressiveSearch = time.Hour
|
minTimespanForProgressiveSearch = time.Hour
|
||||||
minTimespanForProgressiveSearchMargin = time.Minute
|
minTimespanForProgressiveSearchMargin = time.Minute
|
||||||
@ -812,3 +818,120 @@ func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams
|
|||||||
return spanSearchAggregatesResponseItems, nil
|
return spanSearchAggregatesResponseItems, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||||
|
|
||||||
|
switch ttlParams.Type {
|
||||||
|
|
||||||
|
case constants.TraceTTL:
|
||||||
|
// error is skipped, handled earlier as bad request
|
||||||
|
tracesDuration, _ := time.ParseDuration(ttlParams.Duration)
|
||||||
|
second := int(tracesDuration.Seconds())
|
||||||
|
query := fmt.Sprintf("ALTER TABLE default.%v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND", signozTraceTableName, second)
|
||||||
|
_, err := r.db.Exec(query)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||||
|
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
case constants.MetricsTTL:
|
||||||
|
// error is skipped, handled earlier as bad request
|
||||||
|
metricsDuration, _ := time.ParseDuration(ttlParams.Duration)
|
||||||
|
second := int(metricsDuration.Seconds())
|
||||||
|
query := fmt.Sprintf("ALTER TABLE %v.%v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + INTERVAL %v SECOND", signozMetricDBName, signozSampleName, second)
|
||||||
|
_, err := r.db.Exec(query)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
|
||||||
|
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. Err=%v", err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", ttlParams.Type)}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.SetTTLResponseItem{Message: "ttl has been successfully set up"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||||
|
|
||||||
|
parseTTL := func(queryResp string) string {
|
||||||
|
values := strings.Split(queryResp, " ")
|
||||||
|
N := len(values)
|
||||||
|
ttlIdx := -1
|
||||||
|
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
if strings.Contains(values[i], "toIntervalSecond") {
|
||||||
|
ttlIdx = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ttlIdx == -1 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
output := strings.SplitN(values[ttlIdx], "(", 2)
|
||||||
|
timePart := strings.Trim(output[1], ")")
|
||||||
|
return timePart
|
||||||
|
}
|
||||||
|
|
||||||
|
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||||
|
var dbResp model.DBResponseTTL
|
||||||
|
|
||||||
|
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleName)
|
||||||
|
|
||||||
|
err := r.db.QueryRowx(query).StructScan(&dbResp)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err))
|
||||||
|
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while getting ttl. Err=%v", err)}
|
||||||
|
}
|
||||||
|
return &dbResp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
|
||||||
|
var dbResp model.DBResponseTTL
|
||||||
|
|
||||||
|
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozTraceTableName)
|
||||||
|
|
||||||
|
err := r.db.QueryRowx(query).StructScan(&dbResp)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err))
|
||||||
|
return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while getting ttl. Err=%v", err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &dbResp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch ttlParams.Type {
|
||||||
|
case constants.TraceTTL:
|
||||||
|
dbResp, err := getTracesTTL()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.GetTTLResponseItem{TracesTime: parseTTL(dbResp.EngineFull)}, nil
|
||||||
|
|
||||||
|
case constants.MetricsTTL:
|
||||||
|
dbResp, err := getMetricsTTL()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.GetTTLResponseItem{MetricsTime: parseTTL(dbResp.EngineFull)}, nil
|
||||||
|
}
|
||||||
|
db1, err := getTracesTTL()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
db2, err := getMetricsTTL()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.GetTTLResponseItem{TracesTime: parseTTL(db1.EngineFull), MetricsTime: parseTTL(db2.EngineFull)}, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -110,3 +110,11 @@ func (druid *DruidReader) GetServiceMapDependencies(ctx context.Context, query *
|
|||||||
func (druid *DruidReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) {
|
func (druid *DruidReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) {
|
||||||
return druidQuery.SearchSpansAggregate(druid.Client, queryParams)
|
return druidQuery.SearchSpansAggregate(druid.Client, queryParams)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (druid *DruidReader) SetTTL(_ context.Context, _ *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||||
|
return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (druid *DruidReader) GetTTL(_ context.Context, _ *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
|
||||||
|
return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")}
|
||||||
|
}
|
||||||
|
@ -196,6 +196,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
|||||||
router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet)
|
||||||
|
router.HandleFunc("/api/v1/settings/ttl", aH.setTTL).Methods(http.MethodPost)
|
||||||
|
router.HandleFunc("/api/v1/settings/ttl", aH.getTTL).Methods(http.MethodGet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Intersection(a, b []int) (c []int) {
|
func Intersection(a, b []int) (c []int) {
|
||||||
@ -719,6 +721,35 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) {
|
|||||||
aH.writeJSON(w, r, result)
|
aH.writeJSON(w, r, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ttlParams, err := parseDuration(r)
|
||||||
|
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, apiErr := (*aH.reader).SetTTL(context.Background(), ttlParams)
|
||||||
|
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.writeJSON(w, r, result)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ttlParams, err := parseGetTTL(r)
|
||||||
|
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, apiErr := (*aH.reader).GetTTL(context.Background(), ttlParams)
|
||||||
|
if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.writeJSON(w, r, result)
|
||||||
|
}
|
||||||
|
|
||||||
// func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) {
|
// func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) {
|
||||||
// // vars := mux.Vars(r)
|
// // vars := mux.Vars(r)
|
||||||
|
|
||||||
|
@ -9,12 +9,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Reader interface {
|
type Reader interface {
|
||||||
|
// Getter Interfaces
|
||||||
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
|
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
|
||||||
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
|
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
|
||||||
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error)
|
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error)
|
||||||
GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error)
|
GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error)
|
||||||
// GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error)
|
// GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error)
|
||||||
SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error)
|
|
||||||
GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error)
|
GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error)
|
||||||
GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||||
GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||||
@ -24,7 +24,14 @@ type Reader interface {
|
|||||||
GetOperations(ctx context.Context, serviceName string) (*[]string, error)
|
GetOperations(ctx context.Context, serviceName string) (*[]string, error)
|
||||||
GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error)
|
GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error)
|
||||||
GetServicesList(ctx context.Context) (*[]string, error)
|
GetServicesList(ctx context.Context) (*[]string, error)
|
||||||
SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error)
|
|
||||||
GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
||||||
|
GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
|
||||||
|
|
||||||
|
// Search Interfaces
|
||||||
SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error)
|
SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error)
|
||||||
|
SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error)
|
||||||
|
SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error)
|
||||||
|
|
||||||
|
// Setter Interfaces
|
||||||
|
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
promModel "github.com/prometheus/common/model"
|
promModel "github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"go.signoz.io/query-service/constants"
|
||||||
"go.signoz.io/query-service/model"
|
"go.signoz.io/query-service/model"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -18,8 +19,8 @@ import (
|
|||||||
var allowedDimesions = []string{"calls", "duration"}
|
var allowedDimesions = []string{"calls", "duration"}
|
||||||
|
|
||||||
var allowedAggregations = map[string][]string{
|
var allowedAggregations = map[string][]string{
|
||||||
"calls": []string{"count", "rate_per_sec"},
|
"calls": {"count", "rate_per_sec"},
|
||||||
"duration": []string{"avg", "p50", "p95", "p99"},
|
"duration": {"avg", "p50", "p95", "p99"},
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, error) {
|
func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, error) {
|
||||||
@ -566,3 +567,44 @@ func parseTimestamp(param string, r *http.Request) (*string, error) {
|
|||||||
return &timeStr, nil
|
return &timeStr, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseDuration(r *http.Request) (*model.TTLParams, error) {
|
||||||
|
|
||||||
|
// make sure either of the query params are present
|
||||||
|
typeTTL := r.URL.Query().Get("type")
|
||||||
|
duration := r.URL.Query().Get("duration")
|
||||||
|
|
||||||
|
if len(typeTTL) == 0 || len(duration) == 0 {
|
||||||
|
return nil, fmt.Errorf("type and duration param cannot be empty from the query")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the duration as a valid time.Duration
|
||||||
|
_, err := time.ParseDuration(duration)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("duration parameter is not a valid time.Duration value. Err=%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the type parameter
|
||||||
|
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
|
||||||
|
return nil, fmt.Errorf("type param should be <metrics|traces>, got %v", typeTTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.TTLParams{Duration: duration, Type: typeTTL}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
|
||||||
|
|
||||||
|
typeTTL := r.URL.Query().Get("type")
|
||||||
|
getAllTTL := false
|
||||||
|
|
||||||
|
if len(typeTTL) == 0 {
|
||||||
|
getAllTTL = true
|
||||||
|
} else {
|
||||||
|
// Validate the type parameter
|
||||||
|
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
|
||||||
|
return nil, fmt.Errorf("type param should be <metrics|traces>, got %v", typeTTL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.GetTTLParams{Type: typeTTL, GetAllTTL: getAllTTL}, nil
|
||||||
|
}
|
||||||
|
@ -8,3 +8,6 @@ const HTTPHostPort = "0.0.0.0:8080"
|
|||||||
|
|
||||||
var DruidClientUrl = os.Getenv("DruidClientUrl")
|
var DruidClientUrl = os.Getenv("DruidClientUrl")
|
||||||
var DruidDatasource = os.Getenv("DruidDatasource")
|
var DruidDatasource = os.Getenv("DruidDatasource")
|
||||||
|
|
||||||
|
const TraceTTL = "traces"
|
||||||
|
const MetricsTTL = "metrics"
|
||||||
|
@ -105,3 +105,13 @@ type SpanSearchParams struct {
|
|||||||
BatchSize int64
|
BatchSize int64
|
||||||
Tags []TagQuery
|
Tags []TagQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TTLParams struct {
|
||||||
|
Type string
|
||||||
|
Duration string
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTTLParams struct {
|
||||||
|
Type string
|
||||||
|
GetAllTTL bool
|
||||||
|
}
|
||||||
|
@ -195,3 +195,16 @@ type SpanSearchAggregatesResponseItem struct {
|
|||||||
Time string `json:"time,omitempty" db:"time"`
|
Time string `json:"time,omitempty" db:"time"`
|
||||||
Value float32 `json:"value,omitempty" db:"value"`
|
Value float32 `json:"value,omitempty" db:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SetTTLResponseItem struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DBResponseTTL struct {
|
||||||
|
EngineFull string `db:"engine_full"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTTLResponseItem struct {
|
||||||
|
MetricsTime string `json:"metrics_ttl_duration"`
|
||||||
|
TracesTime string `json:"traces_ttl_duration"`
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user