diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 71ea372456..255294d7a6 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strconv" + "strings" "time" _ "github.com/ClickHouse/clickhouse-go" @@ -19,13 +20,18 @@ import ( "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/util/stats" + "go.signoz.io/query-service/constants" "go.signoz.io/query-service/model" "go.uber.org/zap" ) const ( - primaryNamespace = "clickhouse" - archiveNamespace = "clickhouse-archive" + primaryNamespace = "clickhouse" + archiveNamespace = "clickhouse-archive" + signozTraceTableName = "signoz_index" + signozMetricDBName = "signoz_metrics" + signozSampleName = "samples" + signozTSName = "time_series" minTimespanForProgressiveSearch = time.Hour minTimespanForProgressiveSearchMargin = time.Minute @@ -812,3 +818,120 @@ func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams return spanSearchAggregatesResponseItems, nil } + +func (r *ClickHouseReader) SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { + + switch ttlParams.Type { + + case constants.TraceTTL: + // error is skipped, handled earlier as bad request + tracesDuration, _ := time.ParseDuration(ttlParams.Duration) + second := int(tracesDuration.Seconds()) + query := fmt.Sprintf("ALTER TABLE default.%v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND", signozTraceTableName, second) + _, err := r.db.Exec(query) + + if err != nil { + zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. Err=%v", err)} + } + + case constants.MetricsTTL: + // error is skipped, handled earlier as bad request + metricsDuration, _ := time.ParseDuration(ttlParams.Duration) + second := int(metricsDuration.Seconds()) + query := fmt.Sprintf("ALTER TABLE %v.%v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + INTERVAL %v SECOND", signozMetricDBName, signozSampleName, second) + _, err := r.db.Exec(query) + + if err != nil { + zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. Err=%v", err)} + } + + default: + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while setting ttl. ttl type should be , got %v", ttlParams.Type)} + } + + return &model.SetTTLResponseItem{Message: "ttl has been successfully set up"}, nil +} + +func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { + + parseTTL := func(queryResp string) string { + values := strings.Split(queryResp, " ") + N := len(values) + ttlIdx := -1 + + for i := 0; i < N; i++ { + if strings.Contains(values[i], "toIntervalSecond") { + ttlIdx = i + break + } + } + if ttlIdx == -1 { + return "" + } + + output := strings.SplitN(values[ttlIdx], "(", 2) + timePart := strings.Trim(output[1], ")") + return timePart + } + + getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) { + var dbResp model.DBResponseTTL + + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleName) + + err := r.db.QueryRowx(query).StructScan(&dbResp) + + if err != nil { + zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err)) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while getting ttl. Err=%v", err)} + } + return &dbResp, nil + } + + getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) { + var dbResp model.DBResponseTTL + + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozTraceTableName) + + err := r.db.QueryRowx(query).StructScan(&dbResp) + + if err != nil { + zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err)) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while getting ttl. Err=%v", err)} + } + + return &dbResp, nil + } + + switch ttlParams.Type { + case constants.TraceTTL: + dbResp, err := getTracesTTL() + if err != nil { + return nil, err + } + + return &model.GetTTLResponseItem{TracesTime: parseTTL(dbResp.EngineFull)}, nil + + case constants.MetricsTTL: + dbResp, err := getMetricsTTL() + if err != nil { + return nil, err + } + + return &model.GetTTLResponseItem{MetricsTime: parseTTL(dbResp.EngineFull)}, nil + } + db1, err := getTracesTTL() + if err != nil { + return nil, err + } + + db2, err := getMetricsTTL() + if err != nil { + return nil, err + } + + return &model.GetTTLResponseItem{TracesTime: parseTTL(db1.EngineFull), MetricsTime: parseTTL(db2.EngineFull)}, nil + +} diff --git a/pkg/query-service/app/druidReader/reader.go b/pkg/query-service/app/druidReader/reader.go index bc937712b4..4c4ecb2d6f 100644 --- a/pkg/query-service/app/druidReader/reader.go +++ b/pkg/query-service/app/druidReader/reader.go @@ -110,3 +110,11 @@ func (druid *DruidReader) GetServiceMapDependencies(ctx context.Context, query * func (druid *DruidReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) { return druidQuery.SearchSpansAggregate(druid.Client, queryParams) } + +func (druid *DruidReader) SetTTL(_ context.Context, _ *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { + return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")} +} + +func (druid *DruidReader) GetTTL(_ context.Context, _ *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { + return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")} +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 03eee773cb..1578afb0a2 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -196,6 +196,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet) router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet) router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet) + router.HandleFunc("/api/v1/settings/ttl", aH.setTTL).Methods(http.MethodPost) + router.HandleFunc("/api/v1/settings/ttl", aH.getTTL).Methods(http.MethodGet) } func Intersection(a, b []int) (c []int) { @@ -719,6 +721,35 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, result) } +func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { + ttlParams, err := parseDuration(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, apiErr := (*aH.reader).SetTTL(context.Background(), ttlParams) + if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + return + } + + aH.writeJSON(w, r, result) + +} + +func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) { + ttlParams, err := parseGetTTL(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, apiErr := (*aH.reader).GetTTL(context.Background(), ttlParams) + if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + return + } + + aH.writeJSON(w, r, result) +} + // func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) { // // vars := mux.Vars(r) diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go index 72d90001c9..d5db44750b 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/app/interface.go @@ -9,12 +9,12 @@ import ( ) type Reader interface { + // Getter Interfaces GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error) // GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) - SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) @@ -24,7 +24,14 @@ type Reader interface { GetOperations(ctx context.Context, serviceName string) (*[]string, error) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) GetServicesList(ctx context.Context) (*[]string, error) - SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error) GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) + GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) + + // Search Interfaces SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) + SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) + SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error) + + // Setter Interfaces + SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) } diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index d3aa20abd3..6f7cbaeca6 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -11,6 +11,7 @@ import ( promModel "github.com/prometheus/common/model" + "go.signoz.io/query-service/constants" "go.signoz.io/query-service/model" "go.uber.org/zap" ) @@ -18,8 +19,8 @@ import ( var allowedDimesions = []string{"calls", "duration"} var allowedAggregations = map[string][]string{ - "calls": []string{"count", "rate_per_sec"}, - "duration": []string{"avg", "p50", "p95", "p99"}, + "calls": {"count", "rate_per_sec"}, + "duration": {"avg", "p50", "p95", "p99"}, } func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, error) { @@ -566,3 +567,44 @@ func parseTimestamp(param string, r *http.Request) (*string, error) { return &timeStr, nil } + +func parseDuration(r *http.Request) (*model.TTLParams, error) { + + // make sure either of the query params are present + typeTTL := r.URL.Query().Get("type") + duration := r.URL.Query().Get("duration") + + if len(typeTTL) == 0 || len(duration) == 0 { + return nil, fmt.Errorf("type and duration param cannot be empty from the query") + } + + // Validate the duration as a valid time.Duration + _, err := time.ParseDuration(duration) + if err != nil { + return nil, fmt.Errorf("duration parameter is not a valid time.Duration value. Err=%v", err) + } + + // Validate the type parameter + if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { + return nil, fmt.Errorf("type param should be , got %v", typeTTL) + } + + return &model.TTLParams{Duration: duration, Type: typeTTL}, nil +} + +func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) { + + typeTTL := r.URL.Query().Get("type") + getAllTTL := false + + if len(typeTTL) == 0 { + getAllTTL = true + } else { + // Validate the type parameter + if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { + return nil, fmt.Errorf("type param should be , got %v", typeTTL) + } + } + + return &model.GetTTLParams{Type: typeTTL, GetAllTTL: getAllTTL}, nil +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index c17581781a..82cdd9aaa9 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -8,3 +8,6 @@ const HTTPHostPort = "0.0.0.0:8080" var DruidClientUrl = os.Getenv("DruidClientUrl") var DruidDatasource = os.Getenv("DruidDatasource") + +const TraceTTL = "traces" +const MetricsTTL = "metrics" diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 67666eda8f..a082277b51 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -105,3 +105,13 @@ type SpanSearchParams struct { BatchSize int64 Tags []TagQuery } + +type TTLParams struct { + Type string + Duration string +} + +type GetTTLParams struct { + Type string + GetAllTTL bool +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index cef9610015..f35cfb306c 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -195,3 +195,16 @@ type SpanSearchAggregatesResponseItem struct { Time string `json:"time,omitempty" db:"time"` Value float32 `json:"value,omitempty" db:"value"` } + +type SetTTLResponseItem struct { + Message string `json:"message"` +} + +type DBResponseTTL struct { + EngineFull string `db:"engine_full"` +} + +type GetTTLResponseItem struct { + MetricsTime string `json:"metrics_ttl_duration"` + TracesTime string `json:"traces_ttl_duration"` +}