Merge branch 'develop' into GH-4325

This commit is contained in:
Vikrant Gupta 2024-01-16 17:40:28 +05:30 committed by GitHub
commit 52750e5248
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 785 additions and 202 deletions

View File

@ -331,6 +331,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e
apiHandler.RegisterMetricsRoutes(r, am)
apiHandler.RegisterLogsRoutes(r, am)
apiHandler.RegisterQueryRangeV3Routes(r, am)
apiHandler.RegisterQueryRangeV4Routes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

View File

@ -29,6 +29,7 @@ import (
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/app/querier"
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/auth"
@ -78,6 +79,7 @@ type APIHandler struct {
featureFlags interfaces.FeatureLookup
ready func(http.HandlerFunc) http.HandlerFunc
querier interfaces.Querier
querierV2 interfaces.Querier
queryBuilder *queryBuilder.QueryBuilder
preferDelta bool
preferSpanMetrics bool
@ -142,7 +144,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
FeatureLookup: opts.FeatureFlags,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
}
querier := querier.NewQuerier(querierOpts)
querierv2 := querierV2.NewQuerier(querierOptsV2)
aH := &APIHandler{
reader: opts.Reader,
@ -158,6 +169,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
featureFlags: opts.FeatureFlags,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
}
builderOpts := queryBuilder.QueryBuilderOptions{
@ -320,6 +332,11 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodGet)
}
func (aH *APIHandler) RegisterQueryRangeV4Routes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v4").Subrouter()
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost)
}
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
writeHttpResponse(w, data)
}
@ -542,7 +559,7 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParam
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for name := range qp.CompositeQuery.BuilderQueries {
query := qp.CompositeQuery.BuilderQueries[name]
if query.DataSource == v3.DataSourceMetrics {
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
@ -3241,3 +3258,67 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
}
}
}
func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
var result []*v3.Result
var err error
var errQuriesByName map[string]string
var spanKeys map[string]v3.AttributeKey
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if any enrichment is required for logs if yes then enrich them
if logsv3.EnrichmentRequired(queryRangeParams) {
// get the fields if any logs query is present
var fields map[string]v3.AttributeKey
fields, err = aH.getLogFieldsV3(ctx, queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
logsv3.Enrich(queryRangeParams, fields)
}
spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
}
result, err, errQuriesByName = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.S().Errorf(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
// add temporality for each metric
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
if temporalityErr != nil {
zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}
aH.queryRangeV4(r.Context(), queryRangeParams, w, r)
}

View File

