From de497bf5b66b9fe342fcbc53cbfa55dec2d11f8a Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 21 May 2024 12:01:21 +0530 Subject: [PATCH] chore: query range v3 metrics use v4 tables (#5021) --- .../app/clickhouseReader/reader.go | 41 +++-- pkg/query-service/app/http_handler.go | 47 +---- .../app/metrics/v3/cumulative_table.go | 13 +- .../app/metrics/v3/cumulative_table_test.go | 8 +- pkg/query-service/app/metrics/v3/delta.go | 15 +- .../app/metrics/v3/delta_table.go | 9 +- .../app/metrics/v3/delta_table_test.go | 8 +- .../app/metrics/v3/query_builder.go | 109 ++---------- .../app/metrics/v3/query_builder_test.go | 166 ++---------------- .../app/metrics/v4/helpers/sub_query.go | 85 +++++++++ pkg/query-service/app/querier/querier_test.go | 6 +- .../app/queryBuilder/query_builder_test.go | 10 +- pkg/query-service/constants/constants.go | 3 - 13 files changed, 175 insertions(+), 345 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index fcc2efeb15..b7e1ec6a67 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -72,11 +72,17 @@ const ( signozTraceTableName = "distributed_signoz_index_v2" signozTraceLocalTableName = "signoz_index_v2" signozMetricDBName = "signoz_metrics" - signozSampleLocalTableName = "samples_v2" - signozSampleTableName = "distributed_samples_v2" - signozTSTableName = "distributed_time_series_v2" - signozTSTableNameV4 = "distributed_time_series_v4" - signozTSTableNameV41Day = "distributed_time_series_v4_1day" + signozSampleLocalTableName = "samples_v4" + signozSampleTableName = "distributed_samples_v4" + + signozTSLocalTableNameV4 = "time_series_v4" + signozTSTableNameV4 = "distributed_time_series_v4" + + signozTSLocalTableNameV46Hrs = "time_series_v4_6hrs" + signozTSTableNameV46Hrs = "distributed_time_series_v4_6hrs" + + signozTSLocalTableNameV41Day = "time_series_v4_1day" + signozTSTableNameV41Day = "distributed_time_series_v4_1day" minTimespanForProgressiveSearch = time.Hour minTimespanForProgressiveSearchMargin = time.Minute @@ -2382,15 +2388,17 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } case constants.MetricsTTL: - tableName := signozMetricDBName + "." + signozSampleLocalTableName - statusItem, err := r.checkTTLStatusItem(ctx, tableName) - if err != nil { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} + tableNames := []string{signozMetricDBName + "." + signozSampleLocalTableName, signozMetricDBName + "." + signozTSLocalTableNameV4, signozMetricDBName + "." + signozTSLocalTableNameV46Hrs, signozMetricDBName + "." + signozTSLocalTableNameV41Day} + for _, tableName := range tableNames { + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} + } } - if statusItem.Status == constants.StatusPending { - return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} - } - go func(tableName string) { + metricTTL := func(tableName string) { _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) if dbErr != nil { zap.L().Error("Error in inserting to ttl_status table", zap.Error(dbErr)) @@ -2434,7 +2442,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) return } - }(tableName) + } + for _, tableName := range tableNames { + go metricTTL(tableName) + } case constants.LogsTTL: tableName := r.logsDB + "." + r.logsLocalTable statusItem, err := r.checkTTLStatusItem(ctx, tableName) @@ -3259,7 +3270,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) { - queryStr := fmt.Sprintf("SELECT count() as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableName) + queryStr := fmt.Sprintf("SELECT countDistinct(fingerprint) as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableNameV41Day) rows, _ := r.db.Query(ctx, queryStr) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 3a9ea9d420..e3f7c5a165 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -483,49 +483,7 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) { aH.Respond(w, ruleResponse) } -func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { - - metricNames := make([]string, 0) - metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { - for _, query := range qp.CompositeQuery.BuilderQueries { - if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { - metricNames = append(metricNames, query.AggregateAttribute.Key) - if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { - metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) - } - } - } - } - - var err error - - if aH.preferDelta { - zap.L().Debug("fetching metric temporality") - metricNameToTemporality, err = aH.reader.FetchTemporality(ctx, metricNames) - if err != nil { - return err - } - } - - if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { - for name := range qp.CompositeQuery.BuilderQueries { - query := qp.CompositeQuery.BuilderQueries[name] - if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { - if aH.preferDelta && metricNameToTemporality[query.AggregateAttribute.Key][v3.Delta] { - query.Temporality = v3.Delta - } else if metricNameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { - query.Temporality = v3.Cumulative - } else { - query.Temporality = v3.Unspecified - } - } - } - } - return nil -} - -// populateTemporality same as addTemporality but for v4 and better +// populateTemporality adds the temporality to the query if it is not present func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { missingTemporality := make([]string, 0) @@ -3320,8 +3278,7 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) { } // add temporality for each metric - - temporalityErr := aH.addTemporality(r.Context(), queryRangeParams) + temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams) if temporalityErr != nil { zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr)) RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) diff --git a/pkg/query-service/app/metrics/v3/cumulative_table.go b/pkg/query-service/app/metrics/v3/cumulative_table.go index db9c909abf..d57743cb58 100644 --- a/pkg/query-service/app/metrics/v3/cumulative_table.go +++ b/pkg/query-service/app/metrics/v3/cumulative_table.go @@ -5,6 +5,7 @@ import ( "math" "strings" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" @@ -28,7 +29,7 @@ func stepForTableCumulative(start, end int64) int64 { return int64(step) } -func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { +func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery) (string, error) { step := stepForTableCumulative(start, end) @@ -36,19 +37,19 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam metricQueryGroupBy := mq.GroupBy - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq) if err != nil { return "", err } - samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) // Select the aggregate value for interval queryTmplCounterInner := "SELECT %s" + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -61,7 +62,7 @@ func buildMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableNam "SELECT %s" + " toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts " %s as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + diff --git a/pkg/query-service/app/metrics/v3/cumulative_table_test.go b/pkg/query-service/app/metrics/v3/cumulative_table_test.go index a6b489e5ec..26748b9f09 100644 --- a/pkg/query-service/app/metrics/v3/cumulative_table_test.go +++ b/pkg/query-service/app/metrics/v3/cumulative_table_test.go @@ -38,7 +38,7 @@ func TestPanelTableForCumulative(t *testing.T) { }, Expression: "A", }, - expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Cumulative' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts", }, { name: "latency p50", @@ -66,7 +66,7 @@ func TestPanelTableForCumulative(t *testing.T) { }, }, }, - expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY fingerprint, le,ts ORDER BY fingerprint, le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, le ORDER BY fingerprint, le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", }, { name: "latency p99 with group by", @@ -88,13 +88,13 @@ func TestPanelTableForCumulative(t *testing.T) { }, Expression: "A", }, - expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + expected: "SELECT service_name, toStartOfHour(now()) as ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(rate_value)/29 as value FROM (SELECT service_name,le, ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY fingerprint, service_name,le,ts ORDER BY fingerprint, service_name ASC,le ASC, ts) WINDOW rate_window as (PARTITION BY fingerprint, service_name,le ORDER BY fingerprint, service_name ASC,le ASC, ts)) WHERE isNaN(rate_value) = 0 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2") + query, err := buildMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query) if err != nil { t.Fatalf("unexpected error: %v\n", err) } diff --git a/pkg/query-service/app/metrics/v3/delta.go b/pkg/query-service/app/metrics/v3/delta.go index f82e086bea..c2a1893af0 100644 --- a/pkg/query-service/app/metrics/v3/delta.go +++ b/pkg/query-service/app/metrics/v3/delta.go @@ -3,12 +3,13 @@ package v3 import ( "fmt" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" ) -func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) { +func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) { metricQueryGroupBy := mq.GroupBy @@ -30,19 +31,19 @@ func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableNam } } - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq) if err != nil { return "", err } - samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) // Select the aggregate value for interval queryTmpl := "SELECT %s" + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -140,9 +141,9 @@ func buildDeltaMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableNam case v3.AggregateOperatorNoOp: queryTmpl := "SELECT fingerprint, labels as fullLabels," + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " any(value) as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + diff --git a/pkg/query-service/app/metrics/v3/delta_table.go b/pkg/query-service/app/metrics/v3/delta_table.go index 7d98d27b1a..4fdf152d95 100644 --- a/pkg/query-service/app/metrics/v3/delta_table.go +++ b/pkg/query-service/app/metrics/v3/delta_table.go @@ -4,12 +4,13 @@ import ( "fmt" "math" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" ) -func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tableName string) (string, error) { +func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery) (string, error) { // round up to the nearest multiple of 60 step := int64(math.Ceil(float64(end-start+1)/1000/60) * 60) @@ -43,17 +44,17 @@ func buildDeltaMetricQueryForTable(start, end, _ int64, mq *v3.BuilderQuery, tab } } - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq) if err != nil { return "", err } - samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) queryTmpl := "SELECT %s toStartOfHour(now()) as ts," + // now() has no menaing & used as a placeholder for ts " %s as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + diff --git a/pkg/query-service/app/metrics/v3/delta_table_test.go b/pkg/query-service/app/metrics/v3/delta_table_test.go index d22807f2c1..3cb0598cfa 100644 --- a/pkg/query-service/app/metrics/v3/delta_table_test.go +++ b/pkg/query-service/app/metrics/v3/delta_table_test.go @@ -38,7 +38,7 @@ func TestPanelTableForDelta(t *testing.T) { }, Expression: "A", }, - expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch']) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY ts ORDER BY ts", }, { name: "latency p50", @@ -61,7 +61,7 @@ func TestPanelTableForDelta(t *testing.T) { }, Expression: "A", }, - expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", + expected: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.500) as value FROM (SELECT le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000 AND JSONExtractString(labels, 'service_name') = 'frontend') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY le,ts ORDER BY le ASC, ts) GROUP BY ts ORDER BY ts", }, { name: "latency p99 with group by", @@ -80,13 +80,13 @@ func TestPanelTableForDelta(t *testing.T) { }, Expression: "A", }, - expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1689255866000 AND timestamp_ms <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name,le, toStartOfHour(now()) as ts, sum(value)/1800 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1689253200000 AND unix_milli < 1689257640000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1689255866000 AND unix_milli <= 1689257640000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query, "distributed_time_series_v2") + query, err := buildDeltaMetricQueryForTable(1689255866000, 1689257640000, 1800, c.query) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index 1e0a49198b..b5453e97b4 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -51,109 +52,23 @@ var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ // See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056 var rateWithoutNegative = `If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) ` -// buildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering -// timeseries based on search criteria -func buildMetricsTimeSeriesFilterQuery(fs *v3.FilterSet, groupTags []v3.AttributeKey, mq *v3.BuilderQuery) (string, error) { - metricName := mq.AggregateAttribute.Key - aggregateOperator := mq.AggregateOperator - var conditions []string - if mq.Temporality == v3.Delta { - conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality = '%s' ", utils.ClickHouseFormattedValue(metricName), v3.Delta)) - } else { - conditions = append(conditions, fmt.Sprintf("metric_name = %s AND temporality IN ['%s', '%s']", utils.ClickHouseFormattedValue(metricName), v3.Cumulative, v3.Unspecified)) - } - - if fs != nil && len(fs.Items) != 0 { - for _, item := range fs.Items { - toFormat := item.Value - op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) - // if the received value is an array for like/match op, just take the first value - // or should we throw an error? - if op == v3.FilterOperatorLike || op == v3.FilterOperatorRegex || op == v3.FilterOperatorNotLike || op == v3.FilterOperatorNotRegex { - x, ok := item.Value.([]interface{}) - if ok { - if len(x) == 0 { - continue - } - toFormat = x[0] - } - } - - if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains { - toFormat = fmt.Sprintf("%%%s%%", toFormat) - } - fmtVal := utils.ClickHouseFormattedValue(toFormat) - switch op { - case v3.FilterOperatorEqual: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotEqual: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorIn: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotIn: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorLike: - conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotLike: - conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorRegex: - conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotRegex: - conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorGreaterThan: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorGreaterThanOrEq: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorLessThan: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorLessThanOrEq: - conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal)) - case v3.FilterOperatorContains: - conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorNotContains: - conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) - case v3.FilterOperatorExists: - conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key)) - case v3.FilterOperatorNotExists: - conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key)) - default: - return "", fmt.Errorf("unsupported operation") - } - } - } - queryString := strings.Join(conditions, " AND ") - - var selectLabels string - if aggregateOperator == v3.AggregateOperatorNoOp || aggregateOperator == v3.AggregateOperatorRate { - selectLabels = "labels," - } else { - for _, tag := range groupTags { - selectLabels += fmt.Sprintf(" JSONExtractString(labels, '%s') as %s,", tag.Key, tag.Key) - } - } - - filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString) - - return filterSubQuery, nil -} - -func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName string) (string, error) { +func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) { metricQueryGroupBy := mq.GroupBy - filterSubQuery, err := buildMetricsTimeSeriesFilterQuery(mq.Filters, metricQueryGroupBy, mq) + filterSubQuery, err := helpers.PrepareTimeseriesFilterQueryV3(start, end, mq) if err != nil { return "", err } - samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) // Select the aggregate value for interval queryTmpl := "SELECT %s" + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -282,9 +197,9 @@ func buildMetricQuery(start, end, step int64, mq *v3.BuilderQuery, tableName str case v3.AggregateOperatorNoOp: queryTmpl := "SELECT fingerprint, labels as fullLabels," + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " any(value) as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -434,15 +349,15 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P var err error if mq.Temporality == v3.Delta { if panelType == v3.PanelTypeTable { - query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + query, err = buildDeltaMetricQueryForTable(start, end, mq.StepInterval, mq) } else { - query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + query, err = buildDeltaMetricQuery(start, end, mq.StepInterval, mq) } } else { if panelType == v3.PanelTypeTable { - query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + query, err = buildMetricQueryForTable(start, end, mq.StepInterval, mq) } else { - query, err = buildMetricQuery(start, end, mq.StepInterval, mq, constants.SIGNOZ_TIMESERIES_TABLENAME) + query, err = buildMetricQuery(start, end, mq.StepInterval, mq) } } diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index 2ad6013de6..5b85036007 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -50,6 +50,7 @@ func TestBuildQueryWithFilters(t *testing.T) { }}, AggregateOperator: v3.AggregateOperatorRateMax, Expression: "A", + Temporality: v3.Cumulative, }, }, }, @@ -57,7 +58,7 @@ func TestBuildQueryWithFilters(t *testing.T) { query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false}) require.NoError(t, err) - require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'a') != 'b'") + require.Contains(t, query, "WHERE metric_name = 'name' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'a') != 'b'") require.Contains(t, query, rateWithoutNegative) require.Contains(t, query, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')") }) @@ -78,6 +79,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { {Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: v3.FilterOperatorIn}, }}, AggregateOperator: v3.AggregateOperatorRateAvg, + Temporality: v3.Cumulative, Expression: "A", }, "B": { @@ -85,6 +87,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "name2"}, AggregateOperator: v3.AggregateOperatorRateMax, + Temporality: v3.Cumulative, Expression: "B", }, }, @@ -94,158 +97,15 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { query, err := PrepareMetricQuery(q.Start, q.End, q.CompositeQuery.QueryType, q.CompositeQuery.PanelType, q.CompositeQuery.BuilderQueries["A"], Options{PreferRPM: false}) require.NoError(t, err) - require.Contains(t, query, "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']") + require.Contains(t, query, "WHERE metric_name = 'name' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'in') IN ['a','b','c']") require.Contains(t, query, rateWithoutNegative) }) } -func TestBuildQueryOperators(t *testing.T) { - testCases := []struct { - operator v3.FilterOperator - filterSet v3.FilterSet - expectedWhereClause string - }{ - { - operator: v3.FilterOperatorEqual, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: "route", Operator: v3.FilterOperatorEqual}, - }, - }, - expectedWhereClause: "JSONExtractString(labels, 'service_name') = 'route'", - }, - { - operator: v3.FilterOperatorNotEqual, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: "route", Operator: v3.FilterOperatorNotEqual}, - }, - }, - expectedWhereClause: "JSONExtractString(labels, 'service_name') != 'route'", - }, - { - operator: v3.FilterOperatorRegex, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorRegex}, - }, - }, - expectedWhereClause: "match(JSONExtractString(labels, 'service_name'), 'out')", - }, - { - operator: v3.FilterOperatorNotRegex, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorNotRegex}, - }, - }, - expectedWhereClause: "not match(JSONExtractString(labels, 'service_name'), 'out')", - }, - { - operator: v3.FilterOperatorIn, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: []interface{}{"route", "driver"}, Operator: v3.FilterOperatorIn}, - }, - }, - expectedWhereClause: "JSONExtractString(labels, 'service_name') IN ['route','driver']", - }, - { - operator: v3.FilterOperatorNotIn, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: []interface{}{"route", "driver"}, Operator: v3.FilterOperatorNotIn}, - }, - }, - expectedWhereClause: "JSONExtractString(labels, 'service_name') NOT IN ['route','driver']", - }, - { - operator: v3.FilterOperatorExists, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "horn"}, Operator: v3.FilterOperatorExists}, - }, - }, - expectedWhereClause: "has(JSONExtractKeys(labels), 'horn')", - }, - { - operator: v3.FilterOperatorNotExists, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "horn"}, Operator: v3.FilterOperatorNotExists}, - }, - }, - expectedWhereClause: "not has(JSONExtractKeys(labels), 'horn')", - }, - { - operator: v3.FilterOperatorContains, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: "out", Operator: v3.FilterOperatorContains}, - }, - }, - expectedWhereClause: "like(JSONExtractString(labels, 'service_name'), '%out%')", - }, - { - operator: v3.FilterOperatorNotContains, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "serice_name"}, Value: "out", Operator: v3.FilterOperatorNotContains}, - }, - }, - expectedWhereClause: "notLike(JSONExtractString(labels, 'serice_name'), '%out%')", - }, - { - operator: v3.FilterOperatorLike, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "service_name"}, Value: "dri", Operator: v3.FilterOperatorLike}, - }, - }, - expectedWhereClause: "like(JSONExtractString(labels, 'service_name'), 'dri')", - }, - { - operator: v3.FilterOperatorNotLike, - filterSet: v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - {Key: v3.AttributeKey{Key: "serice_name"}, Value: "dri", Operator: v3.FilterOperatorNotLike}, - }, - }, - expectedWhereClause: "notLike(JSONExtractString(labels, 'serice_name'), 'dri')", - }, - } - - for i, tc := range testCases { - t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - mq := v3.BuilderQuery{ - QueryName: "A", - StepInterval: 60, - AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"}, - AggregateOperator: v3.AggregateOperatorSum, - } - whereClause, err := buildMetricsTimeSeriesFilterQuery(&tc.filterSet, []v3.AttributeKey{}, &mq) - require.NoError(t, err) - require.Contains(t, whereClause, tc.expectedWhereClause) - }) - } -} - func TestBuildQueryXRate(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) { - tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts` + tmpl := `SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'name' AND temporality = '' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts` cases := []struct { aggregateOperator v3.AggregateOperator @@ -298,7 +158,7 @@ func TestBuildQueryXRate(t *testing.T) { func TestBuildQueryRPM(t *testing.T) { t.Run("TestBuildQueryXRate", func(t *testing.T) { - tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified']) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND timestamp_ms >= 1650991920000 AND timestamp_ms < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)` + tmpl := `SELECT ts, ceil(value * 60) as value FROM (SELECT ts, %s(rate_value) as value FROM (SELECT ts, If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as rate_value FROM(SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'name' AND temporality = '' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'name' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts) ) WHERE isNaN(rate_value) = 0 GROUP BY ts ORDER BY ts)` cases := []struct { aggregateOperator v3.AggregateOperator @@ -377,7 +237,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) { }, }, // 20:10:00 - 20:41:00 - expected: "timestamp_ms >= 1686082200000 AND timestamp_ms < 1686084060000", + expected: "unix_milli >= 1686082200000 AND unix_milli < 1686084060000", }, { name: "TestBuildQueryAdjustedTimes start close to 50 seconds", @@ -402,7 +262,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) { }, }, // 20:10:00 - 20:41:00 - expected: "timestamp_ms >= 1686082200000 AND timestamp_ms < 1686084060000", + expected: "unix_milli >= 1686082200000 AND unix_milli < 1686084060000", }, { name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 30 seconds", @@ -427,7 +287,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) { }, }, // 20:11:00 - 20:41:00 - expected: "timestamp_ms >= 1686082260000 AND timestamp_ms < 1686084060000", + expected: "unix_milli >= 1686082260000 AND unix_milli < 1686084060000", }, { name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 30 seconds and end close to 30 seconds", @@ -452,7 +312,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) { }, }, // 20:11:00 - 20:41:00 - expected: "timestamp_ms >= 1686082260000 AND timestamp_ms < 1686084060000", + expected: "unix_milli >= 1686082260000 AND unix_milli < 1686084060000", }, { name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 300 seconds and end close to 30 seconds", @@ -479,7 +339,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) { // 20:05:00 - 20:41:00 // 20:10:00 is the nearest 5 minute interval, but we round down to 20:05:00 // as this is a rate query and we want to include the previous value for the first interval - expected: "timestamp_ms >= 1686081900000 AND timestamp_ms < 1686084060000", + expected: "unix_milli >= 1686081900000 AND unix_milli < 1686084060000", }, { name: "TestBuildQueryAdjustedTimes start close to 42 seconds with step 180 seconds and end close to 30 seconds", @@ -506,7 +366,7 @@ func TestBuildQueryAdjustedTimes(t *testing.T) { // 20:06:00 - 20:39:00 // 20:09:00 is the nearest 3 minute interval, but we round down to 20:06:00 // as this is a rate query and we want to include the previous value for the first interval - expected: "timestamp_ms >= 1686081960000 AND timestamp_ms < 1686084060000", + expected: "unix_milli >= 1686081960000 AND unix_milli < 1686084060000", }, } diff --git a/pkg/query-service/app/metrics/v4/helpers/sub_query.go b/pkg/query-service/app/metrics/v4/helpers/sub_query.go index d4cd103719..e1edc5a964 100644 --- a/pkg/query-service/app/metrics/v4/helpers/sub_query.go +++ b/pkg/query-service/app/metrics/v4/helpers/sub_query.go @@ -117,3 +117,88 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string return filterSubQuery, nil } + +// PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria +func PrepareTimeseriesFilterQueryV3(start, end int64, mq *v3.BuilderQuery) (string, error) { + var conditions []string + var fs *v3.FilterSet = mq.Filters + var groupTags []v3.AttributeKey = mq.GroupBy + + conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) + conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) + + start, end, tableName := which(start, end) + + conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) + + if fs != nil && len(fs.Items) != 0 { + for _, item := range fs.Items { + toFormat := item.Value + op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) + if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains { + toFormat = fmt.Sprintf("%%%s%%", toFormat) + } + fmtVal := utils.ClickHouseFormattedValue(toFormat) + switch op { + case v3.FilterOperatorEqual: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotEqual: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorIn: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotIn: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLike: + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotLike: + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorRegex: + conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotRegex: + conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorGreaterThan: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') > %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorGreaterThanOrEq: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') >= %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLessThan: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') < %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorLessThanOrEq: + conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') <= %s", item.Key.Key, fmtVal)) + case v3.FilterOperatorContains: + conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorNotContains: + conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key.Key, fmtVal)) + case v3.FilterOperatorExists: + conditions = append(conditions, fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", item.Key.Key)) + case v3.FilterOperatorNotExists: + conditions = append(conditions, fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", item.Key.Key)) + default: + return "", fmt.Errorf("unsupported filter operator") + } + } + } + whereClause := strings.Join(conditions, " AND ") + + var selectLabels string + + if mq.AggregateOperator == v3.AggregateOperatorNoOp || mq.AggregateOperator == v3.AggregateOperatorRate { + selectLabels += "labels, " + } else { + for _, tag := range groupTags { + selectLabels += fmt.Sprintf("JSONExtractString(labels, '%s') as %s, ", tag.Key, tag.Key) + } + } + + // The table JOIN key always exists + selectLabels += "fingerprint" + + filterSubQuery := fmt.Sprintf( + "SELECT DISTINCT %s FROM %s.%s WHERE %s", + selectLabels, + constants.SIGNOZ_METRIC_DBNAME, + tableName, + whereClause, + ) + + return filterSubQuery, nil +} diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 37514b6f23..80a3a07422 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -572,8 +572,8 @@ func TestQueryRange(t *testing.T) { } q := NewQuerier(opts) expectedTimeRangeInQueryString := []string{ - fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000, 1675115580000+120*60*1000), - fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000+120*60*1000, 1675115580000+180*60*1000), + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000+120*60*1000, 1675115580000+180*60*1000), fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)), fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), } @@ -683,7 +683,7 @@ func TestQueryRangeValueType(t *testing.T) { q := NewQuerier(opts) // No caching expectedTimeRangeInQueryString := []string{ - fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms < %d", 1675115520000, 1675115580000+120*60*1000), + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), } diff --git a/pkg/query-service/app/queryBuilder/query_builder_test.go b/pkg/query-service/app/queryBuilder/query_builder_test.go index 4291bf2407..3d170a7255 100644 --- a/pkg/query-service/app/queryBuilder/query_builder_test.go +++ b/pkg/query-service/app/queryBuilder/query_builder_test.go @@ -27,6 +27,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { {Key: v3.AttributeKey{Key: "in"}, Value: []interface{}{"a", "b", "c"}, Operator: v3.FilterOperatorIn}, }}, AggregateOperator: v3.AggregateOperatorRateMax, + Temporality: v3.Cumulative, Expression: "A", }, "B": { @@ -35,6 +36,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { AggregateAttribute: v3.AttributeKey{Key: "name2"}, DataSource: v3.DataSourceMetrics, AggregateOperator: v3.AggregateOperatorRateAvg, + Temporality: v3.Cumulative, Expression: "B", }, "C": { @@ -55,7 +57,7 @@ func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) { require.NoError(t, err) require.Contains(t, queries["C"], "SELECT A.`ts` as `ts`, A.value / B.value") - require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality IN ['Cumulative', 'Unspecified'] AND JSONExtractString(labels, 'in') IN ['a','b','c']") + require.Contains(t, queries["C"], "WHERE metric_name = 'name' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'in') IN ['a','b','c']") require.Contains(t, queries["C"], "(value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window)))") }) } @@ -257,7 +259,7 @@ func TestDeltaQueryBuilder(t *testing.T) { }, }, queryToTest: "A", - expected: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts", + expected: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts", }, { name: "TestQueryWithExpression - Error rate", @@ -327,7 +329,7 @@ func TestDeltaQueryBuilder(t *testing.T) { }, }, queryToTest: "C", - expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`", + expected: "SELECT A.`ts` as `ts`, A.value * 100 / B.value as value FROM (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, 'status_code') IN ['STATUS_CODE_ERROR'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as A INNER JOIN (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_count' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND JSONExtractString(labels, 'service_name') IN ['frontend'] AND JSONExtractString(labels, 'operation') IN ['HTTP GET /dispatch'] AND JSONExtractString(labels, '__temporality__') = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_count' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY ts ORDER BY ts) as B ON A.`ts` = B.`ts`", }, { name: "TestQuery - Quantile", @@ -355,7 +357,7 @@ func TestDeltaQueryBuilder(t *testing.T) { }, }, queryToTest: "A", - expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' ) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", + expected: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) as value FROM (SELECT service_name,le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli <= 1651078380000 GROUP BY service_name,le,ts ORDER BY service_name ASC,le ASC, ts) GROUP BY service_name,ts ORDER BY service_name ASC, ts", }, } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index fe2ad8c86c..6e398a42e0 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -206,12 +206,9 @@ var GroupByColMap = map[string]struct{}{ const ( SIGNOZ_METRIC_DBNAME = "signoz_metrics" - SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2" SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4" - SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2" SIGNOZ_TRACE_DBNAME = "signoz_traces" SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" - SIGNOZ_TIMESERIES_LOCAL_TABLENAME = "time_series_v2" SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"