mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-31 12:52:00 +08:00
chore: query range v3 metrics use v4 tables (#5021)
This commit is contained in:
parent
12be6ce020
commit
de497bf5b6
@ -72,11 +72,17 @@ const (
|
||||
signozTraceTableName = "distributed_signoz_index_v2"
|
||||
signozTraceLocalTableName = "signoz_index_v2"
|
||||
signozMetricDBName = "signoz_metrics"
|
||||
signozSampleLocalTableName = "samples_v2"
|
||||
signozSampleTableName = "distributed_samples_v2"
|
||||
signozTSTableName = "distributed_time_series_v2"
|
||||
signozTSTableNameV4 = "distributed_time_series_v4"
|
||||
signozTSTableNameV41Day = "distributed_time_series_v4_1day"
|
||||
signozSampleLocalTableName = "samples_v4"
|
||||
signozSampleTableName = "distributed_samples_v4"
|
||||
|
||||
signozTSLocalTableNameV4 = "time_series_v4"
|
||||
signozTSTableNameV4 = "distributed_time_series_v4"
|
||||
|
||||
signozTSLocalTableNameV46Hrs = "time_series_v4_6hrs"
|
||||
signozTSTableNameV46Hrs = "distributed_time_series_v4_6hrs"
|
||||
|
||||
signozTSLocalTableNameV41Day = "time_series_v4_1day"
|
||||
signozTSTableNameV41Day = "distributed_time_series_v4_1day"
|
||||
|
||||
minTimespanForProgressiveSearch = time.Hour
|
||||
minTimespanForProgressiveSearchMargin = time.Minute
|
||||
@ -2382,15 +2388,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
}
|
||||
|
||||
case constants.MetricsTTL:
|
||||
tableName := signozMetricDBName + "." + signozSampleLocalTableName
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
tableNames := []string{signozMetricDBName + "." + signozSampleLocalTableName, signozMetricDBName + "." + signozTSLocalTableNameV4, signozMetricDBName + "." + signozTSLocalTableNameV46Hrs, signozMetricDBName + "." + signozTSLocalTableNameV41Day}
|
||||
for _, tableName := range tableNames {
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
}
|
||||
go func(tableName string) {
|
||||
metricTTL := func(tableName string) {
|
||||
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
|
||||
if dbErr != nil {
|
||||
zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr))
|
||||
@ -2434,7 +2442,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr))
|
||||
return
|
||||
}
|
||||
}(tableName)
|
||||
}
|
||||
for _, tableName := range tableNames {
|
||||
go metricTTL(tableName)
|
||||
}
|
||||
case constants.LogsTTL:
|
||||
tableName := r.logsDB + "." + r.logsLocalTable
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
@ -3259,7 +3270,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
|
||||
|
||||
func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
|
||||
|
||||
queryStr := fmt.Sprintf("SELECT count() as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableName)
|
||||
queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day)
|
||||
|
||||
rows, _ := r.db.Query(ctx, queryStr)
|
||||
|
||||
|
@ -483,49 +483,7 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) {
|
||||
aH.Respond(w, ruleResponse)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
|
||||
|
||||
metricNames := make([]string, 0)
|
||||
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
||||
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
|
||||
for _, query := range qp.CompositeQuery.BuilderQueries {
|
||||
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
|
||||
metricNames = append(metricNames, query.AggregateAttribute.Key)
|
||||
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
|
||||
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
if aH.preferDelta {
|
||||
zap.L().Debug("fetching metric temporality")
|
||||
metricNameToTemporality, err = aH.reader.FetchTemporality(ctx, metricNames)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
|
||||
for name := range qp.CompositeQuery.BuilderQueries {
|
||||
query := qp.CompositeQuery.BuilderQueries[name]
|
||||
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
|
||||
if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
|
||||
query.Temporality = v3.Delta
|
||||
} else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
|
||||
query.Temporality = v3.Cumulative
|
||||
} else {
|
||||
query.Temporality = v3.Unspecified
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// populateTemporality same as addTemporality but for v4 and better
|
||||
// populateTemporality adds the temporality to the query if it is not present
|
||||
func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
|
||||
|
||||
missingTemporality := make([]string, 0)
|
||||
@ -3320,8 +3278,7 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// add temporality for each metric
|
||||
|
||||
temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
|
||||
temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams)
|
||||
if temporalityErr != nil {
|
||||
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
@ -28,7 +29,7 @@ func stepForTableCumulative(start, end int64) int64 {
|
||||
return int64(step)
|
||||
}
|
||||
|
||||
func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) {
|
||||
func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery) (string, error) {
|
||||
|
||||
step := stepForTableCumulative(start, end)
|
||||
|
||||
@ -36,19 +37,19 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam
|
||||
|
||||
metricQueryGroupBy := mq.GroupBy
|
||||
|
||||
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
|
||||
filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
|
||||
// Select the aggregate value for interval
|
||||
queryTmplCounterInner :=
|
||||
"SELECT %s" +
|
||||
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" %s as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
@ -61,7 +62,7 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam
|
||||
"SELECT %s" +
|
||||
" toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts
|
||||
" %s as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
|
@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Cumulative' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "latency p50",
|
||||
@ -66,7 +66,7 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "latency p99 with group by",
|
||||
@ -88,13 +88,13 @@ func TestPanelTableForCumulative(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2")
|
||||
query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v\n", err)
|
||||
}
|
||||
|
@ -3,12 +3,13 @@ package v3
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
)
|
||||
|
||||
func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {
|
||||
func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||
|
||||
metricQueryGroupBy := mq.GroupBy
|
||||
|
||||
@ -30,19 +31,19 @@ func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableNam
|
||||
}
|
||||
}
|
||||
|
||||
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
|
||||
filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
|
||||
// Select the aggregate value for interval
|
||||
queryTmpl :=
|
||||
"SELECT %s" +
|
||||
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" %s as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
@ -140,9 +141,9 @@ func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableNam
|
||||
case v3.AggregateOperatorNoOp:
|
||||
queryTmpl :=
|
||||
"SELECT fingerprint, labels as fullLabels," +
|
||||
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" any(value) as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
|
@ -4,12 +4,13 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
)
|
||||
|
||||
func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) {
|
||||
func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery) (string, error) {
|
||||
|
||||
// round up to the nearest multiple of 60
|
||||
step := int64(math.Ceil(float64(end-start+1)/1000/60) * 60)
|
||||
@ -43,17 +44,17 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab
|
||||
}
|
||||
}
|
||||
|
||||
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
|
||||
filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
|
||||
queryTmpl :=
|
||||
"SELECT %s toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts
|
||||
" %s as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
|
@ -38,7 +38,7 @@ func TestPanelTableForDelta(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "latency p50",
|
||||
@ -61,7 +61,7 @@ func TestPanelTableForDelta(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "latency p99 with group by",
|
||||
@ -80,13 +80,13 @@ func TestPanelTableForDelta(t *testing.T) {
|
||||
},
|
||||
Expression: "A",
|
||||
},
|
||||
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2")
|
||||
query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
@ -51,109 +52,23 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
|
||||
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
|
||||
var rateWithoutNegative = `If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) `
|
||||
|
||||
// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
|
||||
// timeseries based on search criteria
|
||||
func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) {
|
||||
metricName := mq.AggregateAttribute.Key
|
||||
aggregateOperator := mq.AggregateOperator
|
||||
var conditions []string
|
||||
if mq.Temporality == v3.Delta {
|
||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta))
|
||||
} else {
|
||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality IN ['%s', '%s']", utils.ClickHouseFormattedValue(metricName), v3.Cumulative, v3.Unspecified))
|
||||
}
|
||||
|
||||
if fs != nil && len(fs.Items) != 0 {
|
||||
for _, item := range fs.Items {
|
||||
toFormat := item.Value
|
||||
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
||||
// if the received value is an array for like/match op, just take the first value
|
||||
// or should we throw an error?
|
||||
if op == v3.FilterOperatorLike || op == v3.FilterOperatorRegex || op == v3.FilterOperatorNotLike || op == v3.FilterOperatorNotRegex {
|
||||
x, ok := item.Value.([]interface{})
|
||||
if ok {
|
||||
if len(x) == 0 {
|
||||
continue
|
||||
}
|
||||
toFormat = x[0]
|
||||
}
|
||||
}
|
||||
|
||||
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
|
||||
toFormat = fmt.Sprintf("%%%s%%", toFormat)
|
||||
}
|
||||
fmtVal := utils.ClickHouseFormattedValue(toFormat)
|
||||
switch op {
|
||||
case v3.FilterOperatorEqual:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotEqual:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorIn:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotIn:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorLike:
|
||||
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotLike:
|
||||
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorRegex:
|
||||
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotRegex:
|
||||
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorGreaterThan:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorGreaterThanOrEq:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorLessThan:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorLessThanOrEq:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorContains:
|
||||
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotContains:
|
||||
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorExists:
|
||||
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
||||
case v3.FilterOperatorNotExists:
|
||||
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported operation")
|
||||
}
|
||||
}
|
||||
}
|
||||
queryString := strings.Join(conditions, " AND ")
|
||||
|
||||
var selectLabels string
|
||||
if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate {
|
||||
selectLabels = "labels,"
|
||||
} else {
|
||||
for _, tag := range groupTags {
|
||||
selectLabels += fmt.Sprintf(" JSONExtractString(labels, '%s') as %s,", tag.Key, tag.Key)
|
||||
}
|
||||
}
|
||||
|
||||
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
|
||||
|
||||
return filterSubQuery, nil
|
||||
}
|
||||
|
||||
func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) {
|
||||
func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
|
||||
|
||||
metricQueryGroupBy := mq.GroupBy
|
||||
|
||||
filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq)
|
||||
filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
|
||||
|
||||
// Select the aggregate value for interval
|
||||
queryTmpl :=
|
||||
"SELECT %s" +
|
||||
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" %s as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
@ -282,9 +197,9 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str
|
||||
case v3.AggregateOperatorNoOp:
|
||||
queryTmpl :=
|
||||
"SELECT fingerprint, labels as fullLabels," +
|
||||
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
|
||||
" any(value) as value" +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
|
||||
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
|
||||
" INNER JOIN" +
|
||||
" (%s) as filtered_time_series" +
|
||||
" USING fingerprint" +
|
||||
@ -434,15 +349,15 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
|
||||
var err error
|
||||
if mq.Temporality == v3.Delta {
|
||||
if panelType == v3.PanelTypeTable {
|
||||
query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq)
|
||||
} else {
|
||||
query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq)
|
||||
}
|
||||
} else {
|
||||
if panelType == v3.PanelTypeTable {
|
||||
query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq)
|
||||
} else {
|
||||
query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME)
|
||||
query, err = buildMetricQuery(start, end, mq.StepInterval, mq)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,6 +50,7 @@ func TestBuildQueryWithFilters(t *testing.T) {
|
||||
}},
|
||||
AggregateOperator: v3.AggregateOperatorRateMax,
|
||||
Expression: "A",
|
||||
Temporality: v3.Cumulative,
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -57,7 +58,7 @@ func TestBuildQueryWithFilters(t *testing.T) {
|
||||
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'a') != 'b'")
|
||||
require.Contains(t, query, "WHERE metric_name = 'name' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'a') != 'b'")
|
||||
require.Contains(t, query, rateWithoutNegative)
|
||||
require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')")
|
||||
})
|
||||
@ -78,6 +79,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
|
||||
{Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: v3.FilterOperatorIn},
|
||||
}},
|
||||
AggregateOperator: v3.AggregateOperatorRateAvg,
|
||||
Temporality: v3.Cumulative,
|
||||
Expression: "A",
|
||||
},
|
||||
"B": {
|
||||
@ -85,6 +87,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "name2"},
|
||||
AggregateOperator: v3.AggregateOperatorRateMax,
|
||||
Temporality: v3.Cumulative,
|
||||
Expression: "B",
|
||||
},
|
||||
},
|
||||
@ -94,158 +97,15 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
|
||||
query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||
require.Contains(t, query, "WHERE metric_name = 'name' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||
require.Contains(t, query, rateWithoutNegative)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBuildQueryOperators(t *testing.T) {
|
||||
testCases := []struct {
|
||||
operator v3.FilterOperator
|
||||
filterSet v3.FilterSet
|
||||
expectedWhereClause string
|
||||
}{
|
||||
{
|
||||
operator: v3.FilterOperatorEqual,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: "route", Operator: v3.FilterOperatorEqual},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "JSONExtractString(labels, 'service_name') = 'route'",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorNotEqual,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: "route", Operator: v3.FilterOperatorNotEqual},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "JSONExtractString(labels, 'service_name') != 'route'",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorRegex,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorRegex},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "match(JSONExtractString(labels, 'service_name'), 'out')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorNotRegex,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorNotRegex},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "not match(JSONExtractString(labels, 'service_name'), 'out')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorIn,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: []interface{}{"route", "driver"}, Operator: v3.FilterOperatorIn},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "JSONExtractString(labels, 'service_name') IN ['route','driver']",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorNotIn,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: []interface{}{"route", "driver"}, Operator: v3.FilterOperatorNotIn},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "JSONExtractString(labels, 'service_name') NOT IN ['route','driver']",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorExists,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "horn"}, Operator: v3.FilterOperatorExists},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "has(JSONExtractKeys(labels), 'horn')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorNotExists,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "horn"}, Operator: v3.FilterOperatorNotExists},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "not has(JSONExtractKeys(labels), 'horn')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorContains,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorContains},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "like(JSONExtractString(labels, 'service_name'), '%out%')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorNotContains,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "serice_name"}, Value: "out", Operator: v3.FilterOperatorNotContains},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "notLike(JSONExtractString(labels, 'serice_name'), '%out%')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorLike,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service_name"}, Value: "dri", Operator: v3.FilterOperatorLike},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "like(JSONExtractString(labels, 'service_name'), 'dri')",
|
||||
},
|
||||
{
|
||||
operator: v3.FilterOperatorNotLike,
|
||||
filterSet: v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "serice_name"}, Value: "dri", Operator: v3.FilterOperatorNotLike},
|
||||
},
|
||||
},
|
||||
expectedWhereClause: "notLike(JSONExtractString(labels, 'serice_name'), 'dri')",
|
||||
},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
|
||||
mq := v3.BuilderQuery{
|
||||
QueryName: "A",
|
||||
StepInterval: 60,
|
||||
AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"},
|
||||
AggregateOperator: v3.AggregateOperatorSum,
|
||||
}
|
||||
whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, &mq)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, whereClause, tc.expectedWhereClause)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildQueryXRate(t *testing.T) {
|
||||
t.Run("TestBuildQueryXRate", func(t *testing.T) {
|
||||
|
||||
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
|
||||
tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'name' AND temporality = '' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts`
|
||||
|
||||
cases := []struct {
|
||||
aggregateOperator v3.AggregateOperator
|
||||
@ -298,7 +158,7 @@ func TestBuildQueryXRate(t *testing.T) {
|
||||
func TestBuildQueryRPM(t *testing.T) {
|
||||
t.Run("TestBuildQueryXRate", func(t *testing.T) {
|
||||
|
||||
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
|
||||
tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'name' AND temporality = '' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)`
|
||||
|
||||
cases := []struct {
|
||||
aggregateOperator v3.AggregateOperator
|
||||
@ -377,7 +237,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
|
||||
},
|
||||
},
|
||||
// 20:10:00 - 20:41:00
|
||||
expected: "timestamp_ms >= 1686082200000 AND timestamp_ms < 1686084060000",
|
||||
expected: "unix_milli >= 1686082200000 AND unix_milli < 1686084060000",
|
||||
},
|
||||
{
|
||||
name: "TestBuildQueryAdjustedTimes start close to 50 seconds",
|
||||
@ -402,7 +262,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
|
||||
},
|
||||
},
|
||||
// 20:10:00 - 20:41:00
|
||||
expected: "timestamp_ms >= 1686082200000 AND timestamp_ms < 1686084060000",
|
||||
expected: "unix_milli >= 1686082200000 AND unix_milli < 1686084060000",
|
||||
},
|
||||
{
|
||||
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 30 seconds",
|
||||
@ -427,7 +287,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
|
||||
},
|
||||
},
|
||||
// 20:11:00 - 20:41:00
|
||||
expected: "timestamp_ms >= 1686082260000 AND timestamp_ms < 1686084060000",
|
||||
expected: "unix_milli >= 1686082260000 AND unix_milli < 1686084060000",
|
||||
},
|
||||
{
|
||||
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 30 seconds and end close to 30 seconds",
|
||||
@ -452,7 +312,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
|
||||
},
|
||||
},
|
||||
// 20:11:00 - 20:41:00
|
||||
expected: "timestamp_ms >= 1686082260000 AND timestamp_ms < 1686084060000",
|
||||
expected: "unix_milli >= 1686082260000 AND unix_milli < 1686084060000",
|
||||
},
|
||||
{
|
||||
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 300 seconds and end close to 30 seconds",
|
||||
@ -479,7 +339,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
|
||||
// 20:05:00 - 20:41:00
|
||||
// 20:10:00 is the nearest 5 minute interval, but we round down to 20:05:00
|
||||
// as this is a rate query and we want to include the previous value for the first interval
|
||||
expected: "timestamp_ms >= 1686081900000 AND timestamp_ms < 1686084060000",
|
||||
expected: "unix_milli >= 1686081900000 AND unix_milli < 1686084060000",
|
||||
},
|
||||
{
|
||||
name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 180 seconds and end close to 30 seconds",
|
||||
@ -506,7 +366,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) {
|
||||
// 20:06:00 - 20:39:00
|
||||
// 20:09:00 is the nearest 3 minute interval, but we round down to 20:06:00
|
||||
// as this is a rate query and we want to include the previous value for the first interval
|
||||
expected: "timestamp_ms >= 1686081960000 AND timestamp_ms < 1686084060000",
|
||||
expected: "unix_milli >= 1686081960000 AND unix_milli < 1686084060000",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -117,3 +117,88 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string
|
||||
|
||||
return filterSubQuery, nil
|
||||
}
|
||||
|
||||
// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
|
||||
func PrepareTimeseriesFilterQueryV3(start, end int64, mq *v3.BuilderQuery) (string, error) {
|
||||
var conditions []string
|
||||
var fs *v3.FilterSet = mq.Filters
|
||||
var groupTags []v3.AttributeKey = mq.GroupBy
|
||||
|
||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
|
||||
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
|
||||
|
||||
start, end, tableName := which(start, end)
|
||||
|
||||
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
|
||||
|
||||
if fs != nil && len(fs.Items) != 0 {
|
||||
for _, item := range fs.Items {
|
||||
toFormat := item.Value
|
||||
op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
|
||||
if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
|
||||
toFormat = fmt.Sprintf("%%%s%%", toFormat)
|
||||
}
|
||||
fmtVal := utils.ClickHouseFormattedValue(toFormat)
|
||||
switch op {
|
||||
case v3.FilterOperatorEqual:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotEqual:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorIn:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotIn:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorLike:
|
||||
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotLike:
|
||||
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorRegex:
|
||||
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotRegex:
|
||||
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorGreaterThan:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorGreaterThanOrEq:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorLessThan:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorLessThanOrEq:
|
||||
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorContains:
|
||||
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorNotContains:
|
||||
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal))
|
||||
case v3.FilterOperatorExists:
|
||||
conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
||||
case v3.FilterOperatorNotExists:
|
||||
conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key))
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported filter operator")
|
||||
}
|
||||
}
|
||||
}
|
||||
whereClause := strings.Join(conditions, " AND ")
|
||||
|
||||
var selectLabels string
|
||||
|
||||
if mq.AggregateOperator == v3.AggregateOperatorNoOp || mq.AggregateOperator == v3.AggregateOperatorRate {
|
||||
selectLabels += "labels, "
|
||||
} else {
|
||||
for _, tag := range groupTags {
|
||||
selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key)
|
||||
}
|
||||
}
|
||||
|
||||
// The table JOIN key always exists
|
||||
selectLabels += "fingerprint"
|
||||
|
||||
filterSubQuery := fmt.Sprintf(
|
||||
"SELECT DISTINCT %s FROM %s.%s WHERE %s",
|
||||
selectLabels,
|
||||
constants.SIGNOZ_METRIC_DBNAME,
|
||||
tableName,
|
||||
whereClause,
|
||||
)
|
||||
|
||||
return filterSubQuery, nil
|
||||
}
|
||||
|
@ -572,8 +572,8 @@ func TestQueryRange(t *testing.T) {
|
||||
}
|
||||
q := NewQuerier(opts)
|
||||
expectedTimeRangeInQueryString := []string{
|
||||
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000, 1675115580000+120*60*1000),
|
||||
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000+120*60*1000, 1675115580000+180*60*1000),
|
||||
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000),
|
||||
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000+120*60*1000, 1675115580000+180*60*1000),
|
||||
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)),
|
||||
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
|
||||
}
|
||||
@ -683,7 +683,7 @@ func TestQueryRangeValueType(t *testing.T) {
|
||||
q := NewQuerier(opts)
|
||||
// No caching
|
||||
expectedTimeRangeInQueryString := []string{
|
||||
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000, 1675115580000+120*60*1000),
|
||||
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000),
|
||||
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
|
||||
{Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: v3.FilterOperatorIn},
|
||||
}},
|
||||
AggregateOperator: v3.AggregateOperatorRateMax,
|
||||
Temporality: v3.Cumulative,
|
||||
Expression: "A",
|
||||
},
|
||||
"B": {
|
||||
@ -35,6 +36,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
|
||||
AggregateAttribute: v3.AttributeKey{Key: "name2"},
|
||||
DataSource: v3.DataSourceMetrics,
|
||||
AggregateOperator: v3.AggregateOperatorRateAvg,
|
||||
Temporality: v3.Cumulative,
|
||||
Expression: "B",
|
||||
},
|
||||
"C": {
|
||||
@ -55,7 +57,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Contains(t, queries["C"], "SELECT A.`ts` as `ts`, A.value / B.value")
|
||||
require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||
require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'in') IN ['a','b','c']")
|
||||
require.Contains(t, queries["C"], "(value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))")
|
||||
})
|
||||
}
|
||||
@ -257,7 +259,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
queryToTest: "A",
|
||||
expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts",
|
||||
expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts",
|
||||
},
|
||||
{
|
||||
name: "TestQueryWithExpression - Error rate",
|
||||
@ -327,7 +329,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
queryToTest: "C",
|
||||
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
|
||||
expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`",
|
||||
},
|
||||
{
|
||||
name: "TestQuery - Quantile",
|
||||
@ -355,7 +357,7 @@ func TestDeltaQueryBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
queryToTest: "A",
|
||||
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -206,12 +206,9 @@ var GroupByColMap = map[string]struct{}{
|
||||
|
||||
const (
|
||||
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
|
||||
SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2"
|
||||
SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4"
|
||||
SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2"
|
||||
SIGNOZ_TRACE_DBNAME = "signoz_traces"
|
||||
SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2"
|
||||
SIGNOZ_TIMESERIES_LOCAL_TABLENAME = "time_series_v2"
|
||||
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
|
||||
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
|
||||
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
|
||||
|
Loading…
x
Reference in New Issue
Block a user