@ -3,11 +3,12 @@ package cumulative
import (
"fmt"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// prepareMetricQueryTable prepares the query to be used for fetching metrics
func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
// PrepareMetricQueryCumulativeTable prepares the query to be used for fetching metrics
func PrepareMetricQueryCumulativeTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var query string
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
@ -15,9 +16,9 @@ func prepareMetricQueryTable(start, end, step int64, mq *v3.BuilderQuery) (strin
return "", err
}
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
queryTmpl :=
"SELECT %s," +

View File

@ -99,7 +99,7 @@ func TestPrepareTableQuery(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareMetricQueryTable(
query, err := PrepareMetricQueryCumulativeTable(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,

View File

@ -3,7 +3,7 @@ package cumulative
import (
"fmt"
v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
@ -107,7 +107,7 @@ const (
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var subQuery string
timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq)
timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
if err != nil {
return "", err
}
@ -127,15 +127,8 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
" GROUP BY fingerprint, ts" +
" ORDER BY fingerprint, ts"
var selectLabelsAny string
for _, tag := range mq.GroupBy {
selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)
}
var selectLabels string
for _, tag := range mq.GroupBy {
selectLabels += tag.Key + ","
}
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
selectLabels := helpers.SelectLabels(mq.GroupBy)
switch mq.TimeAggregation {
case v3.TimeAggregationAvg:
@ -177,8 +170,8 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
return subQuery, nil
}
// prepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics
func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
// PrepareMetricQueryCumulativeTimeSeries prepares the query to be used for fetching metrics
func PrepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var query string
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
@ -186,9 +179,9 @@ func prepareMetricQueryCumulativeTimeSeries(start, end, step int64, mq *v3.Build
return "", err
}
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
queryTmpl :=
"SELECT %s," +

View File

@ -216,7 +216,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareMetricQueryCumulativeTimeSeries(
query, err := PrepareMetricQueryCumulativeTimeSeries(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,

View File

@ -1,61 +0,0 @@
package delta
import (
"fmt"
"strings"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// groupingSets returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupingSets(tags ...string) string {
withTs := append(tags, "ts")
if len(withTs) > 1 {
return fmt.Sprintf(`GROUPING SETS ( (%s), (%s) )`, strings.Join(withTs, ", "), strings.Join(tags, ", "))
} else {
return strings.Join(withTs, ", ")
}
}
// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
return groupingSets(groupTags...)
}
// groupBy returns a string of comma separated tags for group by clause
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
}
groupTags = append(groupTags, "ts")
return strings.Join(groupTags, ", ")
}
// orderBy returns a string of comma separated tags for order by clause
// if the order is not specified, it defaults to ASC
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
var orderBy []string
for _, tag := range tags {
found := false
for _, item := range items {
if item.ColumnName == tag.Key {
found = true
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
break
}
}
if !found {
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
}
}
orderBy = append(orderBy, "ts ASC")
return strings.Join(orderBy, ", ")
}

View File

@ -3,11 +3,17 @@ package delta
import (
"fmt"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// prepareMetricQueryDeltaTable builds the query to be used for fetching metrics
func prepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
// PrepareMetricQueryDeltaTable builds the query to be used for fetching metrics
func PrepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
if canShortCircuit(mq) {
return prepareQueryOptimized(start, end, step, mq)
}
var query string
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
@ -15,9 +21,9 @@ func prepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (
return "", err
}
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
queryTmpl :=
"SELECT %s," +

View File

@ -95,13 +95,13 @@ func TestPrepareTableQuery(t *testing.T) {
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareMetricQueryDeltaTable(
query, err := PrepareMetricQueryDeltaTable(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,

View File

@ -210,13 +210,13 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareMetricQueryDeltaTimeSeries(
query, err := PrepareMetricQueryDeltaTimeSeries(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,

View File

@ -3,18 +3,18 @@ package delta
import (
"fmt"
v4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
// prepareTimeAggregationSubQueryTimeSeries builds the sub-query to be used for temporal aggregation
// prepareTimeAggregationSubQuery builds the sub-query to be used for temporal aggregation
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var subQuery string
timeSeriesSubQuery, err := v4.PrepareTimeseriesFilterQuery(mq)
timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
if err != nil {
return "", err
}
@ -34,15 +34,7 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
" GROUP BY fingerprint, ts" +
" ORDER BY fingerprint, ts"
var selectLabelsAny string
for _, tag := range mq.GroupBy {
selectLabelsAny += fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key)
}
var selectLabels string
for _, tag := range mq.GroupBy {
selectLabels += tag.Key + ","
}
selectLabelsAny := helpers.SelectLabelsAny(mq.GroupBy)
switch mq.TimeAggregation {
case v3.TimeAggregationAvg:
@ -76,8 +68,58 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
return subQuery, nil
}
// prepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics
func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
// See `canShortCircuit` below for details
func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := helpers.SelectLabels(mq.GroupBy)
var query string
timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
if err != nil {
return "", err
}
samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
// Select the aggregate value for interval
queryTmpl :=
"SELECT %s" +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" %s as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableFilter +
" GROUP BY %s" +
" ORDER BY %s"
switch mq.SpaceAggregation {
case v3.SpaceAggregationSum:
op := "sum(value)"
if mq.TimeAggregation == v3.TimeAggregationRate {
op = "sum(value)/" + fmt.Sprintf("%d", step)
}
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMin:
op := "min(value)"
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMax:
op := "max(value)"
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
}
return query, nil
}
// PrepareMetricQueryDeltaTimeSeries builds the query to be used for fetching metrics
func PrepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
if canShortCircuit(mq) {
return prepareQueryOptimized(start, end, step, mq)
}
var query string
@ -86,9 +128,9 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue
return "", err
}
groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)
groupBy := helpers.GroupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
queryTmpl :=
"SELECT %s," +
@ -118,3 +160,37 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue
return query, nil
}
// canShortCircuit returns true if we can use the optimized query
// for the given query
// This is used to avoid the group by fingerprint thus improving the performance
// for certain queries
// cases where we can short circuit:
// 1. time aggregation = (rate|increase) and space aggregation = sum
// - rate = sum(value)/step, increase = sum(value) - sum of sums is same as sum of all values
//
// 2. time aggregation = sum and space aggregation = sum
// - sum of sums is same as sum of all values
//
// 3. time aggregation = min and space aggregation = min
// - min of mins is same as min of all values
//
// 4. time aggregation = max and space aggregation = max
// - max of maxs is same as max of all values
//
// all of this is true only for delta metrics
func canShortCircuit(mq *v3.BuilderQuery) bool {
if (mq.TimeAggregation == v3.TimeAggregationRate || mq.TimeAggregation == v3.TimeAggregationIncrease) && mq.SpaceAggregation == v3.SpaceAggregationSum {
return true
}
if mq.TimeAggregation == v3.TimeAggregationSum && mq.SpaceAggregation == v3.SpaceAggregationSum {
return true
}
if mq.TimeAggregation == v3.TimeAggregationMin && mq.SpaceAggregation == v3.SpaceAggregationMin {
return true
}
if mq.TimeAggregation == v3.TimeAggregationMax && mq.SpaceAggregation == v3.SpaceAggregationMax {
return true
}
return false
}

View File

@ -1,4 +1,4 @@
package cumulative
package helpers
import (
"fmt"
@ -18,8 +18,8 @@ func groupingSets(tags ...string) string {
}
}
// groupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
// GroupingSetsByAttributeKeyTags returns a string of comma separated tags for group by clause
func GroupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
@ -27,8 +27,8 @@ func groupingSetsByAttributeKeyTags(tags ...v3.AttributeKey) string {
return groupingSets(groupTags...)
}
// groupBy returns a string of comma separated tags for group by clause
func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
// GroupByAttributeKeyTags returns a string of comma separated tags for group by clause
func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
groupTags = append(groupTags, tag.Key)
@ -37,9 +37,9 @@ func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
return strings.Join(groupTags, ", ")
}
// orderBy returns a string of comma separated tags for order by clause
// OrderByAttributeKeyTags returns a string of comma separated tags for order by clause
// if the order is not specified, it defaults to ASC
func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
var orderBy []string
for _, tag := range tags {
found := false
@ -59,3 +59,19 @@ func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string
return strings.Join(orderBy, ", ")
}
func SelectLabelsAny(tags []v3.AttributeKey) string {
var selectLabelsAny []string
for _, tag := range tags {
selectLabelsAny = append(selectLabelsAny, fmt.Sprintf("any(%s) as %s,", tag.Key, tag.Key))
}
return strings.Join(selectLabelsAny, " ")
}
func SelectLabels(tags []v3.AttributeKey) string {
var selectLabels []string
for _, tag := range tags {
selectLabels = append(selectLabels, fmt.Sprintf("%s,", tag.Key))
}
return strings.Join(selectLabels, " ")
}

View File

@ -0,0 +1,86 @@
package helpers
import (
"fmt"
"strings"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
var conditions []string
var fs *v3.FilterSet = mq.Filters
var groupTags []v3.AttributeKey = mq.GroupBy
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
switch op {
case v3.FilterOperatorEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
case v3.FilterOperatorIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLike:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotLike:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorRegex:
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorContains:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotContains:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorExists:
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
case v3.FilterOperatorNotExists:
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
default:
return "", fmt.Errorf("unsupported filter operator")
}
}
}
whereClause := strings.Join(conditions, " AND ")
var selectLabels string
for _, tag := range groupTags {
selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key)
}
// The table JOIN key always exists
selectLabels += "fingerprint"
filterSubQuery := fmt.Sprintf(
"SELECT DISTINCT %s FROM %s.%s WHERE %s",
selectLabels,
constants.SIGNOZ_METRIC_DBNAME,
constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME,
whereClause,
)
return filterSubQuery, nil
}

