mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 04:29:04 +08:00
chore: add /v4/query_range endpoint (#4361)
This commit is contained in:
parent
739b1bf387
commit
abaf6126e5
@ -331,6 +331,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e
|
|||||||
apiHandler.RegisterMetricsRoutes(r, am)
|
apiHandler.RegisterMetricsRoutes(r, am)
|
||||||
apiHandler.RegisterLogsRoutes(r, am)
|
apiHandler.RegisterLogsRoutes(r, am)
|
||||||
apiHandler.RegisterQueryRangeV3Routes(r, am)
|
apiHandler.RegisterQueryRangeV3Routes(r, am)
|
||||||
|
apiHandler.RegisterQueryRangeV4Routes(r, am)
|
||||||
|
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/parser"
|
"go.signoz.io/signoz/pkg/query-service/app/parser"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/querier"
|
"go.signoz.io/signoz/pkg/query-service/app/querier"
|
||||||
|
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||||
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||||
@ -78,6 +79,7 @@ type APIHandler struct {
|
|||||||
featureFlags interfaces.FeatureLookup
|
featureFlags interfaces.FeatureLookup
|
||||||
ready func(http.HandlerFunc) http.HandlerFunc
|
ready func(http.HandlerFunc) http.HandlerFunc
|
||||||
querier interfaces.Querier
|
querier interfaces.Querier
|
||||||
|
querierV2 interfaces.Querier
|
||||||
queryBuilder *queryBuilder.QueryBuilder
|
queryBuilder *queryBuilder.QueryBuilder
|
||||||
preferDelta bool
|
preferDelta bool
|
||||||
preferSpanMetrics bool
|
preferSpanMetrics bool
|
||||||
@ -142,7 +144,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
FeatureLookup: opts.FeatureFlags,
|
FeatureLookup: opts.FeatureFlags,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
querierOptsV2 := querierV2.QuerierOptions{
|
||||||
|
Reader: opts.Reader,
|
||||||
|
Cache: opts.Cache,
|
||||||
|
KeyGenerator: queryBuilder.NewKeyGenerator(),
|
||||||
|
FluxInterval: opts.FluxInterval,
|
||||||
|
FeatureLookup: opts.FeatureFlags,
|
||||||
|
}
|
||||||
|
|
||||||
querier := querier.NewQuerier(querierOpts)
|
querier := querier.NewQuerier(querierOpts)
|
||||||
|
querierv2 := querierV2.NewQuerier(querierOptsV2)
|
||||||
|
|
||||||
aH := &APIHandler{
|
aH := &APIHandler{
|
||||||
reader: opts.Reader,
|
reader: opts.Reader,
|
||||||
@ -158,6 +169,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
|||||||
featureFlags: opts.FeatureFlags,
|
featureFlags: opts.FeatureFlags,
|
||||||
LogsParsingPipelineController: opts.LogsParsingPipelineController,
|
LogsParsingPipelineController: opts.LogsParsingPipelineController,
|
||||||
querier: querier,
|
querier: querier,
|
||||||
|
querierV2: querierv2,
|
||||||
}
|
}
|
||||||
|
|
||||||
builderOpts := queryBuilder.QueryBuilderOptions{
|
builderOpts := queryBuilder.QueryBuilderOptions{
|
||||||
@ -320,6 +332,11 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
|
|||||||
subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodGet)
|
subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodGet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) RegisterQueryRangeV4Routes(router *mux.Router, am *AuthMiddleware) {
|
||||||
|
subRouter := router.PathPrefix("/api/v4").Subrouter()
|
||||||
|
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost)
|
||||||
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
|
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
|
||||||
writeHttpResponse(w, data)
|
writeHttpResponse(w, data)
|
||||||
}
|
}
|
||||||
@ -542,7 +559,7 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParam
|
|||||||
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
|
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
|
||||||
for name := range qp.CompositeQuery.BuilderQueries {
|
for name := range qp.CompositeQuery.BuilderQueries {
|
||||||
query := qp.CompositeQuery.BuilderQueries[name]
|
query := qp.CompositeQuery.BuilderQueries[name]
|
||||||
if query.DataSource == v3.DataSourceMetrics {
|
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
|
||||||
if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
|
if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
|
||||||
query.Temporality = v3.Delta
|
query.Temporality = v3.Delta
|
||||||
} else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
|
} else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
|
||||||
@ -3241,3 +3258,67 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
var result []*v3.Result
|
||||||
|
var err error
|
||||||
|
var errQuriesByName map[string]string
|
||||||
|
var spanKeys map[string]v3.AttributeKey
|
||||||
|
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||||
|
// check if any enrichment is required for logs if yes then enrich them
|
||||||
|
if logsv3.EnrichmentRequired(queryRangeParams) {
|
||||||
|
// get the fields if any logs query is present
|
||||||
|
var fields map[string]v3.AttributeKey
|
||||||
|
fields, err = aH.getLogFieldsV3(ctx, queryRangeParams)
|
||||||
|
if err != nil {
|
||||||
|
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||||
|
RespondError(w, apiErrObj, errQuriesByName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logsv3.Enrich(queryRangeParams, fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams)
|
||||||
|
if err != nil {
|
||||||
|
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||||
|
RespondError(w, apiErrObj, errQuriesByName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err, errQuriesByName = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||||
|
RespondError(w, apiErrObj, errQuriesByName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := v3.QueryRangeResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.Respond(w, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
|
||||||
|
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
|
||||||
|
|
||||||
|
if apiErrorObj != nil {
|
||||||
|
zap.S().Errorf(apiErrorObj.Err.Error())
|
||||||
|
RespondError(w, apiErrorObj, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// add temporality for each metric
|
||||||
|
|
||||||
|
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
|
||||||
|
if temporalityErr != nil {
|
||||||
|
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.queryRangeV4(r.Context(), queryRangeParams, w, r)
|
||||||
|
}
|
||||||
|
@ -3,11 +3,12 @@ package cumulative
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// prepareMetricQueryTable prepares the query to be used for fetching metrics
|
// PrepareMetricQueryCumulativeTable prepares the query to be used for fetching metrics
|
||||||
func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
func PrepareMetricQueryCumulativeTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
var query string
|
var query string
|
||||||
|
|
||||||
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
|
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
|
||||||
@ -15,9 +16,9 @@ func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (strin
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
|
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
|
||||||
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
|
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
|
||||||
queryTmpl :=
|
queryTmpl :=
|
||||||
"SELECT %s," +
|
"SELECT %s," +
|
||||||
|
@ -99,7 +99,7 @@ func TestPrepareTableQuery(t *testing.T) {
|
|||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
t.Run(testCase.name, func(t *testing.T) {
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
query, err := prepareMetricQueryTable(
|
query, err := PrepareMetricQueryCumulativeTable(
|
||||||
testCase.start,
|
testCase.start,
|
||||||
testCase.end,
|
testCase.end,
|
||||||
testCase.builderQuery.StepInterval,
|
testCase.builderQuery.StepInterval,
|
||||||
|
@ -3,7 +3,7 @@ package cumulative
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
@ -107,7 +107,7 @@ const (
|
|||||||
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
var subQuery string
|
var subQuery string
|
||||||
|
|
||||||
timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq)
|
timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -127,15 +127,8 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
|
|||||||
" GROUP BY fingerprint, ts" +
|
" GROUP BY fingerprint, ts" +
|
||||||
" ORDER BY fingerprint, ts"
|
" ORDER BY fingerprint, ts"
|
||||||
|
|
||||||
var selectLabelsAny string
|
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
|
||||||
for _, tag := range mq.GroupBy {
|
selectLabels := helpers.SelectLabels(mq.GroupBy)
|
||||||
selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
var selectLabels string
|
|
||||||
for _, tag := range mq.GroupBy {
|
|
||||||
selectLabels += tag.Key + ","
|
|
||||||
}
|
|
||||||
|
|
||||||
switch mq.TimeAggregation {
|
switch mq.TimeAggregation {
|
||||||
case v3.TimeAggregationAvg:
|
case v3.TimeAggregationAvg:
|
||||||
@ -177,8 +170,8 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
|
|||||||
return subQuery, nil
|
return subQuery, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics
|
// PrepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics
|
||||||
func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
func PrepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
var query string
|
var query string
|
||||||
|
|
||||||
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
|
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
|
||||||
@ -186,9 +179,9 @@ func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.Build
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
|
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
|
||||||
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
|
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
|
||||||
queryTmpl :=
|
queryTmpl :=
|
||||||
"SELECT %s," +
|
"SELECT %s," +
|
||||||
|
@ -216,7 +216,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
|
|||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
t.Run(testCase.name, func(t *testing.T) {
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
query, err := prepareMetricQueryCumulativeTimeSeries(
|
query, err := PrepareMetricQueryCumulativeTimeSeries(
|
||||||
testCase.start,
|
testCase.start,
|
||||||
testCase.end,
|
testCase.end,
|
||||||
testCase.builderQuery.StepInterval,
|
testCase.builderQuery.StepInterval,
|
||||||
|
@ -1,61 +0,0 @@
|
|||||||
package delta
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
|
||||||
)
|
|
||||||
|
|
||||||
// groupingSets returns a string of comma separated tags for group by clause
|
|
||||||
// `ts` is always added to the group by clause
|
|
||||||
func groupingSets(tags ...string) string {
|
|
||||||
withTs := append(tags, "ts")
|
|
||||||
if len(withTs) > 1 {
|
|
||||||
return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", "))
|
|
||||||
} else {
|
|
||||||
return strings.Join(withTs, ", ")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
|
|
||||||
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
|
||||||
groupTags := []string{}
|
|
||||||
for _, tag := range tags {
|
|
||||||
groupTags = append(groupTags, tag.Key)
|
|
||||||
}
|
|
||||||
return groupingSets(groupTags...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// groupBy returns a string of comma separated tags for group by clause
|
|
||||||
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
|
||||||
groupTags := []string{}
|
|
||||||
for _, tag := range tags {
|
|
||||||
groupTags = append(groupTags, tag.Key)
|
|
||||||
}
|
|
||||||
groupTags = append(groupTags, "ts")
|
|
||||||
return strings.Join(groupTags, ", ")
|
|
||||||
}
|
|
||||||
|
|
||||||
// orderBy returns a string of comma separated tags for order by clause
|
|
||||||
// if the order is not specified, it defaults to ASC
|
|
||||||
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
|
|
||||||
var orderBy []string
|
|
||||||
for _, tag := range tags {
|
|
||||||
found := false
|
|
||||||
for _, item := range items {
|
|
||||||
if item.ColumnName == tag.Key {
|
|
||||||
found = true
|
|
||||||
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
orderBy = append(orderBy, "ts ASC")
|
|
||||||
|
|
||||||
return strings.Join(orderBy, ", ")
|
|
||||||
}
|
|
@ -3,11 +3,17 @@ package delta
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// prepareMetricQueryDeltaTable builds the query to be used for fetching metrics
|
// PrepareMetricQueryDeltaTable builds the query to be used for fetching metrics
|
||||||
func prepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
func PrepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
|
||||||
|
if canShortCircuit(mq) {
|
||||||
|
return prepareQueryOptimized(start, end, step, mq)
|
||||||
|
}
|
||||||
|
|
||||||
var query string
|
var query string
|
||||||
|
|
||||||
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
|
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
|
||||||
@ -15,9 +21,9 @@ func prepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
|
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
|
||||||
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
|
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
|
||||||
queryTmpl :=
|
queryTmpl :=
|
||||||
"SELECT %s," +
|
"SELECT %s," +
|
||||||
|
@ -95,13 +95,13 @@ func TestPrepareTableQuery(t *testing.T) {
|
|||||||
},
|
},
|
||||||
start: 1701794980000,
|
start: 1701794980000,
|
||||||
end: 1701796780000,
|
end: 1701796780000,
|
||||||
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
t.Run(testCase.name, func(t *testing.T) {
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
query, err := prepareMetricQueryDeltaTable(
|
query, err := PrepareMetricQueryDeltaTable(
|
||||||
testCase.start,
|
testCase.start,
|
||||||
testCase.end,
|
testCase.end,
|
||||||
testCase.builderQuery.StepInterval,
|
testCase.builderQuery.StepInterval,
|
||||||
|
@ -210,13 +210,13 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
|
|||||||
},
|
},
|
||||||
start: 1701794980000,
|
start: 1701794980000,
|
||||||
end: 1701796780000,
|
end: 1701796780000,
|
||||||
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
t.Run(testCase.name, func(t *testing.T) {
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
query, err := prepareMetricQueryDeltaTimeSeries(
|
query, err := PrepareMetricQueryDeltaTimeSeries(
|
||||||
testCase.start,
|
testCase.start,
|
||||||
testCase.end,
|
testCase.end,
|
||||||
testCase.builderQuery.StepInterval,
|
testCase.builderQuery.StepInterval,
|
||||||
|
@ -3,18 +3,18 @@ package delta
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// prepareTimeAggregationSubQueryTimeSeries builds the sub-query to be used for temporal aggregation
|
// prepareTimeAggregationSubQuery builds the sub-query to be used for temporal aggregation
|
||||||
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
|
||||||
var subQuery string
|
var subQuery string
|
||||||
|
|
||||||
timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq)
|
timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -34,15 +34,7 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
|
|||||||
" GROUP BY fingerprint, ts" +
|
" GROUP BY fingerprint, ts" +
|
||||||
" ORDER BY fingerprint, ts"
|
" ORDER BY fingerprint, ts"
|
||||||
|
|
||||||
var selectLabelsAny string
|
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
|
||||||
for _, tag := range mq.GroupBy {
|
|
||||||
selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
var selectLabels string
|
|
||||||
for _, tag := range mq.GroupBy {
|
|
||||||
selectLabels += tag.Key + ","
|
|
||||||
}
|
|
||||||
|
|
||||||
switch mq.TimeAggregation {
|
switch mq.TimeAggregation {
|
||||||
case v3.TimeAggregationAvg:
|
case v3.TimeAggregationAvg:
|
||||||
@ -76,8 +68,58 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
|
|||||||
return subQuery, nil
|
return subQuery, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics
|
// See `canShortCircuit` below for details
|
||||||
func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
|
||||||
|
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
|
selectLabels := helpers.SelectLabels(mq.GroupBy)
|
||||||
|
|
||||||
|
var query string
|
||||||
|
|
||||||
|
timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||||
|
|
||||||
|
// Select the aggregate value for interval
|
||||||
|
queryTmpl :=
|
||||||
|
"SELECT %s" +
|
||||||
|
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||||
|
" %s as value" +
|
||||||
|
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||||
|
" INNER JOIN" +
|
||||||
|
" (%s) as filtered_time_series" +
|
||||||
|
" USING fingerprint" +
|
||||||
|
" WHERE " + samplesTableFilter +
|
||||||
|
" GROUP BY %s" +
|
||||||
|
" ORDER BY %s"
|
||||||
|
|
||||||
|
switch mq.SpaceAggregation {
|
||||||
|
case v3.SpaceAggregationSum:
|
||||||
|
op := "sum(value)"
|
||||||
|
if mq.TimeAggregation == v3.TimeAggregationRate {
|
||||||
|
op = "sum(value)/" + fmt.Sprintf("%d", step)
|
||||||
|
}
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
|
||||||
|
case v3.SpaceAggregationMin:
|
||||||
|
op := "min(value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
|
||||||
|
case v3.SpaceAggregationMax:
|
||||||
|
op := "max(value)"
|
||||||
|
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
|
||||||
|
}
|
||||||
|
return query, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics
|
||||||
|
func PrepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||||
|
|
||||||
|
if canShortCircuit(mq) {
|
||||||
|
return prepareQueryOptimized(start, end, step, mq)
|
||||||
|
}
|
||||||
|
|
||||||
var query string
|
var query string
|
||||||
|
|
||||||
@ -86,9 +128,9 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
|
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
|
||||||
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
|
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
|
||||||
queryTmpl :=
|
queryTmpl :=
|
||||||
"SELECT %s," +
|
"SELECT %s," +
|
||||||
@ -118,3 +160,37 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue
|
|||||||
|
|
||||||
return query, nil
|
return query, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// canShortCircuit returns true if we can use the optimized query
|
||||||
|
// for the given query
|
||||||
|
// This is used to avoid the group by fingerprint thus improving the performance
|
||||||
|
// for certain queries
|
||||||
|
// cases where we can short circuit:
|
||||||
|
// 1. time aggregation = (rate|increase) and space aggregation = sum
|
||||||
|
// - rate = sum(value)/step, increase = sum(value) - sum of sums is same as sum of all values
|
||||||
|
//
|
||||||
|
// 2. time aggregation = sum and space aggregation = sum
|
||||||
|
// - sum of sums is same as sum of all values
|
||||||
|
//
|
||||||
|
// 3. time aggregation = min and space aggregation = min
|
||||||
|
// - min of mins is same as min of all values
|
||||||
|
//
|
||||||
|
// 4. time aggregation = max and space aggregation = max
|
||||||
|
// - max of maxs is same as max of all values
|
||||||
|
//
|
||||||
|
// all of this is true only for delta metrics
|
||||||
|
func canShortCircuit(mq *v3.BuilderQuery) bool {
|
||||||
|
if (mq.TimeAggregation == v3.TimeAggregationRate || mq.TimeAggregation == v3.TimeAggregationIncrease) && mq.SpaceAggregation == v3.SpaceAggregationSum {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if mq.TimeAggregation == v3.TimeAggregationSum && mq.SpaceAggregation == v3.SpaceAggregationSum {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if mq.TimeAggregation == v3.TimeAggregationMin && mq.SpaceAggregation == v3.SpaceAggregationMin {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if mq.TimeAggregation == v3.TimeAggregationMax && mq.SpaceAggregation == v3.SpaceAggregationMax {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package cumulative
|
package helpers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -18,8 +18,8 @@ func groupingSets(tags ...string) string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
|
// GroupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
|
||||||
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
func GroupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
||||||
groupTags := []string{}
|
groupTags := []string{}
|
||||||
for _, tag := range tags {
|
for _, tag := range tags {
|
||||||
groupTags = append(groupTags, tag.Key)
|
groupTags = append(groupTags, tag.Key)
|
||||||
@ -27,8 +27,8 @@ func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
|||||||
return groupingSets(groupTags...)
|
return groupingSets(groupTags...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// groupBy returns a string of comma separated tags for group by clause
|
// GroupByAttributeKeyTags returns a string of comma separated tags for group by clause
|
||||||
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
||||||
groupTags := []string{}
|
groupTags := []string{}
|
||||||
for _, tag := range tags {
|
for _, tag := range tags {
|
||||||
groupTags = append(groupTags, tag.Key)
|
groupTags = append(groupTags, tag.Key)
|
||||||
@ -37,9 +37,9 @@ func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
|
|||||||
return strings.Join(groupTags, ", ")
|
return strings.Join(groupTags, ", ")
|
||||||
}
|
}
|
||||||
|
|
||||||
// orderBy returns a string of comma separated tags for order by clause
|
// OrderByAttributeKeyTags returns a string of comma separated tags for order by clause
|
||||||
// if the order is not specified, it defaults to ASC
|
// if the order is not specified, it defaults to ASC
|
||||||
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
|
func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
|
||||||
var orderBy []string
|
var orderBy []string
|
||||||
for _, tag := range tags {
|
for _, tag := range tags {
|
||||||
found := false
|
found := false
|
||||||
@ -59,3 +59,19 @@ func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string
|
|||||||
|
|
||||||
return strings.Join(orderBy, ", ")
|
return strings.Join(orderBy, ", ")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SelectLabelsAny(tags []v3.AttributeKey) string {
|
||||||
|
var selectLabelsAny []string
|
||||||
|
for _, tag := range tags {
|
||||||
|
selectLabelsAny = append(selectLabelsAny, fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key))
|
||||||
|
}
|
||||||
|
return strings.Join(selectLabelsAny, " ")
|
||||||
|
}
|
||||||
|
|
||||||
|
func SelectLabels(tags []v3.AttributeKey) string {
|
||||||
|
var selectLabels []string
|
||||||
|
for _, tag := range tags {
|
||||||
|
selectLabels = append(selectLabels, fmt.Sprintf("%s,", tag.Key))
|
||||||
|
}
|
||||||
|
return strings.Join(selectLabels, " ")
|
||||||
|
}
|
86
pkg/query-service/app/metrics/v4/helpers/sub_query.go
Normal file
86
pkg/query-service/app/metrics/v4/helpers/sub_query.go
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
package helpers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
|
||||||
|
func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
|
||||||
|
var conditions []string
|
||||||
|
var fs *v3.FilterSet = mq.Filters
|
||||||
|
var groupTags []v3.AttributeKey = mq.GroupBy
|
||||||
|
|
||||||
|
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
|
||||||
|
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
|
||||||
|
|
||||||
|
if fs != nil && len(fs.Items) != 0 {
|
||||||
|
for _, item := range fs.Items {
|
||||||
|
toFormat := item.Value
|
||||||
|
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
||||||
|
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
|
||||||
|
toFormat = fmt.Sprintf("%%%s%%", toFormat)
|
||||||
|
}
|
||||||
|
fmtVal := utils.ClickHouseFormattedValue(toFormat)
|
||||||
|
switch op {
|
||||||
|
case v3.FilterOperatorEqual:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorNotEqual:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorIn:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorNotIn:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorLike:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorNotLike:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorRegex:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorNotRegex:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorGreaterThan:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorGreaterThanOrEq:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorLessThan:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorLessThanOrEq:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorContains:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorNotContains:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||||
|
case v3.FilterOperatorExists:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
||||||
|
case v3.FilterOperatorNotExists:
|
||||||
|
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("unsupported filter operator")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
whereClause := strings.Join(conditions, " AND ")
|
||||||
|
|
||||||
|
var selectLabels string
|
||||||
|
for _, tag := range groupTags {
|
||||||
|
selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The table JOIN key always exists
|
||||||
|
selectLabels += "fingerprint"
|
||||||
|
|
||||||
|
filterSubQuery := fmt.Sprintf(
|
||||||
|
"SELECT DISTINCT %s FROM %s.%s WHERE %s",
|
||||||
|
selectLabels,
|
||||||
|
constants.SIGNOZ_METRIC_DBNAME,
|
||||||
|
constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME,
|
||||||
|
whereClause,
|
||||||
|
)
|
||||||
|
|
||||||
|
return filterSubQuery, nil
|
||||||
|
}
|
@ -2,100 +2,66 @@ package v4
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/cumulative"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/delta"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
"go.signoz.io/signoz/pkg/query-service/model"
|
"go.signoz.io/signoz/pkg/query-service/model"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
|
|
||||||
func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
|
|
||||||
var conditions []string
|
|
||||||
var fs *v3.FilterSet = mq.Filters
|
|
||||||
var groupTags []v3.AttributeKey = mq.GroupBy
|
|
||||||
|
|
||||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
|
|
||||||
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
|
|
||||||
|
|
||||||
if fs != nil && len(fs.Items) != 0 {
|
|
||||||
for _, item := range fs.Items {
|
|
||||||
toFormat := item.Value
|
|
||||||
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
|
||||||
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
|
|
||||||
toFormat = fmt.Sprintf("%%%s%%", toFormat)
|
|
||||||
}
|
|
||||||
fmtVal := utils.ClickHouseFormattedValue(toFormat)
|
|
||||||
switch op {
|
|
||||||
case v3.FilterOperatorEqual:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorNotEqual:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorIn:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorNotIn:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorLike:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorNotLike:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorRegex:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorNotRegex:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorGreaterThan:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorGreaterThanOrEq:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorLessThan:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorLessThanOrEq:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorContains:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorNotContains:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
|
||||||
case v3.FilterOperatorExists:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
|
||||||
case v3.FilterOperatorNotExists:
|
|
||||||
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
|
||||||
default:
|
|
||||||
return "", fmt.Errorf("unsupported filter operator")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
whereClause := strings.Join(conditions, " AND ")
|
|
||||||
|
|
||||||
var selectLabels string
|
|
||||||
for _, tag := range groupTags {
|
|
||||||
selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The table JOIN key always exists
|
|
||||||
selectLabels += "fingerprint"
|
|
||||||
|
|
||||||
filterSubQuery := fmt.Sprintf(
|
|
||||||
"SELECT DISTINCT %s FROM %s.%s WHERE %s",
|
|
||||||
selectLabels,
|
|
||||||
constants.SIGNOZ_METRIC_DBNAME,
|
|
||||||
constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME,
|
|
||||||
whereClause,
|
|
||||||
)
|
|
||||||
|
|
||||||
return filterSubQuery, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// PrepareMetricQuery prepares the query to be used for fetching metrics
|
// PrepareMetricQuery prepares the query to be used for fetching metrics
|
||||||
// from the database
|
// from the database
|
||||||
// start and end are in milliseconds
|
// start and end are in milliseconds
|
||||||
// step is in seconds
|
// step is in seconds
|
||||||
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options metricsV3.Options) (string, error) {
|
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options metricsV3.Options) (string, error) {
|
||||||
|
|
||||||
// TODO(srikanthcc): implement
|
start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation)
|
||||||
return "", nil
|
|
||||||
|
groupBy := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
|
||||||
|
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
|
||||||
|
|
||||||
|
if mq.Quantile != 0 {
|
||||||
|
// If quantile is set, we need to group by le
|
||||||
|
// and set the space aggregation to sum
|
||||||
|
// and time aggregation to rate
|
||||||
|
mq.TimeAggregation = v3.TimeAggregationRate
|
||||||
|
mq.SpaceAggregation = v3.SpaceAggregationSum
|
||||||
|
mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{
|
||||||
|
Key: "le",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
var query string
|
||||||
|
var err error
|
||||||
|
if mq.Temporality == v3.Delta {
|
||||||
|
if panelType == v3.PanelTypeTable {
|
||||||
|
query, err = delta.PrepareMetricQueryDeltaTable(start, end, mq.StepInterval, mq)
|
||||||
|
} else {
|
||||||
|
query, err = delta.PrepareMetricQueryDeltaTimeSeries(start, end, mq.StepInterval, mq)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if panelType == v3.PanelTypeTable {
|
||||||
|
query, err = cumulative.PrepareMetricQueryCumulativeTable(start, end, mq.StepInterval, mq)
|
||||||
|
} else {
|
||||||
|
query, err = cumulative.PrepareMetricQueryCumulativeTimeSeries(start, end, mq.StepInterval, mq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if mq.Quantile != 0 {
|
||||||
|
query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, mq.Quantile, query, groupBy, orderBy)
|
||||||
|
}
|
||||||
|
|
||||||
|
return query, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func BuildPromQuery(promQuery *v3.PromQuery, step, start, end int64) *model.QueryRangeParams {
|
func BuildPromQuery(promQuery *v3.PromQuery, step, start, end int64) *model.QueryRangeParams {
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -142,7 +144,385 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
|
|||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
t.Run(testCase.name, func(t *testing.T) {
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
query, err := PrepareTimeseriesFilterQuery(testCase.builderQuery)
|
query, err := helpers.PrepareTimeseriesFilterQuery(testCase.builderQuery)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_calls_total",
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "frontend",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationRate,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_calls_total",
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{
|
||||||
|
{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "endpoint",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationRate,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepareMetricQueryDeltaRate(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, no group by",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_calls_total",
|
||||||
|
},
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationRate,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts ASC",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_calls_total",
|
||||||
|
},
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
TimeAggregation: v3.TimeAggregationRate,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test temporality = cumulative, quantile = 0.99",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_latency_bucket",
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "frontend",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
Quantile: 0.99,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test temporality = cumulative, quantile = 0.99 without group by",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_latency_bucket",
|
||||||
|
},
|
||||||
|
Temporality: v3.Cumulative,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "frontend",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
Quantile: 0.99,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test temporality = delta, quantile = 0.99 group by service_name",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_latency_bucket",
|
||||||
|
},
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "frontend",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "service_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
Quantile: 0.99,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test temporality = delta, quantile = 0.99 no group by",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "signoz_latency_bucket",
|
||||||
|
},
|
||||||
|
Temporality: v3.Delta,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{
|
||||||
|
{
|
||||||
|
Key: v3.AttributeKey{
|
||||||
|
Key: "service_name",
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
},
|
||||||
|
Operator: v3.FilterOperatorContains,
|
||||||
|
Value: "frontend",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
Quantile: 0.99,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepareMetricQueryGauge(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
builderQuery *v3.BuilderQuery
|
||||||
|
expectedQueryContains string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "test gauge query with no group by",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "system_cpu_usage",
|
||||||
|
},
|
||||||
|
Temporality: v3.Unspecified,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
Expression: "A",
|
||||||
|
TimeAggregation: v3.TimeAggregationAvg,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
Disabled: false,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "test gauge query with group by host_name",
|
||||||
|
builderQuery: &v3.BuilderQuery{
|
||||||
|
QueryName: "A",
|
||||||
|
StepInterval: 60,
|
||||||
|
DataSource: v3.DataSourceMetrics,
|
||||||
|
AggregateAttribute: v3.AttributeKey{
|
||||||
|
Key: "system_cpu_usage",
|
||||||
|
},
|
||||||
|
Temporality: v3.Unspecified,
|
||||||
|
Filters: &v3.FilterSet{
|
||||||
|
Operator: "AND",
|
||||||
|
Items: []v3.FilterItem{},
|
||||||
|
},
|
||||||
|
GroupBy: []v3.AttributeKey{{
|
||||||
|
Key: "host_name",
|
||||||
|
DataType: v3.AttributeKeyDataTypeString,
|
||||||
|
Type: v3.AttributeKeyTypeTag,
|
||||||
|
}},
|
||||||
|
TimeAggregation: v3.TimeAggregationAvg,
|
||||||
|
SpaceAggregation: v3.SpaceAggregationSum,
|
||||||
|
Expression: "A",
|
||||||
|
Disabled: false,
|
||||||
|
},
|
||||||
|
expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Contains(t, query, testCase.expectedQueryContains)
|
assert.Contains(t, query, testCase.expectedQueryContains)
|
||||||
})
|
})
|
||||||
|
@ -267,6 +267,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
|
|||||||
api.RegisterMetricsRoutes(r, am)
|
api.RegisterMetricsRoutes(r, am)
|
||||||
api.RegisterLogsRoutes(r, am)
|
api.RegisterLogsRoutes(r, am)
|
||||||
api.RegisterQueryRangeV3Routes(r, am)
|
api.RegisterQueryRangeV3Routes(r, am)
|
||||||
|
api.RegisterQueryRangeV4Routes(r, am)
|
||||||
|
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
|
19
pkg/query-service/common/metrics.go
Normal file
19
pkg/query-service/common/metrics.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func AdjustedMetricTimeRange(start, end, step int64, aggregaOperator v3.TimeAggregation) (int64, int64) {
|
||||||
|
start = start - (start % (step * 1000))
|
||||||
|
// if the query is a rate query, we adjust the start time by one more step
|
||||||
|
// so that we can calculate the rate for the first data point
|
||||||
|
if aggregaOperator.IsRateOperator() {
|
||||||
|
start -= step * 1000
|
||||||
|
}
|
||||||
|
adjustStep := int64(math.Min(float64(step), 60))
|
||||||
|
end = end - (end % (adjustStep * 1000))
|
||||||
|
return start, end
|
||||||
|
}
|
@ -462,6 +462,15 @@ const (
|
|||||||
TimeAggregationIncrease TimeAggregation = "increase"
|
TimeAggregationIncrease TimeAggregation = "increase"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (t TimeAggregation) IsRateOperator() bool {
|
||||||
|
switch t {
|
||||||
|
case TimeAggregationRate, TimeAggregationIncrease:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type SpaceAggregation string
|
type SpaceAggregation string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -500,6 +509,7 @@ type BuilderQuery struct {
|
|||||||
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
|
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
|
||||||
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
|
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
|
||||||
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
|
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
|
||||||
|
Quantile float64 `json:"quantile,omitempty"`
|
||||||
Functions []Function `json:"functions,omitempty"`
|
Functions []Function `json:"functions,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -517,9 +527,17 @@ func (b *BuilderQuery) Validate() error {
|
|||||||
if err := b.DataSource.Validate(); err != nil {
|
if err := b.DataSource.Validate(); err != nil {
|
||||||
return fmt.Errorf("data source is invalid: %w", err)
|
return fmt.Errorf("data source is invalid: %w", err)
|
||||||
}
|
}
|
||||||
|
if b.DataSource == DataSourceMetrics {
|
||||||
|
if b.TimeAggregation == TimeAggregationUnspecified && b.Quantile == 0 {
|
||||||
if err := b.AggregateOperator.Validate(); err != nil {
|
if err := b.AggregateOperator.Validate(); err != nil {
|
||||||
return fmt.Errorf("aggregate operator is invalid: %w", err)
|
return fmt.Errorf("aggregate operator is invalid: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := b.AggregateOperator.Validate(); err != nil {
|
||||||
|
return fmt.Errorf("aggregate operator is invalid: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
if b.AggregateAttribute == (AttributeKey{}) && b.AggregateOperator.RequireAttribute(b.DataSource) {
|
if b.AggregateAttribute == (AttributeKey{}) && b.AggregateOperator.RequireAttribute(b.DataSource) {
|
||||||
return fmt.Errorf("aggregate attribute is required")
|
return fmt.Errorf("aggregate attribute is required")
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user