View File

@ -2,100 +2,66 @@ package v4
import (
"fmt"
"strings"
"time"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/cumulative"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/delta"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
var conditions []string
var fs *v3.FilterSet = mq.Filters
var groupTags []v3.AttributeKey = mq.GroupBy
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
toFormat = fmt.Sprintf("%%%s%%", toFormat)
}
fmtVal := utils.ClickHouseFormattedValue(toFormat)
switch op {
case v3.FilterOperatorEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotEqual:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
case v3.FilterOperatorIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorNotIn:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLike:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotLike:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorRegex:
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotRegex:
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
case v3.FilterOperatorGreaterThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThan:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
case v3.FilterOperatorLessThanOrEq:
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
case v3.FilterOperatorContains:
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorNotContains:
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
case v3.FilterOperatorExists:
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
case v3.FilterOperatorNotExists:
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
default:
return "", fmt.Errorf("unsupported filter operator")
}
}
}
whereClause := strings.Join(conditions, " AND ")
var selectLabels string
for _, tag := range groupTags {
selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key)
}
// The table JOIN key always exists
selectLabels += "fingerprint"
filterSubQuery := fmt.Sprintf(
"SELECT DISTINCT %s FROM %s.%s WHERE %s",
selectLabels,
constants.SIGNOZ_METRIC_DBNAME,
constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME,
whereClause,
)
return filterSubQuery, nil
}
// PrepareMetricQuery prepares the query to be used for fetching metrics
// from the database
// start and end are in milliseconds
// step is in seconds
func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options metricsV3.Options) (string, error) {
// TODO(srikanthcc): implement
return "", nil
start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation)
groupBy := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
if mq.Quantile != 0 {
// If quantile is set, we need to group by le
// and set the space aggregation to sum
// and time aggregation to rate
mq.TimeAggregation = v3.TimeAggregationRate
mq.SpaceAggregation = v3.SpaceAggregationSum
mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{
Key: "le",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
})
}
var query string
var err error
if mq.Temporality == v3.Delta {
if panelType == v3.PanelTypeTable {
query, err = delta.PrepareMetricQueryDeltaTable(start, end, mq.StepInterval, mq)
} else {
query, err = delta.PrepareMetricQueryDeltaTimeSeries(start, end, mq.StepInterval, mq)
}
} else {
if panelType == v3.PanelTypeTable {
query, err = cumulative.PrepareMetricQueryCumulativeTable(start, end, mq.StepInterval, mq)
} else {
query, err = cumulative.PrepareMetricQueryCumulativeTimeSeries(start, end, mq.StepInterval, mq)
}
}
if err != nil {
return "", err
}
if mq.Quantile != 0 {
query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, mq.Quantile, query, groupBy, orderBy)
}
return query, nil
}
func BuildPromQuery(promQuery *v3.PromQuery, step, start, end int64) *model.QueryRangeParams {

View File

@ -4,6 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
@ -142,7 +144,385 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := PrepareTimeseriesFilterQuery(testCase.builderQuery)
query, err := helpers.PrepareTimeseriesFilterQuery(testCase.builderQuery)
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{
{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "endpoint",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepareMetricQueryDeltaRate(t *testing.T) {
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, no group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_calls_total",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test temporality = cumulative, quantile = 0.99",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
Quantile: 0.99,
},
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test temporality = cumulative, quantile = 0.99 without group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Cumulative,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
Expression: "A",
Disabled: false,
Quantile: 0.99,
},
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test temporality = delta, quantile = 0.99 group by service_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
Quantile: 0.99,
},
expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
},
{
name: "test temporality = delta, quantile = 0.99 no group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency_bucket",
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "frontend",
},
},
},
Expression: "A",
Disabled: false,
Quantile: 0.99,
},
expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
func TestPrepareMetricQueryGauge(t *testing.T) {
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
expectedQueryContains string
}{
{
name: "test gauge query with no group by",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_usage",
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
Expression: "A",
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test gauge query with group by host_name",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_usage",
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{{
Key: "host_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Expression: "A",
Disabled: false,
},
expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})

View File

@ -267,6 +267,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
api.RegisterMetricsRoutes(r, am)
api.RegisterLogsRoutes(r, am)
api.RegisterQueryRangeV3Routes(r, am)
api.RegisterQueryRangeV4Routes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

View File

@ -0,0 +1,19 @@
package common
import (
"math"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func AdjustedMetricTimeRange(start, end, step int64, aggregaOperator v3.TimeAggregation) (int64, int64) {
start = start - (start % (step * 1000))
// if the query is a rate query, we adjust the start time by one more step
// so that we can calculate the rate for the first data point
if aggregaOperator.IsRateOperator() {
start -= step * 1000
}
adjustStep := int64(math.Min(float64(step), 60))
end = end - (end % (adjustStep * 1000))
return start, end
}

View File

@ -462,6 +462,15 @@ const (
TimeAggregationIncrease TimeAggregation = "increase"
)
func (t TimeAggregation) IsRateOperator() bool {
switch t {
case TimeAggregationRate, TimeAggregationIncrease:
return true
default:
return false
}
}
type SpaceAggregation string
const (
@ -500,6 +509,7 @@ type BuilderQuery struct {
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Quantile float64 `json:"quantile,omitempty"`
Functions []Function `json:"functions,omitempty"`
}
@ -517,8 +527,16 @@ func (b *BuilderQuery) Validate() error {
if err := b.DataSource.Validate(); err != nil {
return fmt.Errorf("data source is invalid: %w", err)
}
if err := b.AggregateOperator.Validate(); err != nil {
return fmt.Errorf("aggregate operator is invalid: %w", err)
if b.DataSource == DataSourceMetrics {
if b.TimeAggregation == TimeAggregationUnspecified && b.Quantile == 0 {
if err := b.AggregateOperator.Validate(); err != nil {
return fmt.Errorf("aggregate operator is invalid: %w", err)
}
}
} else {
if err := b.AggregateOperator.Validate(); err != nil {
return fmt.Errorf("aggregate operator is invalid: %w", err)
}
}
if b.AggregateAttribute == (AttributeKey{}) && b.AggregateOperator.RequireAttribute(b.DataSource) {
return fmt.Errorf("aggregate attribute is required")